Kafka集群搭建
1、环境CentOS6.8 + jdk1.8.0_151+zookeeper-3.4.11
jdk安装请参考CentOS 搭建JDK环境
Zookeeper安装请参考Zookeeper集群搭建
2、下载 kafka_2.12-0.11.0.2.tgz,下载地址: http://kafka.apache.org/downloads
3、安装
3.1、上传kafka_2.12-0.11.0.2.tgz至/usr/dev/kafka安装目录下
3.2、将kafka_2.12-0.11.0.2.tgz解压至kafka目录
tar -zxvf kafka_2.12-1.0.1.tgz
3.3、修改配置文件
进入到config目录
cd kafka_2.12-1.0.1/config/
修改server.properties配置文件
主要修改:server.properties 这个文件即可,我们可以发现在目录下:
有很多文件,这里可以发现有Zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群
vi server.properties
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样 listeners=PLAINTEXT://10.202.107.207:9092 num.network.threads=3 #这个是borker进行网络处理的线程数 num.io.threads=8 #这个是borker进行I/O处理的线程数 log.dirs=/usr/dev/kafka/kafka_2.12-1.0.1/logs#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘 socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 num.partitions=1 #默认的分区数,一个topic默认1个分区数 log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务 replica.fetch.max.bytes=5242880 #取消息的最大直接数 log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除 log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能 zookeeper.connect=10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 #设置zookeeper的连接端口
上面是参数的解释,实际的修改项为(修改如下三项即可):
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样 listeners=PLAINTEXT://10.202.107.207:9092 zookeeper.connect=10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 #设置zookeeper的连接端口
将上述修改后的kafka附件包拷贝到其他虚拟机下,修改 server.properties配置文件,修改完配置文件完成集群安装
4、启动并测试
4.1、启动
进入bin目录: cd /usr/dev/kafka/kafka_2.12-1.0.1/bin/ 启动kafka服务: 非守护程序方式启动: ./kafka-server-start.sh /usr/dev/kafka/kafka_2.12-1.0.1/config/server.properties 守护程序方式启动:nohup & nohup ./kafka-server-start.sh /usr/dev/kafka/kafka_2.12-1.0.1/config/server.properties &
4.2、检查服务是否启动
jps
4.3
1、kafka架构 生产者、消费者(组)、zookeeper、broker(组)、主题-->分区-->副本因子 2、主题:创建、修改、删除(标记-设置特殊的属性-server.properties) 3、分区:从主题下进行分区 单独的一个分区内的消息是有序且不可修改 但是多个分区之间的数据是无序 消息怎么知道进入特定的分区?--消息的组成部分key-value 4、复本 复本因子数应当小于等于可用的broker数 leader和follower leader与外界的读写。然后follower云采用拉的方式与leader同步 5、消费组 同一消费组中的消费者应该小于等于所操作主题下的分区数。如果小于的话,最好成位数 6、同步与异步 同步:消息发送给kafka之后,在些等待返回结果 异步:消息先发送到阻塞队列中,通过轮询的方式,将阻塞队列中的消费发送给kafka 回调回来的消息,是由用户(生产者)自己来决定 7、kafka与zookeeper的节点说明 多个broker之间会存在一个controller broker的角色 10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 进入bin目录: cd /usr/dev/kafka/kafka_2.12-1.0.1/bin/ 启动kafka服务: 非守护程序方式启动: ./kafka-server-start.sh /usr/dev/kafka/kafka_2.12-1.0.1/config/server.properties 守护程序方式启动:nohup & nohup ./kafka-server-start.sh /usr/dev/kafka/kafka_2.12-1.0.1/config/server.properties & 停止kafka服务: ./kafka-server-stop.sh 创建Topic ./kafka-topics.sh --create --zookeeper 10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 --topic mytopic --partitions 3 --replication-factor 3 查看Topic信息(会列出分区数、副本数、副本leader节点、副本节点、活着的副本节点) ./kafka-topics.sh --describe --zookeeper 10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 --topic mytopic 列出所有topic: ./kafka-topics.sh --list --zookeeper 10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 将IP为207的kafka停点,测试集群(kill) 创建消费者 ./kafka-console-consumer.sh --zookeeper 10.202.107.208:2181,10.202.107.207:2181,10.202.107.115:2181 --topic mytopic 创建生产者 ./kafka-console-producer.sh --broker-list IP:9092,IP:9092,IP:9092 --topic mytopic 创建topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 查看topic信息(包括分区、副本情况等): kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic ,会列出分区数、副本数、副本leader节点、副本节点、活着的副本节点 往某topic生产消息: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 从某topic消费消息: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (默认用一个线程消费指定topic的所有分区的数据) 删除某个Kafka groupid:连接Zookeeper后用rmr命令,如删除名为JSI的消费组: rmr /consumers/JSI