The Load-Balancing Implementation of the Kafka 0.9 Coordinator
I have been studying Kafka recently. Wanting to understand the overall architecture before digging into implementation details, I started by poring over the documentation. I assumed the Coordinator followed roughly the same pattern as the Controller: elect one Coordinator as leader to receive and handle consumer assignments. Then I looked at the source code and, to my surprise, there was no election anywhere:
KafkaServer.scala
```scala
/* start kafka coordinator */
consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)
consumerCoordinator.startup()
```
GroupCoordinator.scala
```scala
/**
 * Startup logic executed at the same time when the server starts up.
 */
def startup() {
  info("Starting up.")
  heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
  joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId)
  isActive.set(true)
  info("Startup complete.")
}
```
When the broker starts, the Coordinator only spins up two threads: one handling heartbeat checks and one handling consumers joining a group. I was completely puzzled, so I emailed Guozhang Wang (one of the Kafka developers) for advice, and he walked me through the whole story. This post records the relevant code flow.
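The heartbeat side of this can be illustrated with a toy model. Kafka's real implementation uses a DelayedOperationPurgatory with delayed heartbeat operations; the sketch below only models the bookkeeping it performs (class and method names are my own, not Kafka's): every heartbeat pushes a member's deadline forward by the session timeout, and a member whose deadline passes is considered dead and triggers a rebalance.

```java
import java.util.HashMap;
import java.util.Map;

// Toy analogue of the heartbeat purgatory: each group member has a deadline
// (last heartbeat + session timeout). The real Coordinator completes or
// expires a DelayedHeartbeat operation; here we just check the deadline.
public class HeartbeatTracker {
    private final long sessionTimeoutMs;
    private final Map<String, Long> deadlines = new HashMap<>();

    public HeartbeatTracker(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Called whenever a HeartbeatRequest arrives from memberId.
    public void heartbeat(String memberId, long nowMs) {
        deadlines.put(memberId, nowMs + sessionTimeoutMs);
    }

    // Called when the delayed heartbeat operation expires: a member whose
    // deadline has passed (or was never seen) is treated as dead.
    public boolean isAlive(String memberId, long nowMs) {
        Long deadline = deadlines.get(memberId);
        return deadline != null && nowMs < deadline;
    }

    public static void main(String[] args) {
        HeartbeatTracker tracker = new HeartbeatTracker(30_000);
        tracker.heartbeat("consumer-1", 0);
        System.out.println(tracker.isAlive("consumer-1", 10_000)); // true
        System.out.println(tracker.isAlive("consumer-1", 40_000)); // false: session expired
    }
}
```

A member failing this liveness check is what causes the Coordinator to kick it out of the group and start a rebalance for the remaining members.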
The Coordinator is the Kafka component responsible for consumer load balancing, i.e., for deciding which consumer in a group consumes which partitions of the topics you subscribe to. For a detailed introduction, see the following articles:
http://www.infoq.com/cn/articles/kafka-analysis-part-4
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
The figure above is adapted from: http://blog.daich.org/2016/02/15/kafka-consumer-0.9/
As the code at the beginning of this post shows, a Coordinator starts together with each Kafka broker, but there is no Coordinator leader election: every broker has its own Coordinator, and they all work simultaneously with no leader/follower distinction, each managing a subset of the consumer groups. Coordinator load balancing therefore involves two aspects: on the Coordinator side, which Coordinator is responsible for which group (step 3 in the figure above); and on the consumer side, which partition is assigned to which consumer within the same group (step 5 in the figure above).
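How groups spread across the per-broker Coordinators boils down to hashing: a group id is mapped onto one partition of the internal __consumer_offsets topic, and the broker leading that partition acts as the group's Coordinator. A minimal Java sketch of that mapping follows (the 50-partition count is the 0.9 default for offsets.topic.num.partitions, and the group names are made up for illustration):

```java
// Sketch of the group -> coordinator mapping: hash the group id onto a
// partition of __consumer_offsets; the leader of that partition is the
// group's Coordinator. Kafka's real logic lives in GroupMetadataManager.
public class CoordinatorLookup {
    // offsets.topic.num.partitions defaults to 50 in Kafka 0.9
    static final int OFFSETS_TOPIC_PARTITIONS = 50;

    static int partitionFor(String groupId) {
        // Mask the sign bit instead of Math.abs: abs(Integer.MIN_VALUE)
        // is still negative, so this guarantees a non-negative result.
        return (groupId.hashCode() & 0x7fffffff) % OFFSETS_TOPIC_PARTITIONS;
    }

    public static void main(String[] args) {
        for (String g : new String[] {"payments", "fraud-detection", "audit"}) {
            System.out.println(g + " -> __consumer_offsets partition " + partitionFor(g));
        }
    }
}
```

Because the hash is deterministic, every broker computes the same partition for a given group without any coordination, which is exactly why no leader election is needed.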
Concretely, on the Coordinator side, a consumer uses the topic metadata it fetched earlier to send a GroupCoordinatorRequest to a broker, which handles the request in KafkaApis.scala:
```scala
def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
  ...
  val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)

  // get metadata (and create the topic if necessary)
  val offsetsTopicMetadata =
    getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head

  val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata
    .find(_.partitionId == partition)
    .flatMap { partitionMetadata => partitionMetadata.leader }

  val responseBody = coordinatorEndpoint match {
    case None =>
      new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode())
    case Some(endpoint) =>
      new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
  }
  ...
}
```
The key line is coordinator.partitionFor(groupCoordinatorRequest.groupId), which ultimately calls the following method in GroupMetadataManager.scala: