metaq源码解读之FetchManager
FetchManager:请求管理器接口。
既然是管理器,就需要知道管理的对象是什么?FetchRequest——管理的是一次次的请求。
既然是管理器,就需要给被管理者提供容所?FetchRequestQueue——请求delay queue。
既然是管理器,就需要有管理实施者?FetchRequestRunner——从请求队列中提取请求,然后处理。
既然是管理器,就需要知道管理了哪些事情?
① 启动请求处理器
② 停止请求处理器
③ 重置亦即初始化管理器
④ 添加请求
⑤ 获取请求总数
⑥ 标记处理器状态
管理器如何管理的?
①处理器 FetchRequestRunner
while (!this.stopped) {//只要处理器没有被终止,则不停的从请求队列中提取请求并处理 try { final FetchRequest request = SimpleFetchManager.this.requestQueue.take(); this.processRequest(request); } catch (final InterruptedException e) { // take响应中断,忽略 } }
void processRequest(final FetchRequest request) { try { final MessageIterator iterator = SimpleFetchManager.this.consumer.fetch(request, -1, null); final MessageListener listener = SimpleFetchManager.this.consumer.getMessageListener(request.getTopic()); final ConsumerMessageFilter filter = SimpleFetchManager.this.consumer.getMessageFilter(request.getTopic()); this.notifyListener(request, iterator, listener, filter, SimpleFetchManager.this.consumer.getConsumerConfig().getGroup()); } }
②请求队列 FetchRequestQueue
//队列中定义的线程用来等待队列的头元素。这种leader-follower设计模式的变种有助于使等待的时间最小化。当一个线程变为leader,它只需等待下一个delay的时间,而其他线程将无限期等待。 //leader线程在从take()或者poll()等等方法返回之前必须signal其他线程,除非这期间其他线程变成了leader线程。 //每当队列的头结点被更早到期时间的节点替代,leader失效被重新设置为null,其他的线程——不一定是当前的leader,将被signal。 //因此等待线程要时刻准备着获取leader或者丧失leader private Thread leader = null; public void offer(FetchRequest e) { final Lock lock = this.lock; lock.lock(); try { //如果已经关联了队列且不是关联本队列的请求,不予添加 if (e.getRefQueue() != null && e.getRefQueue() != this) { return; } // 请求关联本队列 e.setRefQueue(this); //入队、排序 this.queue.offer(e); Collections.sort(this.queue); // Leader is changed. if (this.queue.peek() == e) { this.leader = null; this.available.signal(); } }finally { lock.unlock(); } } public FetchRequest take() throws InterruptedException { final Lock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { FetchRequest first = this.queue.peek(); if (first == null) {//队列没有请求,等待 this.available.await(); } else { //当队列中存在延迟已到元素,则当前线程进行处理。只有当所有元素都延迟未到,才需要考虑leader问题 long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) {//请求延迟时间已到,出队接受处理 return this.queue.poll(); } else if (this.leader != null) {//当前线程为fllower,等待成为leader,其他线程已变为leader this.available.await(); } else { //当前线程设置为leader Thread thisThread = Thread.currentThread(); this.leader = thisThread; try { //leader线程等待剩余延迟时间 this.available.awaitNanos(delay); } finally { //延迟过后如果当前线程还是leader,则leader失效。重新争夺leader if (this.leader == thisThread) { this.leader = null; } } } } } } finally { //其他线程没有成为leader,并且队列不空,则唤醒一个等待者 if (this.leader == null && this.queue.peek() != null) { this.available.signal(); } lock.unlock(); } }