KafKa服务搭建

事先安装好zookeeper

1.下载kafka:http://kafka.apache.org/downloads.html

这里我下载的是kafka_2.11-0.11.0.1.tgz

2.解压

tar -xzf kafka_2.11-0.11.0.1.tgz

解压后的目录结构

KafKa服务搭建

3.修改配置config/server.properties

主要修改:

broker.id=1

port=9092

host.name=broker的主机地址

#zookeeper主机地址和端口

zookeeper.connect=ip1:port1,ip2:port2,ip3:port3

详细参数说明参照:http://blog.csdn.net/lizhitao/article/details/25667831

4.启动kafka

bin/kafka-server-start.sh config/server.properties &

5.发送消息测试

启动producer

bin/kafka-console-producer.sh --broker-list localost:9092 --topic test

注意:此处localhost为本机IP,否则报错

启动后随便输入消息内容

>kafka消息发送测试

打开另外窗口,启动consumer

bin/kafka-console-consumer.sh --zookeeper localhost:port --topic test --from-beginning

注意:同样localhost和port是zookeeper服务IP和端口,有多个就用逗号“,”隔开

6.配置集群

在这里配置的是伪集群

拷贝配置文件

cp config/server.properties config/server-2.properties

修改参数:

broker.id=2

port=9093

host.name=broker的主机地址

7.启动新节点

bin/kafka-server-start.sh config/server-2.properties &

8.Java开发使用

1)引用相关jar包

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.9.2</artifactId>
			<version>0.8.1.1</version>
			<exclusions>
				<!-- 实际应用中单独引入下面的jar包,不使用kafka带的 -->
				<exclusion>
					<artifactId>zookeeper</artifactId>
					<groupId>org.apache.zookeeper</groupId>
				</exclusion>
				<exclusion>
					<artifactId>zkclient</artifactId>
					<groupId>com.101tec</groupId>
				</exclusion>
				<exclusion>
					<artifactId>slf4j-api</artifactId>
					<groupId>org.slf4j</groupId>
				</exclusion>
			</exclusions>
		</dependency>

		<!-- Zookeeper客户端 -->
		<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.4</version>
			<exclusions>
				<exclusion>
					<artifactId>log4j</artifactId>
					<groupId>log4j</groupId>
				</exclusion>
			</exclusions>
		</dependency>

具体代码参照:http://www.cnblogs.com/lilixin/p/5775877.html

这里给出关键配置:

kafka.properties

zookeeper.connect=192.168.1.190:2181,192.168.1.190:2182,192.168.1.190:2183
#zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181
metadata.broker.list=192.168.1.190:9092,192.168.1.190:9093
#metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092
 
#zookeeper.connect.timeout=15000
#zookeeper.session.timeout.ms=15000
#zookeeper.sync.time.ms=20000
#auto.commit.interval.ms=20000
#auto.offset.reset=smallest
#serializer.class=kafka.serializer.StringEncoder
#producer.type=async
#queue.buffering.max.ms=6000
 
test.group.id=huhui
kafka.test.topics=huhui

applicationContext.xml 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
	xsi:schemaLocation="
        http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/mvc 
        http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd"
	default-autowire="byName">

	<!-- 这个是加载给spring 用的. -->
	<bean id="propertyConfigurer"
		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="locations">
			<list>
				<value>classpath:kafka.properties</value>
			</list>
		</property>
	</bean>
	<!-- 这个是用来在代码中注入用的. -->
	<bean id="configProperties"
		class="org.springframework.beans.factory.config.PropertiesFactoryBean">
		<property name="locations">
			<list>
				<value>classpath:kafka.properties</value>
			</list>
		</property>
	</bean>
	
	<!-- kafka -->
	<import resource="applicationContext-kafka-producer.xml"/>
	<import resource="applicationContext-kafka-receiver.xml"/>

</beans>

applicationContext-kafka-producer.xml 

<bean id="topProducer" class="top.lilixin.TopProducer">
         <constructor-arg index="0" value="${metadata.broker.list}" />
    </bean>

applicationContext-kafka-receiver.xml 

<!-- 定义消息处理器 -->
	<bean id="testConsumer" class="top.lilixin.TestConsumer"></bean>

	<!-- 定义收信人 receiver -->
	<bean id="topReceiver" class="top.lilixin.TopReceiver">

		<constructor-arg index="0" value="${zookeeper.connect}" /><!-- 
			_zookeeper集群地址,如: zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181_ -->

		<constructor-arg index="1" value="${test.group.id}" /><!-- 
			_消费者所属组id字符串 ,如:vko_group_article_read_count_ -->

		<constructor-arg index="2" value="${kafka.test.topics}" /><!-- 
			_要消费的消息主题,如:vko_group_ -->

		<constructor-arg index="3" ref="testConsumer" /> <!--_上面定义的消息处理器_ -->
	</bean>

