快上网专注成都网站设计 成都网站制作 成都网站建设
成都网站建设公司服务热线:028-86922220

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

javakafka代码 java往kafka写数据

使用java实现kafka consumer时报错

public static void consumer(){

创新互联公司于2013年成立,是专业互联网技术服务公司,拥有项目成都网站设计、网站建设、外贸网站建设网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元苏尼特左做网站,已为上家服务,为苏尼特左各地企业和个人服务,联系电话:028-86922220

Properties props = new Properties();  

props.put("zk.connect", "hadoop-2:2181");  

props.put("zk.connectiontimeout.ms", "1000000");  

props.put("groupid", "fans_group");  

// Create the connection to the cluster  

ConsumerConfig consumerConfig = new ConsumerConfig(props);  

ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  

MapString, Integer map = new HashMapString, Integer();

map.put("fans", 1);

// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume  

MapString, ListKafkaStreamMessage topicMessageStreams = consumerConnector.createMessageStreams(map);  

ListKafkaStreamMessage streams = topicMessageStreams.get("fans");  

// create list of 4 threads to consume from each of the partitions   

ExecutorService executor = Executors.newFixedThreadPool(1);  

long startTime = System.currentTimeMillis();

// consume the messages in the threads  

for(final KafkaStreamMessage stream: streams) {  

executor.submit(new Runnable() {  

public void run() {  

ConsumerIteratorMessage it = stream.iterator();

while (it.hasNext()){

log.debug(byteBufferToString(it.next().message().payload()));

}

}); 

log.debug("use time="+(System.currentTimeMillis()-startTime));

}  

}

Java使用kafka发送消息没有生效

一般消息发不出去很大可能都是配置或环境的问题

1、排查环境是否有问题,zookeeper节点是否存活,kafka节点是否存活,通过命令行的方式能否发出去消息(使用kafka-console-producer.sh),如果通过命令行都发不出去那就是集群的问题了。

2、网络问题,调用机器和集群之间网络是否通畅

3、调用时配置的host、port和集群中配置的是否一致,是否需要使用主机名而不是ip

4、客户端api版本是否和服务端差别太大导致不兼容

5、防火墙问题,关闭集群的防火墙实时

诸如此类,可能性太多就不一 一列举了。

你这既然有打印堆栈,如果报错肯定有异常信息的,可能卡住的时间比较长,耐心等待吧,祝你早日解决bug。

如何写java程序代码测试kafka

我这里是使用的是,kafka自带的zookeeper。

以及关于kafka的日志文件啊,都放在默认里即/tmp下,我没修改。保存默认的

1、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ jps

2625 Jps

2、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties

此刻,这时,会一直停在这,因为是前端运行。

另开一窗口,

3、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties

也是前端运行。


分享文章:javakafka代码 java往kafka写数据
文章出自:http://6mz.cn/article/doeecpe.html

其他资讯