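Setting Up a Kafka Service
Prerequisite: ZooKeeper is already installed.
1. Download Kafka: http://kafka.apache.org/downloads.html
The version downloaded here is kafka_2.11-0.11.0.1.tgz
2. Extract the archive
tar -xzf kafka_2.11-0.11.0.1.tgz
Extraction produces the directory kafka_2.11-0.11.0.1; the commands below are run from inside it.
3. Edit the configuration file config/server.properties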
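The main settings to change:
broker.id=1
port=9092
host.name=<broker host address>
# ZooKeeper host addresses and ports
zookeeper.connect=ip1:port1,ip2:port2,ip3:port3
Note: in Kafka 0.11, port and host.name are deprecated and are only used when listeners is not set; the equivalent setting is listeners=PLAINTEXT://<broker host address>:9092.
For a detailed description of the parameters, see: http://blog.csdn.net/lizhitao/article/details/25667831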
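A malformed zookeeper.connect value is an easy mistake to make when editing this file. The following is a minimal sketch of checking the value before starting the broker. The class ZkConnectParser and its parse method are hypothetical helpers written for this note, not part of Kafka; the sketch assumes plain host:port entries (no IPv6 literals) and tolerates an optional chroot path after the last entry.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Kafka): validates a zookeeper.connect value. */
class ZkConnectParser {

    /** Returns the normalized "host:port" entries, or throws if an entry is malformed. */
    static List<String> parse(String connect) {
        // An optional chroot path may follow the last entry, e.g. "h1:2181,h2:2181/kafka".
        int slash = connect.indexOf('/');
        String hosts = slash >= 0 ? connect.substring(0, slash) : connect;

        List<String> entries = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2 || parts[0].isEmpty()) {
                throw new IllegalArgumentException("expected host:port but got '" + entry + "'");
            }
            // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) on a bad port.
            int port = Integer.parseInt(parts[1]);
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range in '" + entry + "'");
            }
            entries.add(parts[0] + ":" + port);
        }
        return entries;
    }

    public static void main(String[] args) {
        // Example addresses only; substitute your own ZooKeeper hosts.
        System.out.println(parse("192.168.1.190:2181,192.168.1.190:2182,192.168.1.190:2183"));
    }
}
```

A check like this only catches formatting mistakes; it does not verify that a ZooKeeper server is actually listening on each address.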
4. Start Kafka
bin/kafka-server-start.sh config/server.properties &
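5. Send a test message
Start the producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Note: localhost here stands for this machine's IP address; use the actual IP, otherwise the command reports an error.
Once the producer has started, type any message content:
>kafka message send test
Open another window and start the consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:port --topic test --from-beginning
Note: likewise, localhost and port are the IP and port of the ZooKeeper service; if there are several, separate them with commas (",").
6. Configure a cluster
Here we configure a pseudo-cluster (multiple brokers on a single machine).
Copy the configuration file: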
cp config/server.properties config/server-2.properties
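Change these parameters in the copy:
broker.id=2
port=9093
host.name=<broker host address>
# Example path only; it must differ from the first broker's log.dirs,
# because two brokers on the same machine cannot share a data directory
log.dirs=/tmp/kafka-logs-2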
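When several brokers run on one machine, each needs its own broker.id, port, and data directory (log.dirs). The following is a minimal sketch of a check that compares two broker configurations and reports the settings they share. The class BrokerConfigCheck is a hypothetical helper written for this note, and the check is simplified: it only flags a key when both files set it to the same value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: finds settings that two brokers on one host must not share. */
class BrokerConfigCheck {

    // Settings that have to differ between brokers running on the same machine.
    static final String[] UNIQUE_KEYS = {"broker.id", "port", "log.dirs"};

    /** Parses properties-file text (the content of a server.properties file). */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the keys that are set to the same value in both configurations. */
    static List<String> conflicts(Properties a, Properties b) {
        List<String> clashes = new ArrayList<>();
        for (String key : UNIQUE_KEYS) {
            String value = a.getProperty(key);
            if (value != null && value.equals(b.getProperty(key))) {
                clashes.add(key);
            }
        }
        return clashes;
    }

    public static void main(String[] args) {
        Properties first = load("broker.id=1\nport=9092\nlog.dirs=/tmp/kafka-logs\n");
        // A copy where only broker.id and port were changed still shares the data directory.
        Properties second = load("broker.id=2\nport=9093\nlog.dirs=/tmp/kafka-logs\n");
        System.out.println("Conflicting settings: " + conflicts(first, second));
    }
}
```

In real use the text would come from config/server.properties and config/server-2.properties rather than from string literals.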
7. Start the new node
bin/kafka-server-start.sh config/server-2.properties &
8. Using Kafka from Java
1) Add the required jar dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <!-- In practice, pull in the jars below separately instead of using the ones bundled with kafka -->
        <exclusion>
            <artifactId>zookeeper</artifactId>
            <groupId>org.apache.zookeeper</groupId>
        </exclusion>
        <exclusion>
            <artifactId>zkclient</artifactId>
            <groupId>com.101tec</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<!-- ZooKeeper client -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.4</version>
    <exclusions>
        <exclusion>
            <artifactId>log4j</artifactId>
            <groupId>log4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
For the full code, see: http://www.cnblogs.com/lilixin/p/5775877.html
The key configuration files are given here:
kafka.properties
zookeeper.connect=192.168.1.190:2181,192.168.1.190:2182,192.168.1.190:2183
#zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181
metadata.broker.list=192.168.1.190:9092,192.168.1.190:9093
#metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092
#zookeeper.connect.timeout=15000
#zookeeper.session.timeout.ms=15000
#zookeeper.sync.time.ms=20000
#auto.commit.interval.ms=20000
#auto.offset.reset=smallest
#serializer.class=kafka.serializer.StringEncoder
#producer.type=async
#queue.buffering.max.ms=6000
test.group.id=huhui
kafka.test.topics=huhui
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd"
    default-autowire="byName">

    <!-- Loaded for Spring itself (resolves the ${...} placeholders). -->
    <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:kafka.properties</value>
            </list>
        </property>
    </bean>

    <!-- Used for injecting the properties into code. -->
    <bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="locations">
            <list>
                <value>classpath:kafka.properties</value>
            </list>
        </property>
    </bean>

    <!-- kafka -->
    <import resource="applicationContext-kafka-producer.xml"/>
    <import resource="applicationContext-kafka-receiver.xml"/>
</beans>
applicationContext-kafka-producer.xml
<bean id="topProducer" class="top.lilixin.TopProducer">
    <constructor-arg index="0" value="${metadata.broker.list}" />
</bean>
applicationContext-kafka-receiver.xml
<!-- Define the message handler -->
<bean id="testConsumer" class="top.lilixin.TestConsumer"></bean>
<!-- Define the receiver -->
<bean id="topReceiver" class="top.lilixin.TopReceiver">
    <!-- ZooKeeper cluster address, e.g. zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181 -->
    <constructor-arg index="0" value="${zookeeper.connect}" />
    <!-- Consumer group id string, e.g. vko_group_article_read_count -->
    <constructor-arg index="1" value="${test.group.id}" />
    <!-- Topic to consume, e.g. vko_group -->
    <constructor-arg index="2" value="${kafka.test.topics}" />
    <!-- The message handler defined above -->
    <constructor-arg index="3" ref="testConsumer" />
</bean>
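TopReceiver.java in the original project code has a small problem: after the service is shut down and restarted, it reads some messages again.
Original code: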
// Currently every topic has 2 partitions
topicCountMap.put(topic, 2);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for (final KafkaStream<byte[], byte[]> stream : streams) {
    // One unmanaged thread per stream; nothing ever stops these threads or closes the consumer
    new Thread() {
        public void run() {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                String msg = new String(it.next().message());
                try {
                    topConsumer.dealMsg(msg);
                } catch (Exception e) {
                    log.error("kafka vkoConsumer topic:{} received message:{} consume failed xxxxxxxxxxxxxxxxxx", topic, msg, e);
                }
                log.info("kafka vkoConsumer topic:{} received message:{}", topic, msg);
            }
        }
    }.start();
    log.info("kafka vkoConsumer started: groupId:{},topic:{},zookeeperConnect:{}", groupId, topic, zookeeperConnect);
}
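The first thing to know is that the High Level Consumer stores in ZooKeeper the latest offset it has read from each partition. This offset is stored under the consumer group name.
Consumer group names are global across the Kafka cluster, so before starting consumers with new logic under a group name, make sure no old consumers of that group are still running on the cluster. When a consumer thread starts, Kafka adds it to the consumer group for that topic and triggers a rebalance. During the rebalance Kafka assigns partitions to consumers and may move a partition from one consumer to another. If old and new processing logic are running at the same time, some messages may be delivered to the old consumers.
A High Level Consumer should be multi-threaded. The number of consumer threads relates to the number of partitions of the topic, according to a few specific rules:
- If there are more threads than partitions in the topic, some threads will never receive a message.
- If there are more partitions than threads, some threads will receive messages from multiple partitions.
- If one thread handles several partitions, the order in which it receives messages across those partitions is not guaranteed (only the order within a single partition is). For example, it may take 5 messages from partition 10, then 6 from partition 11, then 5 more from partition 10, followed by another 5 from partition 10, even though partition 11 still has messages available.
- Adding more consumers to the same consumer group triggers a rebalance in Kafka; a partition that was assigned to thread a may be assigned to thread b afterwards.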
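The first two rules above can be illustrated with a small model. This is a simplified sketch that hands out contiguous ranges of partitions to threads; it is an assumption-laden illustration, not Kafka's actual assignment code, and the class name RangeAssignmentSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model (not Kafka's implementation) of spreading partitions over consumer threads. */
class RangeAssignmentSketch {

    /** Returns, for each thread index, the partition ids that thread would own. */
    static List<List<Integer>> assign(int partitions, int threads) {
        List<List<Integer>> owned = new ArrayList<>();
        int base = partitions / threads;   // every thread gets at least this many
        int extra = partitions % threads;  // the first 'extra' threads get one more
        int next = 0;
        for (int t = 0; t < threads; t++) {
            int count = base + (t < extra ? 1 : 0);
            List<Integer> mine = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                mine.add(next++);
            }
            owned.add(mine);
        }
        return owned;
    }

    public static void main(String[] args) {
        // More threads than partitions: the last thread owns nothing.
        System.out.println(assign(2, 3)); // prints [[0], [1], []]
        // More partitions than threads: each thread owns several partitions.
        System.out.println(assign(5, 2)); // prints [[0, 1, 2], [3, 4]]
    }
}
```

With 2 partitions and 2 threads, as in the code in this article, each thread owns exactly one partition, which is why the thread count is chosen to match the partition count.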
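Kafka does not update the offset in ZooKeeper immediately after each message is read; it waits for a period of time (the auto.commit.interval.ms setting). Because of this delay, a consumer may have read a message without the offset having been updated yet. As a result, when the client is shut down or crashes, some messages are read again after it restarts. A broker failure, or any other event that changes the leader of a partition, can also cause messages to be read again.
To avoid this problem, provide a clean shutdown path instead of using kill -9.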
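The effect of the delayed offset update can be shown with a small simulation. This is a minimal sketch under stated assumptions: it commits after every N messages rather than after a time interval as Kafka does, it does not talk to Kafka or ZooKeeper at all, and the class DelayedCommitSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulation only: models a consumer whose offset is committed with a delay. */
class DelayedCommitSketch {
    private final int commitEvery;     // commit after this many messages (stand-in for the commit interval)
    private long committedOffset = 0;  // what would be stored in ZooKeeper
    private long position = 0;         // next offset this consumer will read

    DelayedCommitSketch(int commitEvery) {
        this.commitEvery = commitEvery;
    }

    /** Reads n messages from the current position and returns their offsets. */
    List<Long> poll(int n) {
        List<Long> seen = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            seen.add(position++);
            if (position % commitEvery == 0) {
                committedOffset = position; // periodic commit
            }
        }
        return seen;
    }

    /** Clean shutdown: the current position is committed before exit. */
    void shutdownCleanly() {
        committedOffset = position;
    }

    /** Restart (after a crash or a clean stop): resume from the last committed offset. */
    void restart() {
        position = committedOffset;
    }

    public static void main(String[] args) {
        DelayedCommitSketch killed = new DelayedCommitSketch(5);
        killed.poll(7);   // reads offsets 0..6, but only offset 5 has been committed
        killed.restart(); // simulates kill -9 followed by a restart
        System.out.println("after kill -9: " + killed.poll(3)); // offsets 5 and 6 are read a second time

        DelayedCommitSketch clean = new DelayedCommitSketch(5);
        clean.poll(7);
        clean.shutdownCleanly();
        clean.restart();
        System.out.println("after clean stop: " + clean.poll(3)); // continues at offset 7
    }
}
```

Even with a clean shutdown, duplicates remain possible after a crash or a leader change, so message handlers are safer when they tolerate seeing the same message twice.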
After the change:
private static final int THREAD_AMOUNT = 2;
……
// Currently every topic has 2 partitions
topicCountMap.put(topic, THREAD_AMOUNT);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// Use an ExecutorService to schedule the threads
ExecutorService executor = Executors.newFixedThreadPool(THREAD_AMOUNT);
for (int i = 0; i < streams.size(); i++) {
    KafkaStream<byte[], byte[]> kafkaStream = streams.get(i);
    executor.submit(new HandleMessageThread(kafkaStream, i));
    log.info("kafka vkoConsumer started: groupId:" + groupId + ",topic:" + topic + ",zookeeperConnect:" + zookeeperConnect);
}
// Shut down the consumer.
// As in the wiki example, this runs 10 seconds after startup; in a long-running
// service, trigger this shutdown sequence when the service stops instead.
try {
    Thread.sleep(10000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
if (cc != null) {
    cc.shutdown();
}
if (executor != null) {
    executor.shutdown();
}
try {
    if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
        log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
    }
} catch (InterruptedException e) {
    log.info("Interrupted during shutdown, exiting uncleanly");
}
……
/**
 * The thread that actually processes the messages
 * @author Administrator
 */
class HandleMessageThread implements Runnable {
    private KafkaStream<byte[], byte[]> kafkaStream = null;
    private int num = 0;

    public HandleMessageThread(KafkaStream<byte[], byte[]> kafkaStream, int num) {
        super();
        this.kafkaStream = kafkaStream;
        this.num = num;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("Thread no: " + num + ", message: " + message);
        }
    }
}
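In a long-running service, the same shutdown sequence (close the consumer, then wait for the worker threads) is usually triggered when the JVM exits rather than after a fixed sleep. The following is a minimal sketch of that idea using only the JDK. The class GracefulStopSketch is hypothetical, and the closeConsumer callback is an assumption standing in for the cc.shutdown() call above. A JVM shutdown hook runs on a normal exit or a plain kill (SIGTERM), but not on kill -9.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: stop the consumer and its worker threads when the JVM exits. */
class GracefulStopSketch {
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    private final Runnable closeConsumer; // assumption: wraps cc.shutdown() in the real receiver
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    GracefulStopSketch(Runnable closeConsumer) {
        this.closeConsumer = closeConsumer;
    }

    /** Submits one worker; in the real receiver this would be a message-handling Runnable. */
    void start(Runnable worker) {
        executor.submit(worker);
    }

    /** Closes the consumer, then waits for the workers. Returns true if they finished in time. */
    boolean stop(long timeoutMs) {
        if (!stopped.compareAndSet(false, true)) {
            return true; // already stopped by an earlier call
        }
        closeConsumer.run();  // closing the consumer lets the workers' loops end
        executor.shutdown();  // accept no new tasks
        try {
            return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Registers stop() to run when the JVM shuts down. */
    void registerShutdownHook(long timeoutMs) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> stop(timeoutMs)));
    }

    public static void main(String[] args) {
        GracefulStopSketch sketch = new GracefulStopSketch(() -> System.out.println("consumer closed"));
        sketch.registerShutdownHook(5000);
        sketch.start(() -> System.out.println("worker running"));
        // Called directly here so the demo terminates; in a service the hook would call it.
        System.out.println("stopped cleanly: " + sketch.stop(5000));
    }
}
```

In a Spring application such as the one configured above, the same stop method could instead be wired as the bean's destroy method, so that it runs when the application context closes.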
For details, see: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
Addendum: a very good Kafka tutorial
http://orchome.com/kafka/index