Kafka集群部署

集群规划

Zookeeper集群共三台服务器,分别为:sto1、sto2、sto3。

Kafka集群共三台服务器,分别为:sto1、sto2、sto3。

 

1、Zookeeper集群准备

kafka是一个分布式消息队列,需要依赖ZooKeeper,请先安装好zk集群。

Zookeeper集群安装步骤略。

 2、安装Kafka

下载压缩包(官网地址:http://kafka.apache.org/downloads.html)

 解压:

tar zxvf kafka_2.10-0.9.0.1.tgz -C /opt/sgb

mv kafka_2.10-0.9.0.1/ kafka

修改配置文件:config/server.properties

 

broker.id=0   #参考zookeeper的myid  ,每个id不能一样,0,1,2集群中唯一标识id,0、1、2、3依次增长(broker即Kafka集群中的一台服务器)
zookeeper.connect=sto1:2181,sto2:2181,sto3:2181  #zookeeper集群地址
代码分发
scp -r /opt/sgb/kafka/ sto2:/opt
scp -r /opt/sgb/kafka/ sto3:/opt
修改sto2、sto3上Kafka配置文件中的broker.id(分别在sto2、3服务器上执行以下命令修改broker.id)
sed -i -e 's/broker.id=.*/broker.id=1/' /opt/sgb/kafka/config/server.properties
sed -i -e 's/broker.id=.*/broker.id=2/' /opt/sgb/kafka/config/server.properties

3、启动Kafka集群

A、启动Zookeeper集群。

B、启动Kafka集群。

分别在三台服务器上执行以下命令启动:

bin/kafka-server-start.sh config/server.properties

4、测试

创建topic:
bin/kafka-topics.sh --zookeeper sto1:2181,sto2:2181,sto3:2181 --create --replication-factor 2 --partitions 3 --topic test
查看topic列表
bin/kafka-topics.sh --zookeeper sto1:2181,sto2:2181,sto3:2181 --list
查看“test”topic描述
bin/kafka-topics.sh --zookeeper sto1:2181,sto2:2181,sto3:2181 --describe --topic test
创建生产者
bin/kafka-console-producer.sh --broker-list sto1:9092,sto2:9092,sto3:9092 --topic test
创建消费者
bin/kafka-console-consumer.sh --zookeeper sto1:2181,sto2:2181,sto3:2181 --from-beginning --topic test

 ----------------------------------------------------------------------------------------------------------

消费者(Consumer)示例代码:
package bhz.kafka.example;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**
 * Console consumer for the "test" topic, built on the old high-level
 * consumer API (kafka.consumer.*, 0.9-era Scala client).
 *
 * <p>Connects to the Kafka cluster through the Zookeeper ensemble, opens a
 * single message stream for the topic, and prints every message payload to
 * stdout. Blocks forever; stop it with Ctrl-C.
 */
public class KafkaConsumer {
	
	public static final String topic = "test";

	public static void main(String[] args) {
    	
        Properties props = new Properties();
        // Zookeeper ensemble used by the Kafka cluster (old consumer API
        // discovers brokers and stores offsets via Zookeeper).
        props.put("zookeeper.connect", "sto1:2181,sto2:2181,sto3:2181");
        // Consumers sharing a group.id divide the topic's partitions among
        // themselves; a lone member of "group1" receives every partition.
        props.put("group.id", "group1");
        // Zookeeper session timeout.
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        // Commit consumed offsets to Zookeeper once per second.
        props.put("auto.commit.interval.ms", "1000");
        // With no previously committed offset, start from the earliest
        // available message ("smallest" is this API's spelling of "earliest").
        props.put("auto.offset.reset", "smallest");
        // NOTE: the original code also set "serializer.class" here; that is a
        // producer-side setting with no effect on a consumer, so it has been
        // removed. Decoding is handled by the StringDecoders passed below.

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

        // Subscribe to the topic with exactly one stream (one consuming thread).
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));

        // Decode both key and value bytes as strings.
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        // One topic, one stream requested above, so take the first (only) stream.
        KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<String, String> it = stream.iterator();

        // hasNext() blocks until the next message arrives; loop runs until
        // the JVM is terminated.
        while (it.hasNext()) {
            System.out.println(it.next().message());
        }
    }
}
生产者(Producer)示例代码:
package bhz.kafka.example;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

/**
 * Console producer for the "test" topic, built on the old producer API
 * (kafka.javaapi.producer.*, 0.9-era Scala client).
 *
 * <p>Sends ten "hello kafka&lt;i&gt;" messages, one per second, then closes
 * the producer.
 */
public class KafkaProducer {  
	  
	public static final String topic = "test";
	
    public static void main(String[] args) throws Exception {  
        Properties properties = new Properties();  
        // Zookeeper ensemble (kept for parity with the consumer config).
        properties.put("zookeeper.connect", "sto1:2181,sto2:2181,sto3:2181");
        // Serialize message payloads as strings.
        properties.put("serializer.class", StringEncoder.class.getName());  
        // List every broker so bootstrap still succeeds when one node is
        // down (the original listed only sto1:9092, a single point of failure).
        properties.put("metadata.broker.list", "sto1:9092,sto2:9092,sto3:9092");
        // acks=1: the partition leader must persist the message before the
        // send is considered successful.
        properties.put("request.required.acks", "1");

        // Parameterized type instead of the raw Producer the original used.
        Producer<Integer, String> producer =
                new Producer<Integer, String>(new ProducerConfig(properties));
        try {
            for (int i = 0; i < 10; i++) {
                String message = "hello kafka" + i;
                producer.send(new KeyedMessage<Integer, String>(topic, message)); 
                System.out.println("send message: " + message);
                // Throttle to one message per second so the run is easy to follow.
                TimeUnit.SECONDS.sleep(1);  
            }
        } finally {
            // Release network resources even if a send throws.
            producer.close();
        }
    }  
       
}

相关推荐