Lucene 中多线程环境下按顺序处理文档的一个例子（WaitQueue）

      在 Lucene 中，文档是按照添加的顺序编号的；由 Lucene 的索引格式可知，文档是按照 ID 从小到大的顺序写入索引文件的。在多线程建索引的情况下，有可能编号靠前的文档是个大文档，处理时间比较长，而后面的文档处理得比较快，那么就需要将后面先处理完的文档缓存起来，等前面的文档处理完了再按顺序写入索引文件。Lucene 中的 WaitQueue 类就是用来实现这个功能的。

   

private class WaitQueue {
    DocWriter[] waiting;
    int nextWriteDocID;
    int nextWriteLoc;
    int numWaiting;
    long waitingBytes;

    public WaitQueue() {
      waiting = new DocWriter[10];
    }

    synchronized void reset() {
      // NOTE: nextWriteLoc doesn't need to be reset
      //nextWriteLoc不用重置,这个Queue相当于一个环形队列,从哪里开始都一样

assertnumWaiting==0;assertwaitingBytes==0;nextWriteDocID=0;

}synchronizedbooleandoResume(){finaldoublemb=config.getRAMBufferSizeMB();finallongwaitQueueResumeBytes;if(mb==IndexWriterConfig.DISABLE_AUTO_FLUSH){waitQueueResumeBytes=2*1024*1024;}else{waitQueueResumeBytes=(long)(mb*1024*1024*0.05);}returnwaitingBytes<=waitQueueResumeBytes;}synchronizedbooleandoPause(){//判断已经使用的内存是否大于设定的最大使用内存

finaldoublemb=config.getRAMBufferSizeMB();finallongwaitQueuePauseBytes;if(mb==IndexWriterConfig.DISABLE_AUTO_FLUSH){waitQueuePauseBytes=4*1024*1024;}else{waitQueuePauseBytes=(long)(mb*1024*1024*0.1);}returnwaitingBytes>waitQueuePauseBytes;}synchronizedvoidabort(){intcount=0;for(inti=0;i<waiting.length;i++){finalDocWriterdoc=waiting[i];if(doc!=null){doc.abort();waiting[i]=null;count++;}}waitingBytes=0;assertcount==numWaiting;numWaiting=0;}privatevoidwriteDocument(DocWriterdoc)throwsIOException{assertdoc==skipDocWriter||nextWriteDocID==doc.docID;booleansuccess=false;try{doc.finish();nextWriteDocID++;nextWriteLoc++;assertnextWriteLoc<=waiting.length;if(nextWriteLoc==waiting.length){nextWriteLoc=0;}success=true;}finally{if(!success){setAborting();}}}synchronizedpublicbooleanadd(DocWriterdoc)throwsIOException{assertdoc.docID>=nextWriteDocID;if(doc.docID==nextWriteDocID){writeDocument(doc);while(true){doc=waiting[nextWriteLoc];if(doc!=null){numWaiting--;waiting[nextWriteLoc]=null;waitingBytes-=doc.sizeInBytes();writeDocument(doc);}else{break;}}}else{//Ifinishedbeforedocumentsthatwereadded//beforeme.ThiscaneasilyhappenwhenIama//smalldocandthedocsbeforemewerelarge,or,//justduetoluckinthethreadscheduling.Just//addmyselftothequeueandwhenthatlargedoc//finishes,itwillflushme:intgap=doc.docID-nextWriteDocID;

//将处理文档的ID与下一个处理文档的ID之差大于队列的长度时,重新申请大一些的队列

 if (gap >= waiting.length) { // Grow queue DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)]; assert nextWriteLoc >= 0; System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc); System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc); nextWriteLoc = 0; waiting = newArray; gap = doc.docID - nextWriteDocID; } int loc = nextWriteLoc + gap; if (loc >= waiting.length) { loc -= waiting.length; } // We should only wrap one time assert loc < waiting.length; // Nobody should be in my spot! assert waiting[loc] == null; waiting[loc] = doc; numWaiting++; waitingBytes += doc.sizeInBytes(); } return doPause(); } } 

相关推荐