kafka学习总结012 --- 数据消费相关流程

1、消费者组

kafka提供的一种可扩展可容错消费机制,某个topic的分区数据只能被组内的一个消费者消费,注:当指定了自动提交(enable.auto.commit=true)时,必须显式的指定消费者组ID(group.id)

2、消费位置和offset管理

消费者需要记录消费了多少数据,即消费位置;consumer创建时可以指定offset提交方式:手动or自动(enable.auto.commit),提交的offset在老的kafka版本中被保存在zookeeper中,新的版本中,提供了一种新的方案:保存在指定的topic中(__consumer_offsets)

保存的格式为:groupID + topic-partition + offset;

__consumer_offsets默认的配置是:50个分区,1个副本;可以通过修改server.properties中的offsets.topic.replication.factor配置选项,修改副本数

3、消费者分区分配策略

同一个组内的消费者,如何确定消费指定topic哪个分区的数据呢?kafka提供了三种消费分区策略:

(1)RangeAssigner(默认策略)

         消费者按照字典序排序,然后:n=分区数/消费者数,m=分区数%消费者数,那么前m个消费者能够分配到n+1个分区,其余的消费者只能分配到n个分区

(2)RoundRobinAssigner

         轮询策略,不多说。。。

(3)StickyAssignor

        如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor策略如同其名称中的“sticky”一样,让分配策略具备          一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。

4、rebalance

rebalance用来规定某个consumer group下的cunsumer如何分配topic分区,即执行上面的分区策略

(1)什么时候触发rebalance

         组内加入新成员

         consumer离开组(主动离开,或者consumer崩溃被动离开)

         订阅的topic分区数改变

         订阅的topic个数发生变化(不常见,当使用正则表达式订阅topic时,新增一个符合该正则的topic,则会引发rebalance)

(2)由谁来执行rebalance和分区分配

         由组协调器coordinator来行;当consumer group中的第一个consumer启动时,消费者客户端会向broker确定谁是coordinator,确认方法:

         先确定将消费的offset写入__consumer_offsets的哪个分区,确定方法:partition=(groupId.hashCode %  __consumer_offsets分区数)

         该分区所在的leader所在的broker即为coordinator

(3)rebalance过程

         组内的所有成员向coordinator发送JoinGroup请求,一旦所有的consumer都发送了coordinator请求;coordinator会选择一个consumer作为leader,并且把组成员信息及订阅信息发给leader

         leader负责分配消费方案,即组内消费者消费哪些分区;分配完成后,leader会将分配方案封装到SyncGroup请求,然后发送给coordinator;(除leader外的消费者也会发送SyncGroup请求,只不过请求内容为空),coordinator收到分配方案后,会将分配方案作为              SyncGroup的响应回复给组内所有消费者,这样所有消费者都知道了消费分配方案

5、消费者心跳机制

consumer使用心跳机制向coordinator表明该consumer还在线;consumer在启动后,会定期向coordinator发送心跳请求Heartbeat时间间隔可配置hearbeat.interval.ms默认3秒),如果在指定时间内(session.timeout.ms默认10秒)未发送心跳请求,则coordinator认为该consumer下线,然后触发rebalance,重新分配分区;rebalance期间,其余消费者发送的心跳请求,coordinator会统一响应REBALANCE_IN_PROGRESS,然后这些consumer会重新申请加入组

注:session.timeout.ms要大于hearbeat.interval.ms,一般的配置是session.timeout.ms > hearbeat.interval.ms * 3

6、拉取数据最大时间间隔max.poll.interval.ms(默认5分钟)

如果consumer超过该时间间隔未拉取数据,consumer会主动发起LeaveGroup请求,进而触发rebalance;实际应用中应该注意如下几点

(1)max.poll.interval.ms不宜设置过小

(2)消费者处理消费数据的时间要小于max.poll.interval.ms间隔

(3)如果消费者消费能力不足,可以通过设置max.poll.records从侧面提升消费能力

(4)超过时间间隔未拉取数据可能会导致:消费过的数据未提交,引发重复消费

         

相关推荐