消息系统kafka介绍

http://dongxicheng.org/search-engine/kafka/

kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.

    我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.

    其中kafka为0.8V,zookeeper为3.4.5V

一.Zookeeper集群构建

    我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.

    1) zk-0

    调整配置文件:

./zkServer.sh start  

    2) zk-1

    调整配置文件(其他配置和zk-0一只):

./zkServer.sh start  

    3) zk-2

    调整配置文件(其他配置和zk-0一只):

./zkServer.sh start  

  

二. Kafka集群构建

    因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.

    1) kafka-0

    在config目录下修改配置文件为:

broker.id=0  
port=9092  
num.network.threads=2  
num.io.threads=2  
socket.send.buffer.bytes=1048576  
socket.receive.buffer.bytes=1048576  
socket.request.max.bytes=104857600  
log.dir=./logs  
num.partitions=2  
log.flush.interval.messages=10000  
log.flush.interval.ms=1000  
log.retention.hours=168  
#log.retention.bytes=1073741824  
log.segment.bytes=536870912  
num.replica.fetchers=2  
log.cleanup.interval.mins=10  
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  
zookeeper.connection.timeout.ms=1000000  
kafka.metrics.polling.interval.secs=5  
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter  
kafka.csv.metrics.dir=/tmp/kafka_metrics  
kafka.csv.metrics.reporter.enabled=false  

    因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。

> cd kafka-0  
> ./sbt update  
> ./sbt package  
> ./sbt assembly-package-dependency   

    其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:

> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &  

    因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.

    2) kafka-1

broker.id=1  
port=9093  
##其他配置和kafka-0保持一致  

    然后和kafka-0一样执行打包命令,然后启动此broker.

> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &  

    到目前为止环境已经OK了,那我们就开始展示编程实例吧。

三.项目准备

    项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。如果kafka client的版本和kafka server的版本不一致,将会有很多异常,比如"broker id not exists"等;因为kafka从0.7升级到0.8之后(正名为2.8.0),client与server通讯的protocol已经改变.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
    <modelVersion>4.0.0</modelVersion>  
    <groupId>com.test</groupId>  
    <artifactId>test-kafka</artifactId>  
    <packaging>jar</packaging>  
  
    <name>test-kafka</name>  
    <url>http://maven.apache.org</url>  
    <version>1.0.0</version>  
    <dependencies>  
        <dependency>  
            <groupId>log4j</groupId>  
            <artifactId>log4j</artifactId>  
            <version>1.2.14</version>  
        </dependency>  
        <dependency>  
            <groupId>org.apache.kafka</groupId>  
            <artifactId>kafka_2.8.0</artifactId>  
            <version>0.8.0-beta1</version>  
            <exclusions>  
                <exclusion>  
                    <groupId>log4j</groupId>  
                    <artifactId>log4j</artifactId>  
                </exclusion>  
            </exclusions>  
        </dependency>  
        <dependency>  
            <groupId>org.scala-lang</groupId>  
            <artifactId>scala-library</artifactId>  
            <version>2.8.1</version>  
        </dependency>  
        <dependency>  
            <groupId>com.yammer.metrics</groupId>  
            <artifactId>metrics-core</artifactId>  
            <version>2.2.0</version>  
        </dependency>  
        <dependency>  
            <groupId>com.101tec</groupId>  
            <artifactId>zkclient</artifactId>  
            <version>0.3</version>  
        </dependency>  
    </dependencies>  
    <build>  
        <finalName>test-kafka-1.0</finalName>  
        <resources>  
            <resource>  
                <directory>src/main/resources</directory>  
                <filtering>true</filtering>  
            </resource>  
        </resources>  
        <plugins>  
            <plugin>  
                <artifactId>maven-compiler-plugin</artifactId>  
                <version>2.3.2</version>  
                <configuration>  
                    <source>1.5</source>  
                    <target>1.5</target>  
                    <encoding>gb2312</encoding>  
                </configuration>  
            </plugin>  
            <plugin>  
                <artifactId>maven-resources-plugin</artifactId>  
                <version>2.2</version>  
                <configuration>  
                    <encoding>gbk</encoding>  
                </configuration>  
            </plugin>  
        </plugins>  
    </build>  
</project>  

四.Producer端代码

    1) producer.properties文件:此文件放在/resources目录下

#partitioner.class=  
metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093  
##,127.0.0.1:9093  
producer.type=sync  
compression.codec=0  
serializer.class=kafka.serializer.StringEncoder  
##在producer.type=async时有效  
#batch.num.messages=100  

    2) LogProducer.java代码样例

package com.test.kafka;  
  
import java.util.ArrayList;  
import java.util.Collection;  
import java.util.List;  
import java.util.Properties;  
  
