Kafka Source Code Series: Topic Creation, Partition Assignment, and Leader Election
I. Overview
This article is again based on the Kafka 0.8.2.2 source code. If you have read the earlier articles in this series, you know that a user's admin commands are published to the Kafka Controller via ZooKeeper, and the Controller then pushes them out to the individual Brokers.
Topic creation works the same way. This article focuses on the following points:
1. Where, and with what strategy, partitions and replicas are assigned to Brokers.
2. What the Kafka Controller does after it receives the notification from ZooKeeper.
3. How a partition's leader and followers are elected.
II. Key Classes
1. TopicCommand
The entry class for topic operations: creating, altering, updating configuration, deleting, and describing topics all go through it, and it publishes the corresponding settings to ZooKeeper.
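As a quick illustration (not taken from the source; the ZooKeeper address, topic name and sizing below are placeholder values), TopicCommand can be driven directly through its main method, which is exactly what the kafka-topics.sh script does:
// Hypothetical invocation for a 0.8.x cluster; adjust the ZooKeeper address,
// topic name, partition count and replication factor to your environment.
kafka.admin.TopicCommand.main(Array(
  "--zookeeper", "localhost:2181",
  "--create",
  "--topic", "innerBashData",
  "--partitions", "6",
  "--replication-factor", "2"))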
2. KafkaApis
The object used by the request-handling threads; its handle method dispatches each kind of request to the corresponding handler function.
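A simplified sketch of that dispatch (abridged; handler names follow the 0.8.x source, but treat this as an outline rather than the exact code):
// Abridged sketch of KafkaApis.handle: dispatch on the request id.
request.requestId match {
  case RequestKeys.ProduceKey        => handleProducerOrOffsetCommitRequest(request)
  case RequestKeys.FetchKey          => handleFetchRequest(request)
  case RequestKeys.LeaderAndIsrKey   => handleLeaderAndIsrRequest(request)
  case RequestKeys.StopReplicaKey    => handleStopReplicaRequest(request)
  case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
  case requestId                     => throw new KafkaException("Unknown api code " + requestId)
}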
3. KafkaController
The KafkaController acts as the controller of the Kafka cluster; there is exactly one leader and some number of followers. The leader sends concrete commands to the followers, such as RequestKeys.LeaderAndIsrKey, RequestKeys.StopReplicaKey, and RequestKeys.UpdateMetadataKey.
4. PartitionStateMachine
The partition state machine, which determines a partition's current state and its state transitions.
NonExistentPartition: the partition does not exist. If it has a previous state at all, it can only be OfflinePartition.
NewPartition: the state right after the partition is created; the previous state is NonExistentPartition. In this state the partition already has replicas assigned but no leader/ISR yet.
OnlinePartition: the state after a leader has been elected; the previous state can be OfflinePartition or NewPartition.
OfflinePartition: if the leader dies after having been elected, the partition enters this state; the previous state can be NewPartition or OnlinePartition.
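For clarity, the allowed transitions can be summarized as a small map of valid previous states per target state (my own summary sketch mirroring what the state machine's assertValidPreviousStates enforces; it is not code from the project):
// Summary sketch: valid previous states for each target partition state.
val validPreviousStates = Map(
  "NonExistentPartition" -> Set("OfflinePartition"),
  "NewPartition"         -> Set("NonExistentPartition"),
  "OnlinePartition"      -> Set("NewPartition", "OnlinePartition", "OfflinePartition"),
  "OfflinePartition"     -> Set("NewPartition", "OnlinePartition")
)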
III. Walking Through the Source
The process breaks down into three steps:
A) How partitions are spread evenly across Brokers when the command creates a topic
Replica assignment has two goals:
1. Spread the replicas as evenly as possible across the Brokers.
2. Place the replicas of each partition on different Brokers.
To achieve this, Kafka uses two strategies (a worked example follows the source listing below):
1. Pick a random Broker position as the starting point and assign the first replica of each partition round-robin from there.
2. Assign the remaining replicas to different Brokers using an increasing shift.
The concrete execution path in the source:
TopicCommand.main
if (opts.options.has(opts.createOpt))
createTopic(zkClient, opts)
AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
Distribute the partitions and replicas evenly:
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
The body of that method is as follows:
val ret = new mutable.HashMap[Int, List[Int]]()
// pick a random Broker position as startIndex
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
// the current partition id starts from startPartitionId, or 0 by default
var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
// pick a random shift within the range of the Broker count
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
for (i <- 0 until nPartitions) {
  // the shift is only incremented after a full pass over brokerList.size partitions
  if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
    nextReplicaShift += 1
  // the first replica's position is (current partition id + startIndex) mod the Broker count
  val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
  var replicaList = List(brokerList(firstReplicaIndex))
  for (j <- 0 until replicationFactor - 1)
    // the positions of the remaining replicas are computed by replicaIndex:
    //   val shift = 1 + (nextReplicaShift + j) % (brokerList.size - 1)
    //   (firstReplicaIndex + shift) % brokerList.size
    replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
  ret.put(currentPartitionId, replicaList.reverse)
  // move on to the next partition
  currentPartitionId = currentPartitionId + 1
}
ret.toMap
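To make the algorithm concrete, here is a small self-contained sketch of the same logic (my own reimplementation for illustration; startIndex and nextReplicaShift are pinned to 0 instead of being chosen at random, so the output is reproducible), run for 5 brokers, 10 partitions and a replication factor of 3:
import scala.collection.mutable

// Standalone illustration of the assignment logic above. Unlike the real code,
// startIndex and nextReplicaShift are fixed at 0 to make the result deterministic.
object AssignmentSketch {
  private def replicaIndex(firstReplicaIndex: Int, shift: Int, j: Int, nBrokers: Int): Int = {
    val s = 1 + (shift + j) % (nBrokers - 1)
    (firstReplicaIndex + s) % nBrokers
  }

  def assign(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int): Map[Int, List[Int]] = {
    val ret = new mutable.HashMap[Int, List[Int]]()
    val startIndex = 0                 // the real code picks rand.nextInt(brokerList.size)
    var nextReplicaShift = 0           // likewise random in the real code
    var currentPartitionId = 0
    for (_ <- 0 until nPartitions) {
      // bump the shift once per full pass over the broker list
      if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
        nextReplicaShift += 1
      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
      var replicaList = List(brokerList(firstReplicaIndex))
      for (j <- 0 until replicationFactor - 1)
        replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
      ret.put(currentPartitionId, replicaList.reverse)
      currentPartitionId += 1
    }
    ret.toMap
  }

  def main(args: Array[String]): Unit = {
    val assignment = assign(Seq(0, 1, 2, 3, 4), nPartitions = 10, replicationFactor = 3)
    assignment.toSeq.sortBy(_._1).foreach { case (p, replicas) =>
      println(s"partition $p -> brokers ${replicas.mkString(",")}")
    }
  }
}
With these inputs the first replicas rotate round-robin over brokers 0..4 (partitions 0 through 4 get first replicas 0,1,2,3,4), and after that full pass nextReplicaShift increments, so partition 5 is assigned [0,2,3] instead of [0,1,2]. The extra shift is what spreads the second and third replicas of later partitions onto different Brokers.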
Write the configuration and the assignment strategy to ZooKeeper:
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)
That method does the following:
Write the topic config; the ZooKeeper path is /config/topics/TopicName:
writeTopicConfig(zkClient, topic, config)
Write the assignment strategy; the ZooKeeper path is /brokers/topics/TopicName:
writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
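For reference, the data written under /brokers/topics/TopicName in 0.8.x is a small JSON document mapping each partition to its replica list; an illustrative example (the partition-to-replica mapping shown here is made up) looks like this:
[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/innerBashData
{"version":1,"partitions":{"2":[6,7],"1":[7,6],"0":[6,7]}}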
B) How the Kafka Controller handles the topic-creation event
The KafkaController's PartitionStateMachine object holds a ZooKeeper listener, TopicChangeListener, dedicated to watching for newly added topics.
Get the newly added topics:
val newTopics = currentChildren -- controllerContext.allTopics
Get the partition replica assignment, a HashMap[TopicAndPartition, Seq[Int]]:
val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
Enter the actual handling:
if (newTopics.size > 0)
  // enter the actual handling
  controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
Subscribe to partition-change events for the new topics:
// subscribe to partition changes: register a partition-change listener for each given topic
topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
New partitions are handled in onNewPartitionCreation.
That method does two things:
1. Move the newly created partitions into the NewPartition state:
partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
Inside the handler we get:
partitions.foreach { topicAndPartition =>
  handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
}
case NewPartition =>
  // look up the assigned replicas for this TopicAndPartition
  assignReplicasToPartitions(topic, partition)
  partitionState.put(topicAndPartition, NewPartition)
  val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
The assignReplicasToPartitions method mainly reads the partition's Broker-id list from ZooKeeper and then caches it in the controller context:
val assignedReplicas = ZkUtils.getReplicasForPartition(controllerContext.zkClient, topic, partition)
controllerContext.partitionReplicaAssignment += TopicAndPartition(topic, partition) -> assignedReplicas
2. Move the newly created partitions from NewPartition to OnlinePartition:
partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
Inside handleStateChange the relevant branch is:
case OnlinePartition =>
  assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
  partitionState(topicAndPartition) match {
    case NewPartition =>
      // initialize leader and isr path for new partition
      initializeLeaderAndIsrForPartition(topicAndPartition)
In initializeLeaderAndIsrForPartition, the first Broker in the live assigned replica sequence is chosen as the leader:
val leader = liveAssignedReplicas.head // the first replica becomes the leader
val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
  controller.epoch)
Persist the partition's state information, for example:
[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/innerBashData/partitions/1/state
// {"controller_epoch":6,"leader":6,"version":1,"leader_epoch":24,"isr":[7,6]}
ZkUtils.createPersistentPath(controllerContext.zkClient,
  ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
  ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
Put the topic, partition and replicas into leaderAndIsrRequestMap, so they can later be looked up by Broker id:
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
  topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
Send the request to the Brokers that need it:
leaderAndIsrRequestMap.foreach { m =>
  val broker = m._1
  val partitionStateInfos = m._2.toMap
  val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
  val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id))
  val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
  for (p <- partitionStateInfos) {
    val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
    stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " +
      "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
      p._2.leaderIsrAndControllerEpoch, correlationId, broker,
      p._1._1, p._1._2))
  }
  // send the LeaderAndIsrRequest to this Broker
  controller.sendRequest(broker, leaderAndIsrRequest, null)
}
C) How the leader and followers come into being on the Brokers
When a Broker receives the Controller's LeaderAndIsrRequest, it is handled by KafkaApis.handle:
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
The entry point through which the current Broker becomes the leader or a follower for each replica:
val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)
Whether the current Broker becomes the leader depends on whether its Broker id matches the Broker id assigned as leader; if they match it becomes the leader, otherwise a follower:
val partitionsTobeLeader = partitionState
  .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
The actual transition into leader or follower:
if (!partitionsTobeLeader.isEmpty)
  makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager)
if (!partitionsToBeFollower.isEmpty)
  makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager)
The high-watermark checkpoint thread is initialized after the first LeaderAndIsrRequest is received. This guarantees that all partitions are populated before checkpointing starts, thereby avoiding a race condition:
if (!hwThreadInitialized) {
  startHighWaterMarksCheckPointThread()
  hwThreadInitialized = true
}
The makeLeaders and makeFollowers methods are explained below.
To make the current Broker the leader for the given partitions, the following is done:
* 1. Stop the fetchers for these partitions.
* 2. Update the cached metadata of these partitions.
* 3. Add these partitions to the set of leader partitions.
// First stop fetchers for all the partitions
replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
// Update the partition information to be the leader
partitionState.foreach{ case (partition, partitionStateInfo) =>
  partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}
The makeLeader method carries out the process of a single replica becoming the leader.
It mainly does the following:
* Record the epoch of the controller that made the leadership decision; this is needed when updating the ISR and maintaining that controller epoch in the ZooKeeper path.
* Add the replicas that are new.
* Remove assigned replicas that have been removed by the controller.
* Construct the high-watermark metadata for the new leader replica.
* Reset the log end offset for the remote replicas.
* Possibly increment the high watermark, since the ISR may have shrunk to 1.
The source is as follows:
def makeLeader(controllerId: Int,
               partitionStateInfo: PartitionStateInfo, correlationId: Int,
               offsetManager: OffsetManager): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
// add replicas that are new
allReplicas.foreach(replica => getOrCreateReplica(replica))
val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
// remove assigned replicas that have been removed by the controller
(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
inSyncReplicas = newInSyncReplicas
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
leaderReplicaIdOpt = Some(localBrokerId)
// construct the high watermark metadata for the new leader replica
val newLeaderReplica = getReplica().get
newLeaderReplica.convertHWToLocalOffsetMetadata()
// reset log end offset for remote replicas
assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(newLeaderReplica)
if (topic == OffsetManager.OffsetsTopicName)
offsetManager.loadOffsetsFromLog(partitionId)
true
}
}
To make the current Broker a follower for the given partitions, the following is done:
* 1. Remove these partitions from the set of leader partitions.
* 2. Mark the replicas as followers, so that producers can no longer write to them.
* 3. Stop all fetchers for these partitions, so that the replica fetcher threads can no longer write to them.
* 4. Truncate the log and checkpoint the offsets for these partitions.
* 5. If the Broker is not shutting down, add replica fetcher threads that fetch from the new leaders.
The code is as follows:
Remove the partitions from the set of leader partitions,
and mark the replicas as followers so that producers can no longer write to them:
partitionState.foreach{ case (partition, partitionStateInfo) =>
  val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
  val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
  leaders.find(_.id == newLeaderBrokerId) match {
    // Only change partition state when the leader is available
    case Some(leaderBroker) =>
      if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
        partitionsToMakeFollower += partition
Stop all fetchers for these partitions:
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
Truncate the log and checkpoint the offsets for these partitions:
logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
If the Broker is not shutting down, add fetcher threads that fetch from the new leaders:
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
  new TopicAndPartition(partition) -> BrokerAndInitialOffset(
    leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
    partition.getReplica().get.logEndOffset.messageOffset)).toMap
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
Inside the makeFollower method itself,
the local replica is made a follower by setting the new leader and emptying the ISR.
It mainly does the following:
* Record the epoch of the controller that made the leadership decision; this is needed when updating the ISR and maintaining that controller epoch in the ZooKeeper path.
* Add the replicas that are new.
* Remove assigned replicas that have been removed by the controller.
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val newLeaderBrokerId: Int = leaderAndIsr.leader
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
// add replicas that are new
allReplicas.foreach(r => getOrCreateReplica(r))
// remove assigned replicas that have been removed by the controller
(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
inSyncReplicas = Set.empty[Replica]
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
leaderReplicaIdOpt.foreach { leaderReplica =>
if (topic == OffsetManager.OffsetsTopicName &&
/* if we are making a leader->follower transition */
leaderReplica == localBrokerId)
offsetManager.clearOffsetsInPartition(partitionId)
}
if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
false
}
else {
leaderReplicaIdOpt = Some(newLeaderBrokerId)
true
}
IV. Summary
Using the topic-creation flow, this article has explained how partitions and their replicas are distributed across the cluster's Brokers, how the leaders of a newly created topic's partitions are elected, and what determines whether a given replica becomes the leader or a follower.
The whole process is essentially a publish/subscribe system built on ZooKeeper: the publisher is the TopicCommand class and the subscriber is the Kafka Controller. The Controller elects each partition's leader (the first replica in the assigned list) and then sends a LeaderAndIsrRequest to the Brokers that TopicCommand assigned; each of those Brokers then brings its replica up as the leader (when the assigned leader's Broker id equals its own) or as a follower.