快上网专注成都网站设计 成都网站制作 成都网站建设
成都网站建设公司服务热线:028-86922220

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

如何进行kafka批量消费多消费者问题分析

今天就跟大家聊聊有关如何进行kafka批量消费多消费者问题分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

专注于为中小企业提供网站制作、网站建设服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业瓮安免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了近千家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。

package com.llw.medical.bs.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.stereotype.Component;import java.util.List;import java.util.Optional;@Componentpublic class KafakaListener {@KafkaListener(id = "1", topics = {"topic2"})public void listen(ConsumerRecord record) {
        Optional kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record =" + record);
            System.out.println("----------------- message =" + message);
        }
    }@KafkaListener(id = "2", topicPartitions =
            {@TopicPartition(topic = "topic1",
                    partitions = {"1", "2", "3"}// partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )
            })public void listen2(ConsumerRecord record) {
        Optional kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record 1=" + record);
            System.out.println("------------------ message 1=" + message);
        }
    }//id = "4",    //id="4"    @KafkaListener( id= "4",groupId = "1",topics="topic1", /*topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )*//*            },*/ containerFactory = "kafkaBatchListener6")public void listen3(List> records) {//, Acknowledgment ack        try {for (ConsumerRecord record : records) {
                Optional kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 4=" + record);//   System.out.println("------------------ message 4=" + message);                }
            }
        } finally {//   ack.acknowledge();        }
    }//id="5"    @KafkaListener(id = "5",groupId = "1",topics="topic1", /*topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )            },*/ containerFactory = "kafkaBatchListener6")public void listen2(List> records) {//, Acknowledgment ack        try {for (ConsumerRecord record : records) {
                Optional kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 6=" + record);//   System.out.println("------------------ message 6=" + message);                }
            }
        } finally {//   ack.acknowledge();        }
    }//https://www.cnblogs.com/linjiqin/p/13171789.html    @KafkaListener(id = "6",groupId = "1",topics="topic1",/* topicPartitions =            {@TopicPartition(topic = "topic1",                    partitions = {"0"}                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")            )            }, */containerFactory = "kafkaBatchListener6")public void listen4(List> records) {try {for (ConsumerRecord record : records) {
                Optional kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 3=" + record);//   System.out.println("------------------ message 6=" + message);                }
            }
        } finally {//   ack.acknowledge();        }
    }

}

一个partition只能有一个消费者,如果多个消费者会是广播模式,每个消费者都会有一条数据,kafka是一个发布和订阅模式的主键,并不是队列模式,

spring boot整合时,如果使用topicPartitions 注解参数指定partition会有消息重复消费的问题,最好使用topics注解,并指定groupId。

看完上述内容,你们对如何进行kafka批量消费多消费者问题分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注创新互联行业资讯频道,感谢大家的支持。


当前文章:如何进行kafka批量消费多消费者问题分析
本文来源:http://6mz.cn/article/gepips.html

其他资讯