kafka 偏移量相关接口
kafka 0.9 之前的版本偏移量信息是通过 zookeeper 管理的;为了避免对 zookeeper 的过度依赖,每次从 kafka 上读取 topic 偏移量信息,连接消耗还是比较大的,从 kafka 0.9 开始,kafka 已接管了偏移量信息管理功能,并将各消费组的偏移量写入了 __consumer_offsets 主题(默认50个分区);
api 方式获取某消费组的消费偏移量信息:
通过 ConsumerGroupCommand 作为入口,然后调用 ConsumerGroupService接口,该接口有如下两个实现:
- KafkaConsumerGroupService: 0.11 之前的api 需要添加参数:--new-consumer,从0.11
开始,默认先使用的是 kafka 实现; - ZkConsumerGroupService:0.11 之前默认实现方式;
该接口如下方法:
- 查看所有的消费组方法:listGroups
- 描述消费组消费信息:describeGroup
val listStrZk = "--zookeeper 192.168.xx.xx:2181 --list" //列出所有的消费组 val listStr = "--bootstrap-server 192.168.xx.xx:9092 --list --new-consumer" val listArgs = listStr.split(" ") val describeStrzk = "--zookeeper 192.168.xx.xx:2181 --describe --group 3" // 描述消费组信息 val describeStr = "--bootstrap-server 192.168.xx.xx:9092 --describe --group 6 --new-consumer" val describeArgs = describeStr.split(" ") val args = Array[String](topic,bootstrap,group,describe) // ConsumerGroupCommand.main(listArgs) ConsumerGroupCommand.main(describeArgs) /* 主题 分区 消费位移 最高日志位移 消费滞后offset * TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 2282954 2743178 460224 - - - test 1 1500 860230 858730 - - - test 4 500 860231 859731 - - - test 3 0 860228 860228 - - - test 2 0 860226 860226 - - - * */
- LAG:消费滞后
- LEO:最高日志位移
- Lag = HW - ConsumerOffset
相关推荐
doITwhat 2015-07-09
dfBeautifulLive 2019-10-26
GUAOSHITAIDU 2018-01-19
forrestou 2019-09-05
Kshine0 2015-07-09
ElementW 2018-01-19
我只是个程序员 2015-06-29
沉着前进 2014-07-21
forrestou 2019-06-25
哈嘿Blog 2016-09-08
zzpdljd 2016-05-05
fkuevip 2016-04-05
前端档案 2015-04-14
PHP100 2019-03-27
BitTigerio 2018-01-31
编程爱好者联盟 2017-02-08