Kafka源码系列之topic创建分区分配及leader选举

一,基本介绍

本文讲解依然是基于kafka源码0.8.2.2。假如阅读过前面的文章应该知道,用户的admin指令都是通过Zookeeper发布给kafka的Controller,然后由Controller发布给具体的Broker。

Topic的创建过程亦是如此。本文主要是关注一下几点:

1,分区和副本是在何处,以怎样的方式分配给Broker。

2,kafka的Controller接收到Zookeeper的通知后做了哪些处理。

3,分区的leader和follower是如何选举的。

二,重要类介绍

1,TopicCommand

Topic相关操作的入口类,职责:创建,修改,更新配置,删除,查看都是经由它来向Zookeeper发布相关策略的。

2,KafkaApis

业务处理线程要使用的对象,其handle方法相当于将各种请求,交由相应的处理函数进行处理。

3,KafkaController

KafkaController作为kafka集群的控制者,有且存在一个leader,若干个follower。Leader能够发送具体的指令给follower,具体指令如:RequestKeys.LeaderAndIsrKey,RequestKeys.StopReplicaKey,RequestKeys.UpdateMetadataKey。

4,PartitionStateMachine

分区的状态机,决定者分区的当前状态及状态转移过程。

NonExistentPartition:不存在。该状态的前状态假如有的话,只能是OfflinePartition

NewPartition:分区创建后的状态,前状态是NonExistentPartition。改状态说明分区已经有副本且不存在leader/isr。

OnlinePartition:选举过leader后,处于该状态,前状态可是:OfflinePartition/NewPartition。

OfflinePartition:选举过leader以后,leader挂掉,分区就会处于当前状态,前状态可能是NewPartition/OnlinePartition

三,源码实现介绍

主要是分三个步骤:

A),command创建时Partition均匀分布于Broker的策略

副本分配有两个目标:

1,尽可能将副本均匀分配到Broker上

2,每个分区的副本都分配到不同的Broker上

为了实现这个目标kafka采取下面两个策略:

1,随机选取一个Broker位置作为分配Partition的起始位置,将Partition的第一个副本进行轮询分配

2,将其它副本以一个递增的位移分配到不同的Broker上去

源码执行的具体过程

TopicCommand.main

if(opts.options.has(opts.createOpt))

createTopic(zkClient, opts)

AdminUtils.createTopic(zkClient, topic, partitions, replicas,configs)

进行partition和Replicas的均匀分配

val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions,replicationFactor)

具体内容是如下:

val ret = new mutable.HashMap[Int, List[Int]]()

//随机选取一个Broker位置作为startIndex

val startIndex = if (fixedStartIndex >= 0) fixedStartIndexelse rand.nextInt(brokerList.size)

//当前分区Id赋值为0

var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0

//随机选取Broker数目范围内的位移

var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)

for (i <- 0 until nPartitions) {

//只有在所有遍历过Broker数目个分区后才将位移加一

if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))

nextReplicaShift += 1

//当前分区id加上起始位置,对Brokersize取余得到第一个副本的位置

val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size

var replicaList = List(brokerList(firstReplicaIndex))

for (j <- 0 until replicationFactor - 1)

//计算出每个副本的位置 计算方法是replicaIndex:

//val shift = 1 + (nextReplicaShift + j) % ( brokerList.size - 1)

//(firstReplicaIndex + shift) % brokerList.size

replicaList ::= brokerList(replicaIndex(firstReplicaIndex,nextReplicaShift, j, brokerList.size))

ret.put(currentPartitionId, replicaList.reverse)

//分区id加一

currentPartitionId = currentPartitionId + 1

}

ret.toMap

将配置和分配策略写到Zookeeper上去

AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)

该方法的具体内容如下:

写配置,Zookeeper的目录是:/config/topics/TopicName

writeTopicConfig(zkClient, topic, config)

写分配策略,Zookeeper的目录是:/brokers/topics/TopicName

writeTopicPartitionAssignment(zkClient, topic,partitionReplicaAssignment, update)

B),kafka Controller监听到topic创建事件后的处理

KafkaController的PartitionStateMachine对象内部有一个Zookeeper的listener专门监听新增topic事件。TopicChangeListener。

获取新增topic

val newTopics = currentChildren --controllerContext.allTopics

获取分区副本分配策略HashMap[TopicAndPartition, Seq[Int]]

val addedPartitionReplicaAssignment =ZkUtils.getReplicaAssignmentForTopics(zkClient,newTopics.toSeq)

进入具体的操作

if(newTopics.size > 0)

//进入具体的操作

controller.onNewTopicCreation(newTopics,addedPartitionReplicaAssignment.keySet.toSet)

订阅新增topic的分区变动事件

// subscribe to partition changes 注册指定topic的分区变动事件监听器

topics.foreach(topic =>partitionStateMachine.registerPartitionChangeListener(topic))

处理新增分区onNewPartitionCreation

该方法主要做两件事:

1,将新建分区的状态转化为NewPartition状态

