lucene中一个多线程时顺序处理某些事务的例子(WaitQueue)

      在lucene中,文档是按照添加的顺序编号的,由lucene的索引格式可知,文档是按照ID的顺序从小到大写到索引文件中的。在多线程建索引的情况下,有可能编号考前的文档是个大文档,处理的时间比较长,而后边的文档又处理的比较快,那么就需要将后边先处理完的文档缓存起来,等前面的文档处理完了再顺序的写到索引文件中。lucene中的WaitQueue类就是实现这个功能的。

      算法主要是通过一个数组来实现的,要点是要把这个数组看成一个环形的队列。标记当前处理的doc号,得到新加入的doc与当前处理的doc的差值gap,那么就将新doc存储在当前的数组位置距离gap的位置。

private class WaitQueue {
    DocWriter[] waiting;
    int nextWriteDocID;
    int nextWriteLoc;
    int numWaiting;
    long waitingBytes;

    public WaitQueue() {
      waiting = new DocWriter[10];
    }

    synchronized void reset() {
      // NOTE: nextWriteLoc doesn't need to be reset
      //nextWriteLoc不用重置,这个Queue相当于一个环形队列,从哪里开始都一样
      assert numWaiting == 0;
      assert waitingBytes == 0;
      nextWriteDocID = 0;
 }

    synchronized boolean doResume() {
      final double mb = config.getRAMBufferSizeMB();
      final long waitQueueResumeBytes;
      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
        waitQueueResumeBytes = 2*1024*1024;
      } else {
        waitQueueResumeBytes = (long) (mb*1024*1024*0.05);
      }
      return waitingBytes <= waitQueueResumeBytes;
    }

    synchronized boolean doPause() {
      //判断已经使用的内存是否大于设定的最大使用内存
      final double mb = config.getRAMBufferSizeMB();
      final long waitQueuePauseBytes;
      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
        waitQueuePauseBytes = 4*1024*1024;
      } else {
        waitQueuePauseBytes = (long) (mb*1024*1024*0.1);
      }
      return waitingBytes > waitQueuePauseBytes;
    }

    synchronized void abort() {
      int count = 0;
      for(int i=0;i<waiting.length;i++) {
        final DocWriter doc = waiting[i];
        if (doc != null) {
          doc.abort();
          waiting[i] = null;
          count++;
        }
      }
      waitingBytes = 0;
      assert count == numWaiting;
      numWaiting = 0;
    }

    private void writeDocument(DocWriter doc) throws IOException {
      assert doc == skipDocWriter || nextWriteDocID == doc.docID;
      boolean success = false;
      try {
        doc.finish();
        nextWriteDocID++;
        nextWriteLoc++;
        assert nextWriteLoc <= waiting.length;
        if (nextWriteLoc == waiting.length) {
          nextWriteLoc = 0;
        }
        success = true;
      } finally {
        if (!success) {
          setAborting();
        }
      }
    }

    synchronized public boolean add(DocWriter doc) throws IOException {

      assert doc.docID >= nextWriteDocID;

      if (doc.docID == nextWriteDocID) {
        writeDocument(doc);
        while(true) {
          doc = waiting[nextWriteLoc];
          if (doc != null) {
            numWaiting--;
            waiting[nextWriteLoc] = null;
            waitingBytes -= doc.sizeInBytes();
            writeDocument(doc);
          } else {
            break;
          }
        }
      } else {

        // I finished before documents that were added
        // before me.  This can easily happen when I am a
        // small doc and the docs before me were large, or,
        // just due to luck in the thread scheduling.  Just
        // add myself to the queue and when that large doc
        // finishes, it will flush me:
        int gap = doc.docID - nextWriteDocID;
        //将处理文档的ID与下一个处理文档的ID之差大于队列的长度时,重新申请大一些的队列
        if (gap >= waiting.length) {
          // Grow queue
          DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
          assert nextWriteLoc >= 0;
          System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
          System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
          nextWriteLoc = 0;
          waiting = newArray;
          gap = doc.docID - nextWriteDocID;
        }

        int loc = nextWriteLoc + gap;
        if (loc >= waiting.length) {
          loc -= waiting.length;
        }

        // We should only wrap one time
        assert loc < waiting.length;

        // Nobody should be in my spot!
        assert waiting[loc] == null;
        waiting[loc] = doc;
        numWaiting++;
        waitingBytes += doc.sizeInBytes();
      }
      
      return doPause();
    }
  }
 

相关推荐