Kafka's node structure in ZooKeeper and its log structure

Log in to ZooKeeper

zkCli.sh -server centos1:2181
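
Once connected, the znodes described later in this article can be inspected directly from the ZooKeeper shell; a minimal sketch (the paths follow the structure listed below):

ls / //list the top-level znodes: brokers, controller, consumers, config, ...
get /controller //show which broker is currently the controller
ls /brokers/ids //list the ids of the currently active brokers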

Create the topic mytopic

kafka-topics.sh --create --zookeeper centos1:2181,centos2:2181,centos3:2181 --replication-factor 3 --partitions 2 --topic mytopic
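
After the topic has been created, the leader and ISR of each partition can be checked with the describe command; its output corresponds to the partition state znodes shown below:

kafka-topics.sh --describe --zookeeper centos1:2181,centos2:2181,centos3:2181 --topic mytopic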

ZooKeeper node structure

/controller data={"version":1,"brokerid":2,"timestamp":"1495002238024"} //the broker with id=2 is the current controller (the leader among the brokers)

/controller_epoch data=1

/brokers

/brokers/ids //tracks the currently active brokers in real time

/brokers/ids/0

/brokers/ids/1

/brokers/ids/2

/brokers/topics

/brokers/topics/mytopic/partitions/0/state data={"controller_epoch":7,"leader":1,"version":1,"leader_epoch":0,"isr":[1,0,2]} //here "leader" is the leader of this partition; each partition has exactly one leader. "isr":[1,0,2] means the partition has three in-sync replicas, located on brokers 1, 0 and 2; the leader tracks the sync state of the other replicas.

/brokers/topics/mytopic/partitions/1/state data={"controller_epoch":7,"leader":2,"version":1,"leader_epoch":0,"isr":[2,1,0]}

/brokers/seqid

/admin/delete_topics

/isr_change_notification

/consumers/

/consumers/console-consumer-24372 data=

/consumers/console-consumer-24372/ids data=

/consumers/console-consumer-24372/ids/console-consumer-24372_centos1-1495075711403-999aec1a data={"version":1,"subscription":{"mytopic":1},"pattern":"white_list","timestamp":"1495075711460"}

/consumers/console-consumer-24372/owners data=null

/consumers/console-consumer-24372/owners/mytopic data=null

/consumers/console-consumer-24372/owners/mytopic/0 data=console-consumer-24372_centos1-1495075711403-999aec1a-0

/consumers/console-consumer-24372/owners/mytopic/1 data=console-consumer-24372_centos1-1495075711403-999aec1a-0

/consumers/console-consumer-24372/offsets data=null

/consumers/console-consumer-24372/offsets/mytopic data=null

/consumers/console-consumer-24372/offsets/mytopic/0 data=153

/consumers/console-consumer-24372/offsets/mytopic/1 data=582 //console-consumer-24372 is the group name of a console consumer; 582 is the offset up to which this consumer has consumed partition 1 of mytopic. The value can be modified directly in ZooKeeper to make the consumer start reading from that offset (see the example after this node listing).

/config

/config/changes

/config/clients

/config/topics
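
For example, to rewind the console consumer group above on partition 1, the offset znode can be overwritten from the ZooKeeper shell. A sketch for the old ZooKeeper-based consumer; the consumer should be restarted afterwards so it picks up the new value, and 100 is just an example offset:

set /consumers/console-consumer-24372/offsets/mytopic/1 100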

Kafka log directory structure

./.lock

./meta.properties

./cleaner-offset-checkpoint

./replication-offset-checkpoint

./recovery-point-offset-checkpoint

./mytopic-0 //naming convention: topic name + partition ID

./mytopic-0/00000000000000000000.index

./mytopic-0/00000000000000000000.timeindex

./mytopic-0/00000000000000000000.log //where the messages are stored

./mytopic-1

./mytopic-1/00000000000000000000.index

./mytopic-1/00000000000000000000.timeindex

./mytopic-1/00000000000000000000.log //where the messages are stored
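
The contents of a segment file can be inspected with the DumpLogSegments tool shipped with Kafka; a sketch (--print-data-log also prints the message payloads):

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./mytopic-0/00000000000000000000.log --print-data-log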

Kafka replication mechanism:

1. Each partition keeps n replicas and can tolerate the failure of n-1 nodes.

2. One of these n replicas is the leader, and it also maintains the synchronization state of all the replicas.

3. If the leader fails, the producer is notified and resends its messages to the new leader.

4. New leader election: when a leader fails, the controller picks a new leader for the partition from the live replicas in its ISR; the remaining replicas stay followers. (The controller itself is chosen by brokers registering in ZooKeeper: the first broker to create the /controller znode becomes the controller.)

5. Kafka supports the following replication modes:

Synchronous mode: the producer looks up the leader via ZooKeeper and sends the message to it; the leader writes the message to its local log. The followers pull messages from the leader, write them to their own local logs, and send an acknowledgment back to the leader. Only after receiving the followers' acknowledgments does the leader send an acknowledgment to the producer. On the consumer side, all messages are pulled from the leader.

Asynchronous mode: the difference from the synchronous mode is that the leader sends the acknowledgment to the producer as soon as the message has been written to its own log, so there is no guarantee that the message was replicated to a follower that later fails.
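
With the current producer API these two modes roughly correspond to the acks setting: acks=all (or -1) waits for the in-sync replicas, acks=1 only for the leader's own write. A sketch with the console producer; depending on the Kafka version the setting is passed via --producer-property or the older --request-required-acks option:

bin/kafka-console-producer.sh --broker-list centos1:9092 --topic mytopic --producer-property acks=all //"synchronous": wait for the ISR
bin/kafka-console-producer.sh --broker-list centos1:9092 --topic mytopic --producer-property acks=1 //"asynchronous": leader write only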

Consumer groups and partitions

1. Multiple consumers listening on the same topic can belong to the same group. Consumers within one group do not receive the same message more than once; to have every consumer receive all messages, configure them with different groups.

2. Suppose the number of partitions is m and the number of consumers in the same group is n:

a. m=n: each consumer is assigned exactly one partition

b. m>n: every consumer is assigned at least one partition, and some consumers get more than one partition

c. m<n: only m consumers are assigned a partition; the other n-m consumers receive no messages. If a new consumer is started in this situation, one of the existing consumers will stop receiving messages.

The number of consumers can be adjusted at any time without missing messages.

Command: bin/kafka-console-consumer.sh --bootstrap-server centos1:9092 --topic mytopic --consumer-property group.id=group1
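
The partition assignment and current offsets of group1 can then be checked with the consumer-groups tool (newer Kafka versions; a sketch):

bin/kafka-consumer-groups.sh --bootstrap-server centos1:9092 --describe --group group1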

3. A consumer can also be restricted to receive messages from a single partition only

Command: bin/kafka-console-consumer.sh --bootstrap-server centos1:9092 --topic mytopic --consumer-property group.id=group1 --partition 0

4. The following ZooKeeper nodes are shared by the whole group (newer Kafka clients store consumer offsets in the internal __consumer_offsets topic instead of in ZooKeeper):

/consumers/mygroup/offsets/mytopic/0 data=153

/consumers/mygroup/offsets/mytopic/1 data=582

153 and 582 are the offsets up to which mygroup has read partition 0 and partition 1 of mytopic, respectively.