import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  
public class LogProducer {  
  
    private Producer<String,String> inner;  
    public LogProducer() throws Exception{  
        Properties properties = new Properties();  
        properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));  
        ProducerConfig config = new ProducerConfig(properties);  
        inner = new Producer<String, String>(config);  
    }  
  
      
    public void send(String topicName,String message) {  
        if(topicName == null || message == null){  
            return;  
        }  
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);  
        inner.send(km);  
    }  
      
    public void send(String topicName,Collection<String> messages) {  
        if(topicName == null || messages == null){  
            return;  
        }  
        if(messages.isEmpty()){  
            return;  
        }  
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();  
        for(String entry : messages){  
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);  
            kms.add(km);  
        }  
        inner.send(kms);  
    }  
      
    public void close(){  
        inner.close();  
    }  
      
    /** 
     * @param args 
     */  
    public static void main(String[] args) {  
        LogProducer producer = null;  
        try{  
            producer = new LogProducer();  
            int i=0;  
            while(true){  
                producer.send("test-topic", "this is a sample" + i);  
                i++;  
                Thread.sleep(2000);  
            }  
        }catch(Exception e){  
            e.printStackTrace();  
        }finally{  
            if(producer != null){  
                producer.close();  
            }  
        }  
  
    }  
  
}  

五.Consumer端

     1) consumer.properties:文件位于/resources目录下

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  
##,127.0.0.1:2182,127.0.0.1:2183  
# timeout in ms for connecting to zookeeper  
zookeeper.connectiontimeout.ms=1000000  
#consumer group id  
group.id=test-group  
#consumer timeout  
#consumer.timeout.ms=5000  
auto.commit.enable=true  
auto.commit.interval.ms=60000  

    2) LogConsumer.java代码样例

package com.test.kafka;  
  
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Properties;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
  
import kafka.consumer.Consumer;  
import kafka.consumer.ConsumerConfig;  
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;  
import kafka.message.MessageAndMetadata;  
public class LogConsumer {  
  
    private ConsumerConfig config;  
    private String topic;  
    private int partitionsNum;  
    private MessageExecutor executor;  
    private ConsumerConnector connector;  
    private ExecutorService threadPool;  
    public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{  
        Properties properties = new Properties();  
        properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));  
        config = new ConsumerConfig(properties);  
        this.topic = topic;  
        this.partitionsNum = partitionsNum;  
        this.executor = executor;  
    }  
      
    public void start() throws Exception{  
        connector = Consumer.createJavaConsumerConnector(config);  
        Map<String,Integer> topics = new HashMap<String,Integer>();  
        topics.put(topic, partitionsNum);  
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);  
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);  
        threadPool = Executors.newFixedThreadPool(partitionsNum);  
        for(KafkaStream<byte[], byte[]> partition : partitions){  
            threadPool.execute(new MessageRunner(partition));  
        }   
    }  
  
          
    public void close(){  
        try{  
            threadPool.shutdownNow();  
        }catch(Exception e){  
            //  
        }finally{  
            connector.shutdown();  
        }  
          
    }  
      
    class MessageRunner implements Runnable{  
        private KafkaStream<byte[], byte[]> partition;  
          
        MessageRunner(KafkaStream<byte[], byte[]> partition) {  
            this.partition = partition;  
        }  
          
        public void run(){  
            ConsumerIterator<byte[], byte[]> it = partition.iterator();  
            while(it.hasNext()){  
                                //connector.commitOffsets();手动提交offset,当autocommit.enable=false时使用  
                MessageAndMetadata<byte[],byte[]> item = it.next();  
                System.out.println("partiton:" + item.partition());  
                System.out.println("offset:" + item.offset());  
                executor.execute(new String(item.message()));//UTF-8,注意异常  
            }  
        }  
    }  
      
    interface MessageExecutor {  
          
        public void execute(String message);  
    }  
      
    /** 
     * @param args 
     */  
    public static void main(String[] args) {  
        LogConsumer consumer = null;  
        try{  
            MessageExecutor executor = new MessageExecutor() {  
                  
                public void execute(String message) {  
                    System.out.println(message);  
                      
                }  
            };  
            consumer = new LogConsumer("test-topic", 2, executor);  
            consumer.start();  
        }catch(Exception e){  
            e.printStackTrace();  
        }finally{  
//          if(consumer != null){  
//              consumer.close();  
//          }  
        }  
  
    }  
  
}  

    需要提醒的是,上述LogConsumer类中,没有太多的关注异常情况,必须在MessageExecutor.execute()方法中抛出异常时的情况.

    在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。

  • test-kafka.zip (14.7 KB)
  • 下载次数: 22

相关推荐