kafka是如何做到百万级高并发低迟延的?
Kafka是高吞吐低延迟的高并发、高性能的消息中间件,在大数据领域有极为广泛的运用。配置良好的Kafka集群甚至可以做到每秒几十万、上百万的超高并发写入。Kafka到底是如何做到这么高的吞吐量和性能的呢?我们今天来走进kafka的server端探究一下它的Reactor高并发网络模型机制。
1.1、Kafka Reactor模型架构
Kafka客户端和服务端通信采取的是NIO的reactor模式,它是一种事件驱动模式。那么一个常见的单线程Reactor模式下,NIO线程的职责都有哪些呢?我们整理了如下几点:
1、作为NIO服务端,接收客户端的TCP连接
2、作为NIO客户端,向服务端发起TCP连接
3、读取通信对端的请求或者应答消息
4、向通信对端发送消息请求或者应答消息
以上四点对应的一个Reactor模式的架构图如下:
对于一些小容量的业务场景,这种单线程的模式基本够用。但是对于高负载、大并发的应用场景却并不适合,主要原因如下:
性能问题1:一个NIO线程同时处理数十万甚至百万级的链路性能是无法支撑的
性能问题2:如果超时发生重试会加重服务端处理负荷,导致大量处理积压
可靠性问题:单个线程出现故障,整个系统无法使用,造成单点故障
所以一个高并发的处理服务需要对以上架构进行优化改造,例如处理采取多线程模式,将接收线程尽量简化,相当于将接收线程作为一个接入层。那么我们回到主题kafka的reactor模式架构是怎样的?
在上面这个kafka的架构图中可以看出,它包含以下几个流程:
1、客户端请求NIO的连接器Acceptor,同时它还具备事件的转发功能,转发到Processor处理
2、服务端网络事件处理器Processor
3、请求队列RequestChannel,存储了所有待处理的请求信息
4、请求处理线程池(RequestHandler Pool)作为守护线程轮训RequestChannel的请求处理信息,并将其转发给API层对应的处理器处理
5、API层处理器将请求处理完成之后放入到Response Queue中,并由Processor从Response Queue取出发送到对应的Client端
需要注意的一点是虽然Broker层包含多个Acceptor,但是kafka的reactor模式里面还是单线程Acceptor多线程handler的模式,这里的多个Acceptor是针对一个服务器下多网卡场景的,每个EndPoint就是一个网卡它对应于一个ip和port的组合,而每个Endpoint只有一个Acceptor。
1.2、Kafka Reactor模型源码详解
按照上面架构图阐述的几个流程,它分别对应着kafka里面的事件接收、处理、响应等几个阶段,我们下面从具体实现这几个阶段的源码层面来分析。
1.2.1、SocketServer
SocketServer是一个标准的NIO服务端实现,它主要包含以下变量:
1、RequestChannel:Processor和KafkaRequestHandler 之间数据交换的队列
2、Processors:processor的容器,存放的是processor的id和processor对象的映射
3、Acceptors:acceptor的容器,存放的是EndPoint和acceptor的映射
4、ConnectionQuotas:链接限制器,针对每个IP的链接数进行限制
SocketServer的启动流程如下:
部分源码如下,启动入口:
def startup(startupProcessors: Boolean = true) { this.synchronized { connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) createAcceptorAndProcessors(config.numNetworkThreads, config.listeners) if (startupProcessors) { startProcessors() } }
创建Acceptor及Proccessor实现逻辑:
private def startProcessors(processors: Seq[Processor]): Unit = synchronized { processors.foreach { processor => KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", processor).start() } }
1.2.2、Acceptor
Acceptor是NIO里面的一个轻量级接入服务,它主要包含如下变量:
1、nioSelector:Java的NIO网络选择器
2、serverChannel:ip和端口绑定到socket
3、Processors:processor的容器,存放的是processor对象
它的主要处理流程如下:
1、将nioSelector注册为OP_ACCEPT
2、轮训从nioSelector读取事件
3、通过RR的模式选择processor
4、接收一个新的链接设置(从serverSocketChannel获取socketChannel,并对它的属性进行设置)
5、移交processor的accept处理
重要逻辑代码如下:
def run() { serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) startupComplete() try { var currentProcessor = 0 while (isRunning) { try { val ready = nioSelector.select(500) if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() while (iter.hasNext && isRunning) { try { val key = iter.next iter.remove() if (key.isAcceptable) { val processor = synchronized { //通过RR选择Processor currentProcessor = currentProcessor % processors.size processors(currentProcessor) } accept(key, processor) } else throw new IllegalStateException("Unrecognized key state for acceptor thread.") // round robin to the next processor thread, mod(numProcessors) will be done later currentProcessor = currentProcessor + 1 } catch { case e: Throwable => error("Error while accepting connection", e) } } } }
socketChannel的链接设置逻辑:
def accept(key: SelectionKey, processor: Processor) { val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] val socketChannel = serverSocketChannel.accept() try { connectionQuotas.inc(socketChannel.socket().getInetAddress) socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setKeepAlive(true) if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socketChannel.socket().setSendBufferSize(sendBufferSize) processor.accept(socketChannel) }
1.2.3、Processor
Processor的主要职责是将来自客户端的网络链接请求封装成RequestContext并发送给RequestChannel,同时需要对handler处理完的响应回执发送给客户端。它主要包括:
1、newConnections:是一个线程安全的队列,存放从acceptor接收到的网络新链接
2、inflightResponses:已发送客户端的响应,存放了和客户端的链接id(由本地ip、port以及远端ip、port还有额外一个序列值组成)和响应对象的映射
3、responseQueue:是一个阻塞队列,存放handler的响应请求
我们在前面使用的kafka reactor模型架构图上改造一下,就得到如下proccessor的核心逻辑架构:
它的核心逻辑如下几个步骤:
1、proccessor线程从newConnections中轮询获取socketChannel,并将selector监听事件修改为OP_READ;
2、processNewResponses处理新的响应需求,其中类型为SendAction的就是向客户端发送响应,并将发送的响应记录在inflightResponses ,它的核心逻辑是sendResponse如下:
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) { val connectionId = response.request.context.connectionId if (channel(connectionId).isEmpty) { response.request.updateRequestMetrics(0L, response) } if (openOrClosingChannel(connectionId).isDefined) { selector.send(responseSend) inflightResponses += (connectionId -> response) } }
3、Selector调用poll从客户端获取到的请求信息,并将获取到的NetworkReceive添加到completedReceives缓存中。
4、而processCompletedReceives负责处理completedReceives中的接收信息,最后封装为RequestChannel.Request,再调用requestChannel将请求添加到发送队列(即requestQueue)当中,源码逻辑如下所示:
private def processCompletedReceives() { selector.completedReceives.asScala.foreach { receive => try { openOrClosingChannel(receive.source) match { case Some(channel) => val header = RequestHeader.parse(receive.payload) val context = new RequestContext(header, receive.source, channel.socketAddress, channel.principal, listenerName, securityProtocol) val req = new RequestChannel.Request(processor = id, context = context, startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics) requestChannel.sendRequest(req) selector.mute(receive.source) case None => throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive") } } catch { case e: Throwable => processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e) } } }
1.2.4、RequestChannel
requestChannel承载了kafka请求和响应的所有转发,它包含有如下两个变量:
1、requestQueue:是一个加锁阻塞队列,RequestChannel传输请求和响应信息的重要组件,上面讲到的RequestChannel.Request就是被放入到这个队列中
2、Processors:存储了processorid和processor的映射关系,主要是在response发送的时候从中选择对应的processor
它的两个核心功能是添加请求和发送响应回执,源码逻辑分别如下:
def sendRequest(request: RequestChannel.Request) { requestQueue.put(request) }
发送响应回执和之前processor略有不同,这里只是将response再添加到responseQueue中,之后由processor轮训从里面取出回执发送到客户端。
def sendResponse(response: RequestChannel.Response) { //省略log trace val processor = processors.get(response.processor) if (processor != null) { processor.enqueueResponse(response) } }
1.2.5、KafkaRequestHandler
说到KafkaRequestHandler ,首先要往回聊一聊,看看它是如何产生的。它被KafkaRequestHandlerPool所创建,而pool是在kafkaServer启动的时候创建的,源码如下:
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) for (i <- 0 until numThreads) { createHandler(i) } def createHandler(id: Int): Unit = synchronized { runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time) KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start() }
好了,讲解完KafkaRequestHandler 的创建过程,接下来就是它的处理逻辑了,它的逻辑很简单,流程如下几个步骤:
1、从requestChannel拉取请求
2、判断请求类型,如果是Request类型则调用KafkaApis处理相应的请求
1.3、改进和优化
至此,我们已经将kafka的reactor模型分析完,最后提一个发散性问题,基于kafka实现的这个reactor模型以及源码的分析实现,如果让你来设计,你觉得还有哪些是可能存在性能瓶颈的地方可以做进一步优化的,大家可以在下面留言发表你的看法,下期会把我对这个问题的一些思考分享出来。