kafka(二)—Kafka 的安装和使用(Quickstart)
Step1:下载Kafka
去官网下载地址下载最新版本并解压:
tar -xzf kafka_2.11-1.1.0.tgz cd kafka_2.11-1.1.0
Step2:配置
- ZooKeeper的配置,参考前面的ZooKeeper的文章,此处不详细做说明。
- Kafka的配置
在Kafka的home目录下,vim config/server.properties
命令,最主要的是设置ZooKeeper连接的地址和接口:
zookeeper.connect=localhost:2181
Step3:启动服务
Kafka用到了ZooKeeper,所以要先启动ZooKeepr。
第一种方式(推荐)是启动独立的ZooKeeper程序,在$ZOOKEEPER_HOME/bin
目录下,执行./zkServer.sh start
。
第二种方式是启动Kafka内置的一个单节点ZooKeeper实例,方法是在$KAFKA_HOME
目录下执行:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动了ZooKeeper之后,就可以启动Kafka服务了,在$KAFKA_HOME
目录下执行:
bin/kafka-server-start.sh config/server.properties
如果要以守护线程启动,则执行:
bin/kafka-server-start.sh -daemon config/server.properties
Step4:创建一个topic
创建一个叫做testTopic
的topic,并且它只有一个分区,一个副本。
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --topic testTopic --zookeeper localhost:2181 --replication-factor 1 --partitions 1 Created topic "testTopic".
partitions参数:表示主题的分区(partition)的数量。
replication-factor参数:表示每个partition的备份数量。
zookeeper参数:表示zookeeper的主机地址和客户端连接端口。如果zookeeper是集群,这里也可以只写一个节点就行。
列出Kafka上的所有topic:
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181 myTest testTopic
上面说明我的kafka已经创建了两个主题:myTest和testTopic。
Step5:发送消息
Kafka 使用一个简单的命令行客户端producer,可以从文件中或者从标准输入中读取消息并发送到Kafka服务端集群。默认的每一行都将作为一条独立的消息
运行producer并在控制台中输一些消息,这些消息将被发送到服务端:
[root@myServer kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic >This is a message >This is another message
Step6:启动consumer
Kafka也有一个命令行consumer可以读取消息并输出到标准输出:
[root@myServer kafka_2.11-1.0.0]#bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning This is a message This is another message
如果你在一个终端中运行consumer命令行,另一个终端中运行producer命令行,就可以在一个终端输入消息,另一个终端读取消息。
这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。
Step7:搭建一个多个broker的集群
刚才只是启动了单个broker,现在启动有3个broker组成的集群,这些broker节点也都是在本机上的:
首先为每个节点编写配置文件:
> cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties
在拷贝出的新文件中修改如下3个参数:
config/server-1.properties:
broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2
broker.id在集群中唯一的标注一个节点,因为在同一个机器上,所以必须制定不同的端口和日志文件,避免数据被覆盖。
刚才已经启动可Zookeeper和一个节点,现在启动另外两个Kafka节点:
> bin/kafka-server-start.sh -daemon config/server-1.properties ... > bin/kafka-server-start.sh -daemon config/server-2.properties ...
查看进程:
[root@server01 kafka_2.11-1.0.0]# jps -lm 9857 sun.tools.jps.Jps -lm 1737 org.apache.zookeeper.server.quorum.QuorumPeerMain /home/hadoop/app/zookeeper-3.4.11/bin/../conf/zoo.cfg 9517 kafka.Kafka config/server-2.properties 5566 kafka.Kafka ../config/server.properties 9199 kafka.Kafka config/server-1.properties
创建一个拥有3个副本的topic:
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic Created topic "my-replicated-topic".
现在我们搭建了一个集群,怎么知道每个节点的信息呢?运行“"describe topics”命令就可以了:
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0 Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
下面解释一下这些输出。第一行是对所有分区的一个描述,然后每个分区都会对应一行,我们创建topic的时候指明是3个分区,所以有3行。
leader:负责处理消息的读和写的broker,leader是从所有节点中随机选择的.
replicas:列出了所有的副本节点,不管节点是否在服务中.
isr:是正在服务中的节点.
向topic发送消息:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ... my test message 1 my test message 2 ^C
消费这些消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 2 my test message 1 ^C
这里接收到的顺序不一致时因为发送到了不同的partiton所致。
测试一下kafka集群的容错能力。对于partition=1,其leader是brokerid=0的节点,现在我们kill掉它:
> ps -aux | grep server.properties root 32658 0.5 17.4 4305700 329028 pts/0 Sl+ 10:06 0:15 /opt/jdk1.8.0_121/bin/java... > kill -9 32658
再次查询该topic,可以看到另外一个brokerid=1的节点被选做了leader,brokeid=0的节点不再出现在 in-sync 副本列表中:
[root@server01 kafka_2.11-1.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2 Topic: my-replicated-topic Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1 Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
虽然最初负责写消息的leader down掉了,但之前的消息还是可以消费的,不过bootstrap-server参数要选择没有down掉的节点:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic my-replicated-topic ... my test message 2 my test message 1 ^C