kafka KafkaRequestHandlerPool类
KafkaRequestHandlerPool是KafkaRequestHandler的handler池,处理所有请求队列
具体的处理,会交由KafkaApis类
for(i <- 0 until numThreads) { runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis) threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i)) threads(i).start() }
run方法:
def run() { while(true) { try { var req : RequestChannel.Request = null while (req == null) { //获得请求 // We use a single meter for aggregate idle percentage for the thread pool. // Since meter is calculated as total_recorded_value / time_window and // time_window is independent of the number of threads, each recorded idle // time should be discounted by # threads. val startSelectTime = SystemTime.nanoseconds req = requestChannel.receiveRequest(300)//blockqueue poll 获得请求request val idleTime = SystemTime.nanoseconds - startSelectTime aggregateIdleMeter.mark(idleTime / totalHandlerThreads) } //处理 if(req eq RequestChannel.AllDone) { debug("Kafka request handler %d on broker %d received shut down command".format( id, brokerId)) return } req.requestDequeueTimeMs = SystemTime.milliseconds trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req)) //交由apis类负责处理request apis.handle(req) } catch { case e: Throwable => error("Exception when handling request", e) } } }
相关推荐
Kafka 2020-09-18
yanghuashuiyue 2020-11-14
liuxingen 2020-11-13
wangying 2020-11-13
王谦 2020-11-03
huangwei00 2020-10-14
shenzhenzsw 2020-10-09
guicaizhou 2020-09-30
jiaomrswang 2020-09-23
jyj0 2020-09-21
guicaizhou 2020-09-15
hannuotayouxi 2020-08-20
amwayy 2020-08-03
yangyutong00 2020-08-01
weikaixxxxxx 2020-08-01
PoppyEvan 2020-08-01
guicaizhou 2020-08-01
PoppyEvan 2020-07-29