metaq源码解读之FetchManager

 FetchManager:请求管理器接口。

        既然是管理器,就需要知道管理的对象是什么?FetchRequest——管理的是一次次的请求。

        既然是管理器,就需要给被管理者提供容所?FetchRequestQueue——请求delay queue。

        既然是管理器,就需要有管理实施者?FetchRequestRunner——从请求队列中提取请求,然后处理。

        既然是管理器,就需要知道管理了哪些事情?

               ① 启动请求处理器

               ② 停止请求处理器

               ③ 重置亦即初始化管理器

               ④ 添加请求

               ⑤ 获取请求总数

               ⑥ 标记处理器状态

        管理器如何管理的?

               ①处理器 FetchRequestRunner

while (!this.stopped) {//只要处理器没有被终止,则不停的从请求队列中提取请求并处理
    try {
          final FetchRequest request = SimpleFetchManager.this.requestQueue.take();
          this.processRequest(request);
    } catch (final InterruptedException e) {
          // take响应中断,忽略
     }
}
void processRequest(final FetchRequest request) {
     try {
           final MessageIterator iterator = SimpleFetchManager.this.consumer.fetch(request, -1, null);
           final MessageListener listener =
SimpleFetchManager.this.consumer.getMessageListener(request.getTopic());
           final ConsumerMessageFilter filter =
SimpleFetchManager.this.consumer.getMessageFilter(request.getTopic());
           this.notifyListener(request, iterator, listener, filter, SimpleFetchManager.this.consumer.getConsumerConfig().getGroup());
     }
}

                 ②请求队列 FetchRequestQueue

//队列中定义的线程用来等待队列的头元素。这种leader-follower设计模式的变种有助于使等待的时间最小化。当一个线程变为leader,它只需等待下一个delay的时间,而其他线程将无限期等待。
//leader线程在从take()或者poll()等等方法返回之前必须signal其他线程,除非这期间其他线程变成了leader线程。
//每当队列的头结点被更早到期时间的节点替代,leader失效被重新设置为null,其他的线程——不一定是当前的leader,将被signal。
//因此等待线程要时刻准备着获取leader或者丧失leader
private Thread leader = null;

public void offer(FetchRequest e) {
    final Lock lock = this.lock;
    lock.lock();
    try {
       //如果已经关联了队列且不是关联本队列的请求,不予添加
       if (e.getRefQueue() != null && e.getRefQueue() != this) {
           return;
       }
       // 请求关联本队列
       e.setRefQueue(this);
       //入队、排序
       this.queue.offer(e);
       Collections.sort(this.queue);
       // Leader is changed.
       if (this.queue.peek() == e) {
           this.leader = null;
           this.available.signal();
       }
   }finally {
       lock.unlock();
   }
}
public FetchRequest take() throws InterruptedException {
        final Lock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                FetchRequest first = this.queue.peek();
                if (first == null) {//队列没有请求,等待
                    this.available.await();
                }
                else {
                    //当队列中存在延迟已到元素,则当前线程进行处理。只有当所有元素都延迟未到,才需要考虑leader问题
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {//请求延迟时间已到,出队接受处理
                        return this.queue.poll();
                    }
                    else if (this.leader != null) {//当前线程为fllower,等待成为leader,其他线程已变为leader
                        this.available.await();
                    }
                    else {
                        //当前线程设置为leader
                        Thread thisThread = Thread.currentThread();
                        this.leader = thisThread;
                        try {
                            //leader线程等待剩余延迟时间
                            this.available.awaitNanos(delay);
                        }
                        finally {
                            //延迟过后如果当前线程还是leader,则leader失效。重新争夺leader
                            if (this.leader == thisThread) {
                                this.leader = null;
                            }
                        }
                    }
                }
            }
        }
        finally {
            //其他线程没有成为leader,并且队列不空,则唤醒一个等待者
            if (this.leader == null && this.queue.peek() != null) {
                this.available.signal();
            }
            lock.unlock();
        }
    }