Lucene 中多线程环境下按顺序处理任务的一个例子(WaitQueue)
在 Lucene 中,文档是按照添加的顺序编号的,由 Lucene 的索引格式可知,文档是按照 ID 从小到大的顺序写到索引文件中的。在多线程建索引的情况下,有可能编号靠前的文档是个大文档,处理的时间比较长,而后面的文档又处理得比较快,那么就需要将后面先处理完的文档缓存起来,等前面的文档处理完了再按顺序写到索引文件中。Lucene 中的 WaitQueue 类就是实现这个功能的。
private class WaitQueue { DocWriter[] waiting; int nextWriteDocID; int nextWriteLoc; int numWaiting; long waitingBytes; public WaitQueue() { waiting = new DocWriter[10]; } synchronized void reset() { // NOTE: nextWriteLoc doesn't need to be reset //nextWriteLoc不用重置,这个Queue相当于一个环形队列,从哪里开始都一样
assertnumWaiting==0;assertwaitingBytes==0;nextWriteDocID=0;
}synchronizedbooleandoResume(){finaldoublemb=config.getRAMBufferSizeMB();finallongwaitQueueResumeBytes;if(mb==IndexWriterConfig.DISABLE_AUTO_FLUSH){waitQueueResumeBytes=2*1024*1024;}else{waitQueueResumeBytes=(long)(mb*1024*1024*0.05);}returnwaitingBytes<=waitQueueResumeBytes;}synchronizedbooleandoPause(){//判断已经使用的内存是否大于设定的最大使用内存
finaldoublemb=config.getRAMBufferSizeMB();finallongwaitQueuePauseBytes;if(mb==IndexWriterConfig.DISABLE_AUTO_FLUSH){waitQueuePauseBytes=4*1024*1024;}else{waitQueuePauseBytes=(long)(mb*1024*1024*0.1);}returnwaitingBytes>waitQueuePauseBytes;}synchronizedvoidabort(){intcount=0;for(inti=0;i<waiting.length;i++){finalDocWriterdoc=waiting[i];if(doc!=null){doc.abort();waiting[i]=null;count++;}}waitingBytes=0;assertcount==numWaiting;numWaiting=0;}privatevoidwriteDocument(DocWriterdoc)throwsIOException{assertdoc==skipDocWriter||nextWriteDocID==doc.docID;booleansuccess=false;try{doc.finish();nextWriteDocID++;nextWriteLoc++;assertnextWriteLoc<=waiting.length;if(nextWriteLoc==waiting.length){nextWriteLoc=0;}success=true;}finally{if(!success){setAborting();}}}synchronizedpublicbooleanadd(DocWriterdoc)throwsIOException{assertdoc.docID>=nextWriteDocID;if(doc.docID==nextWriteDocID){writeDocument(doc);while(true){doc=waiting[nextWriteLoc];if(doc!=null){numWaiting--;waiting[nextWriteLoc]=null;waitingBytes-=doc.sizeInBytes();writeDocument(doc);}else{break;}}}else{//Ifinishedbeforedocumentsthatwereadded//beforeme.ThiscaneasilyhappenwhenIama//smalldocandthedocsbeforemewerelarge,or,//justduetoluckinthethreadscheduling.Just//addmyselftothequeueandwhenthatlargedoc//finishes,itwillflushme:intgap=doc.docID-nextWriteDocID;
//将处理文档的ID与下一个处理文档的ID之差大于队列的长度时,重新申请大一些的队列
if (gap >= waiting.length) { // Grow queue DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)]; assert nextWriteLoc >= 0; System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc); System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc); nextWriteLoc = 0; waiting = newArray; gap = doc.docID - nextWriteDocID; } int loc = nextWriteLoc + gap; if (loc >= waiting.length) { loc -= waiting.length; } // We should only wrap one time assert loc < waiting.length; // Nobody should be in my spot! assert waiting[loc] == null; waiting[loc] = doc; numWaiting++; waitingBytes += doc.sizeInBytes(); } return doPause(); } }