partitionStateMachine.handleStateChanges(newPartitions,NewPartition)

进入处理函数得到

partitions.foreach { topicAndPartition =>

handleStateChange(topicAndPartition.topic,topicAndPartition.partition, targetState, leaderSelector,callbacks)

}

case NewPartition =>

//指定TopicAndPartition 获取副本

assignReplicasToPartitions(topic, partition)

partitionState.put(topicAndPartition, NewPartition)

val assignedReplicas =controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")

AssgnReplicasToPartition方法的具体内容,主要是先获取分区所在的Brokerid序列,然后

val assignedReplicas = ZkUtils.getReplicasForPartition(controllerContext.zkClient,topic, partition)

controllerContext.partitionReplicaAssignment +=TopicAndPartition(topic, partition) -> assignedReplicas

2,将新建分区的状态从NewPartition到OnlinePartition状态

partitionStateMachine.handleStateChanges(newPartitions,OnlinePartition, offlinePartitionSelector)

在handleStateChange,中具体处理是

case OnlinePartition =>

assertValidPreviousStates(topicAndPartition,List(NewPartition, OnlinePartition, OfflinePartition),OnlinePartition)

partitionState(topicAndPartition) match {

case NewPartition =>

// initialize leader and isr path for new partition

initializeLeaderAndIsrForPartition(topicAndPartition)

在initializeLeaderAndIsrForPartition.第一个seq中的Broker当做leader

val leader = liveAssignedReplicas.head //第一个副本作为leader

val leaderIsrAndControllerEpoch = newLeaderIsrAndControllerEpoch(new LeaderAndIsr(leader,liveAssignedReplicas.toList),

controller.epoch)

更新具体分区的状态信息

[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/innerBashData/partitions/1/state

// {"controller_epoch":6,"leader":6,"version":1,"leader_epoch":24,"isr":[7,6]}

ZkUtils.createPersistentPath(controllerContext.zkClient,

ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),

ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))

topic 分区 副本 放入leaderAndIsrRequestMap,以便我们可以通过Brokerid找到

brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,

topicAndPartition.partition, leaderIsrAndControllerEpoch,replicaAssignment)

发信息给需要的BrokerID

leaderAndIsrRequestMap.foreach { m =>

val broker = m._1

val partitionStateInfos = m._2.toMap

val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet

val leaders =controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id))

val leaderAndIsrRequest = newLeaderAndIsrRequest(partitionStateInfos, leaders,controllerId, controllerEpoch, correlationId, clientId)

for (p <- partitionStateInfos) {

val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader)"become-leader" else "become-follower"

stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " +

"for partition [%s,%d]").format(controllerId, controllerEpoch,typeOfRequest,

p._2.leaderIsrAndControllerEpoch, correlationId, broker,

p._1._1,p._1._2))

}

// 给具体的Broker发送LeaderAndIsrRequest

controller.sendRequest(broker, leaderAndIsrRequest,null)

}

C),Broker leader和follower的产生过程

在Broker接收到Controller的LeaderAndIsrRequest消息后,交由kafkaApis的handle处理

case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)

当前Broker成为副本的leader或者follower的入口函数

val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)

当前Broker能不能成为Broker,取决于Brokerid是否与leader分配的Brokerid一致,一致就会成为leader,否则follower

val partitionsTobeLeader = partitionState

.filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}

val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)

真正的进入leader或者follower的过程

if (!partitionsTobeLeader.isEmpty)

makeLeaders(controllerId, controllerEpoch,partitionsTobeLeader, leaderAndISRRequest.correlationId,responseMap, offsetManager)

if (!partitionsToBeFollower.isEmpty)

makeFollowers(controllerId, controllerEpoch,partitionsToBeFollower, leaderAndISRRequest.leaders,leaderAndISRRequest.correlationId, responseMap,offsetManager)

在接收到第一个leaderisrrequest后初始化highwatermark 线程。这可以保证所有的分区都被填充,通过避免恶性竞争启动Checkpointing之前。

if (!hwThreadInitialized) {

startHighWaterMarksCheckPointThread()

hwThreadInitialized = true

}

下面具体讲解makeleaders和makeFollowers方法

使当前Broker成为给定分区的leader ,需要做以下几个处理:

* 1,停止掉这些分区的fetchers

* 2,更新缓存的当前分区的元数据

* 3,将分区加入leader 分区集合

// First stop fetchers for all the partitions

replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))

// Update the partition information to be the leader

partitionState.foreach{ case (partition, partitionStateInfo) =>

partition.makeLeader(controllerId, partitionStateInfo,correlationId, offsetManager)}

Makeleader方法具体的操作了一个副本成为leader的过程:

主要做了以下几件事情:

* 记录LeaderShip 决议的时代。在更新isr并维护Zookeeperpath的中的Controller时代

* 增加新的副本

* 移除已经被Controller移除的已分配副本

* 为新的leader副本构建高水位元数据

* 为远程副本重置logendoffset

* 由于isr可能将为1,我们需要增加高水位

具体源码如下:

