An example of ordered processing under multithreading in Lucene: WaitQueue
In Lucene, documents are numbered in the order in which they are added, and the index format requires that documents be written to the index files in ascending docID order. When indexing with multiple threads, an earlier-numbered document may be a large one that takes a long time to process, while documents added after it finish quickly. Those later documents must therefore be buffered until the earlier ones are done, and only then written to the index file in order. Lucene's WaitQueue class implements exactly this.
The algorithm is built around an array that should be viewed as a circular queue. It tracks the docID that is due to be written next; when a newly finished document arrives, the gap between its docID and that next-to-write docID is computed, and the document is parked in the slot that lies gap positions ahead of the current write position in the array.
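Before looking at Lucene's actual code, the following minimal standalone sketch shows the core idea in isolation. The class and member names (OrderedWriter, emit, grow, the use of Runnable for a pending write) are invented for illustration and are not Lucene's API; error handling and memory accounting are omitted.

// Minimal sketch of the ring-buffer reordering idea (hypothetical names, not Lucene code).
class OrderedWriter {
  private Runnable[] waiting = new Runnable[10]; // ring buffer of pending writes
  private int nextWriteDocID = 0;                // next docID that may be written
  private int nextWriteLoc = 0;                  // ring slot that belongs to nextWriteDocID

  // Called by a worker thread once document `docID` is fully processed.
  synchronized void add(int docID, Runnable write) {
    if (docID == nextWriteDocID) {
      // We are the next document in order: write immediately, then drain
      // any buffered successors that are now unblocked.
      emit(write);
      while (waiting[nextWriteLoc] != null) {
        Runnable next = waiting[nextWriteLoc];
        waiting[nextWriteLoc] = null;
        emit(next);
      }
    } else {
      // We finished "too early": park ourselves `gap` slots ahead of the
      // slot that belongs to nextWriteDocID.
      int gap = docID - nextWriteDocID;
      if (gap >= waiting.length) {
        grow(gap);                               // regrow so the gap fits; nextWriteLoc becomes 0
      }
      int loc = (nextWriteLoc + gap) % waiting.length;
      waiting[loc] = write;
    }
  }

  private void emit(Runnable write) {
    write.run();                                 // append to the index in order
    nextWriteDocID++;
    nextWriteLoc = (nextWriteLoc + 1) % waiting.length;
  }

  private void grow(int gap) {
    // Copy the ring into a larger array so that nextWriteDocID maps to slot 0.
    Runnable[] newArray = new Runnable[Math.max(gap + 1, waiting.length * 2)];
    System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length - nextWriteLoc);
    System.arraycopy(waiting, 0, newArray, waiting.length - nextWriteLoc, nextWriteLoc);
    waiting = newArray;
    nextWriteLoc = 0;
  }
}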
private class WaitQueue {
  DocWriter[] waiting;
  int nextWriteDocID;
  int nextWriteLoc;
  int numWaiting;
  long waitingBytes;

  public WaitQueue() {
    waiting = new DocWriter[10];
  }

  synchronized void reset() {
    // NOTE: nextWriteLoc doesn't need to be reset: the queue behaves like a
    // circular buffer, so it does not matter where it starts
    assert numWaiting == 0;
    assert waitingBytes == 0;
    nextWriteDocID = 0;
  }

  synchronized boolean doResume() {
    final double mb = config.getRAMBufferSizeMB();
    final long waitQueueResumeBytes;
    if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
      waitQueueResumeBytes = 2*1024*1024;
    } else {
      waitQueueResumeBytes = (long) (mb*1024*1024*0.05);
    }
    return waitingBytes <= waitQueueResumeBytes;
  }

  synchronized boolean doPause() {
    // check whether the memory held by waiting documents exceeds the configured limit
    final double mb = config.getRAMBufferSizeMB();
    final long waitQueuePauseBytes;
    if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
      waitQueuePauseBytes = 4*1024*1024;
    } else {
      waitQueuePauseBytes = (long) (mb*1024*1024*0.1);
    }
    return waitingBytes > waitQueuePauseBytes;
  }

  synchronized void abort() {
    int count = 0;
    for (int i = 0; i < waiting.length; i++) {
      final DocWriter doc = waiting[i];
      if (doc != null) {
        doc.abort();
        waiting[i] = null;
        count++;
      }
    }
    waitingBytes = 0;
    assert count == numWaiting;
    numWaiting = 0;
  }

  private void writeDocument(DocWriter doc) throws IOException {
    assert doc == skipDocWriter || nextWriteDocID == doc.docID;
    boolean success = false;
    try {
      doc.finish();
      nextWriteDocID++;
      nextWriteLoc++;
      assert nextWriteLoc <= waiting.length;
      if (nextWriteLoc == waiting.length) {
        nextWriteLoc = 0;
      }
      success = true;
    } finally {
      if (!success) {
        setAborting();
      }
    }
  }

  synchronized public boolean add(DocWriter doc) throws IOException {
    assert doc.docID >= nextWriteDocID;
    if (doc.docID == nextWriteDocID) {
      writeDocument(doc);
      while (true) {
        doc = waiting[nextWriteLoc];
        if (doc != null) {
          numWaiting--;
          waiting[nextWriteLoc] = null;
          waitingBytes -= doc.sizeInBytes();
          writeDocument(doc);
        } else {
          break;
        }
      }
    } else {
      // I finished before documents that were added
      // before me. This can easily happen when I am a
      // small doc and the docs before me were large, or,
      // just due to luck in the thread scheduling. Just
      // add myself to the queue and when that large doc
      // finishes, it will flush me:
      int gap = doc.docID - nextWriteDocID;
      // when the gap between this doc's ID and the next-to-write ID is not
      // smaller than the queue length, allocate a larger array
      if (gap >= waiting.length) {
        // Grow queue
        DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
        assert nextWriteLoc >= 0;
        System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
        System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
        nextWriteLoc = 0;
        waiting = newArray;
        gap = doc.docID - nextWriteDocID;
      }

      int loc = nextWriteLoc + gap;
      if (loc >= waiting.length) {
        loc -= waiting.length;
      }

      // We should only wrap one time
      assert loc < waiting.length;

      // Nobody should be in my spot!
      assert waiting[loc] == null;
      waiting[loc] = doc;
      numWaiting++;
      waitingBytes += doc.sizeInBytes();
    }

    return doPause();
  }
}
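The return value of add() drives back-pressure: with the default 16 MB RAM buffer, doPause() reports true once the buffered documents hold more than 16 MB × 10% = 1.6 MB, and doResume() lets paused threads continue once the backlog falls back to 16 MB × 5% = 0.8 MB or less, giving some hysteresis between the two thresholds.

To see the reordering itself in action, here is a hypothetical driver for the simplified OrderedWriter sketched earlier (again, illustrative code, not part of Lucene). Worker threads finish their documents in arbitrary order, yet the writes are emitted strictly in ascending docID order.

// Hypothetical demo: later docs finish first, but output order is still 0..7.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderedWriterDemo {
  public static void main(String[] args) throws InterruptedException {
    OrderedWriter queue = new OrderedWriter();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 8; i++) {
      final int docID = i;
      pool.submit(() -> {
        try {
          // Simulate uneven processing time: earlier docs are slower here.
          Thread.sleep((8 - docID) * 50L);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        queue.add(docID, () -> System.out.println("writing doc " + docID));
      });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    // Prints "writing doc 0" through "writing doc 7" in ascending order.
  }
}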