项目原代码中TopReceiver.java有点小问题:服务关闭后,重启服务会重复读取消息。

原代码:

// 目前每个topic都是2个分区
	     topicCountMap.put(topic,2);
	     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
	        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
	        for (final KafkaStream<byte[], byte[]> stream : streams) {
	        	new Thread(){
	        		public void run(){
	        			ConsumerIterator<byte[], byte[]> it = stream.iterator();
	    		        while(it.hasNext()){
	    		        	String msg = new String(it.next().message());
	    		        	try{
	    		        	 topConsumer.dealMsg(msg);
	    		        	}catch(Exception e){
	    		        		log.error("kafka vkoConsumer topic:{} 收到消息:{} 消费异常 xxxxxxxxxxxxxxxxxx", topic, msg,e);
	    		        	}
	    		        	log.info("kafka vkoConsumer topic:{} 收到消息:{}", topic, msg);
	    		        }
	        		}
	        	}.start();
	        	log.info("kafka vkoConsumer 启动完成:groupId:{},topic:{},zookeeperConnect:{}",groupId, topic, zookeeperConnect);
	        }

首先要知道的是,High Level Consumer在ZooKeeper上保存最新的offset(从指定的分区中读取)。这个offset基于consumer group名存储。

Consumer group名在Kafka集群上是全局性的,在启动新的consumer group的时候要小心集群上没有关闭的consumer。当一个consumer线程启动了,Kafka会将它加入到相同的topic下的相同consumer group里,并且触发重新分配。在重新分配时,Kafka将partition分配给consumer,有可能会移动一个partition给另一个consumer。如果老的、新的处理逻辑同时存在,有可能一些消息传递到了老的consumer上。

使用High Level Consumer,它应该是多线程的。消费者线程的数量跟tipic的partition数量有关,它们之间有一些特定的规则:

  • 如果线程数量大于主题的分区数量,一些线程将得不到任何消息
  • 如果分区数大于线程数,一些线程将得到多个分区的消息
  • 如果一个线程处理多个分区的消息,它接收到消息的顺序是不能保证的。比如,先从分区10获取了5条消息,从分区11获取了6条消息,然后从分区10获取了5条,紧接着又从分区10获取了5条,虽然分区11还有消息。
  • 添加更多了同consumer group的consumer将触发Kafka重新分配,某个分区本来分配给a线程的,从新分配后,有可能分配给了b线程。

Kafka不会再每次读取消息后马上更新zookeeper上的offset,而是等待一段时间。由于这种延迟,有可能消费者读取了一条消息,但没有更新offset。所以,当客户端关闭或崩溃后,从新启动时有些消息重复读取了。另外,broker宕机或其他原因导致更换了partition的leader,也会导致消息重复读取。

为了避免这种问题,你应该提供一个平滑的关闭方式,而不是使用kill -9

修改后:

private static final int THREAD_AMOUNT = 2;
……

// 目前每个topic都是2个分区
	     topicCountMap.put(topic,THREAD_AMOUNT);
	     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
	        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
	        ExecutorService executor = Executors.newFixedThreadPool(THREAD_AMOUNT);
	        //使用ExecutorService来调度线程
	        for (int i = 0; i < streams.size(); i++) {
                KafkaStream<byte[], byte[]> kafkaStream = streams.get(i);
                executor.submit(new HanldMessageThread(kafkaStream, i));
	        	
	        	
	        	log.info("kafka vkoConsumer 启动完成:groupId:"+groupId+",topic:"+topic+",zookeeperConnect:"+zookeeperConnect);
	        }


//关闭consumer
	        try {
	            Thread.sleep(10000);
	        } catch (InterruptedException e) {
	            e.printStackTrace();
	        }
	        if (cc != null) {
	        	cc.shutdown();
	        }
	        if (executor != null) {
	            executor.shutdown();
	        }
	        try {
	            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
	            	log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
	            }
	        } catch (InterruptedException e) {
	        	log.info("Interrupted during shutdown, exiting uncleanly");
	        }

……
/**
	 * 具体处理message的线程
	 * @author Administrator
	 *
	 */
	class HanldMessageThread implements Runnable {
	 
	    private KafkaStream<byte[], byte[]> kafkaStream = null;
	    private int num = 0;
	     
	    public HanldMessageThread(KafkaStream<byte[], byte[]> kafkaStream, int num) {
	        super();
	        this.kafkaStream = kafkaStream;
	        this.num = num;
	    }
	 
	    public void run() {
	        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
	        while(iterator.hasNext()) {
	            String message = new String(iterator.next().message());
	            System.out.println("Thread no: " + num + ", message: " + message);
	        }
	    }
	     
	}

 具体参照:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

补充:非常不错的Kafka教程

http://orchome.com/kafka/index

相关推荐