def makeLeader(controllerId: Int,

partitionStateInfo: PartitionStateInfo,correlationId: Int,

offsetManager: OffsetManager): Boolean = {

inWriteLock(leaderIsrUpdateLock) {

val allReplicas = partitionStateInfo.allReplicas

val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch

val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr

// record the epoch of the controller that made the leadership decision. This is useful while updating the isr

// to maintain the decision maker controller's epoch in the zookeeper path

controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch

// add replicas that are new

allReplicas.foreach(replica => getOrCreateReplica(replica))

val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet

// remove assigned replicas that have been removed by the controller

(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))

inSyncReplicas = newInSyncReplicas

leaderEpoch = leaderAndIsr.leaderEpoch

zkVersion = leaderAndIsr.zkVersion

leaderReplicaIdOpt = Some(localBrokerId)

// construct the high watermark metadata for the new leader replica

val newLeaderReplica = getReplica().get

newLeaderReplica.convertHWToLocalOffsetMetadata()

// reset log end offset for remote replicas

assignedReplicas.foreach(r => if (r.brokerId !=localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata)

// we may need to increment high watermark since ISR could be down to 1

maybeIncrementLeaderHW(newLeaderReplica)

if (topic == OffsetManager.OffsetsTopicName)

offsetManager.loadOffsetsFromLog(partitionId)

true

}

}

当前Broker成为给定分区的follower要做要做以下几个处理:

* 1,将分区从leader partition 集合中移除

* 2,将副本标记为follower ,目的是不让生产者继续往该副本生产消息

* 3,停止掉该分区的所有fetcher,目的是不让副本fetcher线程往该副本写数据。

* 4,清空当前分区的log和Checkpoint offsets

* 5,假如Broker没有挂掉,增加从新leader获取数据的副本fetcher线程

具体代码如下:

将分区从leader partition 集合中移除

将副本标记为follower ,目的是不让生产者继续往该副本生产消息

partitionState.foreach{ case (partition, partitionStateInfo) =>

val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch

val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader

leaders.find(_.id == newLeaderBrokerId) match {

// Only change partition state when the leader is available

case Some(leaderBroker) =>

if (partition.makeFollower(controllerId,partitionStateInfo, correlationId, offsetManager))

partitionsToMakeFollower += partition

当前分区的log和Checkpoint offsets

replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))

清空当前分区的log和Checkpoint offsets

logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition),partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)

假如Broker没有挂掉,增加从新leader获取数据的副本fetcher线程

val partitionsToMakeFollowerWithLeaderAndOffset =partitionsToMakeFollower.map(partition =>

new TopicAndPartition(partition) -> BrokerAndInitialOffset(

leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,

partition.getReplica().get.logEndOffset.messageOffset)).toMap

replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

具体的makeFollower方法中

通过设置leader和ISR为空,使本地副本成为Follower

主要做了以下几件事情:

* 记录LeaderShip 决议的时代。在更新isr并维护Zookeeperpath的中的Controller时代

* 增加新的副本

* 移除已经被Controller移除的已分配副本

val allReplicas = partitionStateInfo.allReplicas

val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch

val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr

val newLeaderBrokerId: Int = leaderAndIsr.leader

// record the epoch of the controller that made the leadership decision. This is useful while updating the isr

// to maintain the decision maker controller's epoch in the zookeeper path

controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch

// add replicas that are new

allReplicas.foreach(r => getOrCreateReplica(r))

// remove assigned replicas that have been removed by the controller

(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))

inSyncReplicas = Set.empty[Replica]

leaderEpoch = leaderAndIsr.leaderEpoch

zkVersion = leaderAndIsr.zkVersion

leaderReplicaIdOpt.foreach { leaderReplica =>

if (topic == OffsetManager.OffsetsTopicName &&

/* if we are making a leader->follower transition */

leaderReplica == localBrokerId)

offsetManager.clearOffsetsInPartition(partitionId)

}

if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {

false

}

else {

leaderReplicaIdOpt = Some(newLeaderBrokerId)

true

}

四,总结

本文主要是以topic的创建过程,讲解分区及副本在集群Broker上的分布的实现,顺便讲解新建topic的话分区的leader的选举方法,及我们的副本成为leader和Follower的要素。

这个过程实际上也是基于Zookeeper实现了订阅发布系统,发布者是TopicCommand类,订阅者是kafka的Controller类。再由kafka的Controller进行分区leader选举(副本列表第一个),然后给TopicCommand已经指定的各个Broker Follower发送LeaderAndIsrRequest,由根据我们TopicCommand中分区的分配的具体Broker去启动副本为leader(leader的被分配的Brokerid和当前Broker的id相等)或者Follower。

Kafka源码系列之topic创建分区分配及leader选举
Kafka源码系列之topic创建分区分配及leader选举
Kafka源码系列之topic创建分区分配及leader选举

假如,你对kafka,hbase,spark,hadoop教程感兴趣,

请关注菲儿,转发文章,私信(资料)就可获取全套大数据教程。

相关推荐