Kafka Cluster Setup

I. Prepare the environment

1. Install the JDK and ZooKeeper

2. Start the ZooKeeper cluster

3. Prepare three servers for the Kafka cluster

II. Download the Kafka package

http://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
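The download and extraction can be sketched as a small shell helper. This is only an illustration: the function name download_kafka and the /usr install prefix are assumptions (the prefix is inferred from the log.dirs path used later in server.properties), and it assumes wget and tar are available.

```shell
# Version numbers taken from the download URL above.
KAFKA_VERSION=0.10.0.0
SCALA_VERSION=2.11
KAFKA_TGZ="kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
KAFKA_URL="http://archive.apache.org/dist/kafka/${KAFKA_VERSION}/${KAFKA_TGZ}"

# Assumption: /usr is the install prefix, inferred from log.dirs in the configuration.
INSTALL_PREFIX=/usr

# Hypothetical helper: fetch the archive and unpack it under the install prefix.
download_kafka() {
    wget "$KAFKA_URL" && tar -xzf "$KAFKA_TGZ" -C "$INSTALL_PREFIX"
}
```

Calling download_kafka on each server should leave the installation in /usr/kafka_2.11-0.10.0.0, the same path on every machine.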

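III. Set up the cluster

1. Upload the archive to the same path on all three servers and extract it there

2. Edit the configuration file kafka/config/server.properties

Create the logs directory (the path set in log.dirs) on all three servers.

First server, ip01: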

# Keep comments on their own lines: in a .properties file, text after a value is read as part of the value.
# Broker ID, must be different on every server
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Log (data) directory, must be created beforehand
log.dirs=/usr/kafka_2.11-0.10.0.0/logs
# Default number of partitions per topic
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# ZooKeeper cluster addresses
zookeeper.connect=ip01:2181,ip02:2181,ip03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
# Allow topics to be deleted
delete.topic.enable=true
# Hostname or IP address of this broker
host.name=ip01

Second server, ip02:

broker.id=2
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/kafka_2.11-0.10.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=ip01:2181,ip02:2181,ip03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=ip02

Third server, ip03:

broker.id=3
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/kafka_2.11-0.10.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=ip01:2181,ip02:2181,ip03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=ip03
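The three files differ only in broker.id and host.name, so one way to avoid editing each by hand is to render them from a single copy. The following is a minimal sketch under that assumption; the function name render_broker_config and the output file names are hypothetical.

```shell
# Hypothetical helper: copy a server.properties file, replacing only the
# two per-broker settings (broker.id and host.name).
# Usage: render_broker_config <broker-id> <host> <template-file> <output-file>
render_broker_config() {
    sed -e "s/^broker\.id=.*/broker.id=$1/" \
        -e "s/^host\.name=.*/host.name=$2/" \
        "$3" > "$4"
}

# Example (run from the Kafka installation directory):
#   render_broker_config 2 ip02 config/server.properties server-ip02.properties
#   render_broker_config 3 ip03 config/server.properties server-ip03.properties
```

Every other line is copied unchanged, so the shared settings such as zookeeper.connect and log.dirs stay identical across the brokers.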

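IV. Start the cluster

Run the following command from the Kafka installation directory on each of the three servers to start the broker in the background: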

nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

Check that the broker started by running jps; the output should list a Kafka process.
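The jps check can also be scripted. This sketch assumes the plain jps output format of one "<pid> <main class name>" pair per line, in which the broker appears as Kafka; the function name has_kafka_process is hypothetical.

```shell
# Hypothetical helper: reads jps output on stdin and succeeds only if a
# line's second column is exactly "Kafka" (the broker's main class name).
has_kafka_process() {
    awk '$2 == "Kafka" { found = 1 } END { exit !found }'
}

# Example:
#   if jps | has_kafka_process; then echo "broker is running"; fi
```

Reading from stdin keeps the check independent of where jps runs, so the same function could be fed the output of jps collected over ssh from each server.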

V. Test

Create a topic:

bin/kafka-topics.sh --create --zookeeper ip01:2181 --replication-factor 2 --partitions 3 --topic test
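A replication factor cannot exceed the number of brokers that are up, so the value 2 used here fits a three-broker cluster. As a rough illustration, a pre-check could look like the sketch below; the function name valid_replication_factor is hypothetical and is not part of the Kafka tooling.

```shell
# Hypothetical helper: succeeds when the replication factor is at least 1
# and no larger than the number of brokers in the cluster.
# Usage: valid_replication_factor <replication-factor> <broker-count>
valid_replication_factor() {
    [ "$1" -ge 1 ] && [ "$1" -le "$2" ]
}

if valid_replication_factor 2 3; then
    echo "replication factor 2 fits a 3-broker cluster"
fi

# After creating the topic, its partitions and replicas can be inspected with:
#   bin/kafka-topics.sh --describe --zookeeper ip01:2181 --topic test
```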

Produce messages with the console producer:

bin/kafka-console-producer.sh --broker-list ip01:9092,ip02:9092,ip03:9092 --topic test

Consume messages with the console consumer:

bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper ip01:2181,ip02:2181,ip03:2181
