HBase HLog Source Code Reading Notes

HLog

When a client submits an update to a RegionServer, HLog's append method is called to write an entry to the WAL; append is the entry point (a hypothetical caller sketch follows the listing below).

1. append

public void append(HRegionInfo info, byte[] tableName, WALEdit edits,
    final long now)
  throws IOException {
  if (edits.isEmpty()) return;
  if (this.closed) {
    throw new IOException("Cannot append; log is closed");
  }
  synchronized (this.updateLock) {
    long seqNum = obtainSeqNum();
    byte[] hriKey = info.getEncodedNameAsBytes();
    // Record the oldest unflushed seqNum for this region: every edit with a
    // seqNum greater than or equal to this value has not yet been persisted
    this.lastSeqWritten.putIfAbsent(hriKey, seqNum);
    HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
    doWrite(info, logKey, edits); // the key call: write the edit
    this.numEntries.incrementAndGet();
  }
  // Sync if catalog region, and if not then check if that table supports
  // deferred log flushing
  if (info.isMetaRegion() ||
      !info.getTableDesc().isDeferredLogFlush()) {
    // Sync immediately for the META region, or for tables that do not allow
    // deferred log flushing
    this.sync();
  }
}
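For orientation, the following is a minimal, hypothetical sketch of how a caller on the write path might drive this entry point. It is based only on the append signature shown above; the class name, the helper name and the way the WALEdit is assembled are assumptions, not HBase's actual HRegion code.

// Hypothetical caller sketch (assumed names, not HBase's HRegion code): collect a
// row's KeyValues into one WALEdit and hand it to HLog.append as shown above.
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

class WalAppendSketch {
  static void logEdits(HLog log, HRegionInfo regionInfo, byte[] tableName,
      List<KeyValue> kvs) throws IOException {
    WALEdit walEdit = new WALEdit();
    for (KeyValue kv : kvs) {
      walEdit.add(kv); // one WAL edit carries all mutations of the operation
    }
    // Entry point discussed above: appends the edit and, unless the table uses
    // deferred log flush, syncs it before returning
    log.append(regionInfo, tableName, walEdit, System.currentTimeMillis());
  }
}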

2. doWrite

protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
  throws IOException {
  if (!this.enabled) {
    return;
  }
  if (!this.listeners.isEmpty()) {
    for (WALObserver i : this.listeners) {
      // Observer pattern: notify registered listeners before the entry is written
      i.visitLogEntryBeforeWrite(info, logKey, logEdit);
    }
  }
  try {
    long now = System.currentTimeMillis();
    // The essential call: hand the entry to the underlying log writer
    this.writer.append(new HLog.Entry(logKey, logEdit));
    long took = System.currentTimeMillis() - now;
    writeTime += took;
    writeOps++;
    if (took > 1000) {
      long len = 0;
      for (KeyValue kv : logEdit.getKeyValues()) {
        len += kv.getLength();
      }
      // Record the elapsed time and warn if a single append took more than one second
      LOG.warn(String.format(
        "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
        Thread.currentThread().getName(), took, this.numEntries.get(),
        StringUtils.humanReadableInt(len)));
    }
  } catch (IOException e) {
    LOG.fatal("Could not append. Requesting close of hlog", e);
    requestLogRoll(); // on a write error, request that the log be rolled
    throw e;
  }
}
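The listener loop above is the observer hook mentioned in the comment: anything interested in WAL entries (replication, metrics, and so on) is called before the write. As a rough illustration only, a listener could look like the sketch below; the class name is invented, and the real WALObserver interface defines more callbacks than the single one invoked here.

// Rough listener sketch (invented class name; only the callback invoked in
// doWrite is shown, so this does not implement the full WALObserver interface).
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

class LoggingWalListener {
  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
    // Runs before the entry reaches the underlying writer, so the key and edit
    // can still be inspected (or adjusted) at this point
    System.out.println("about to append " + logEdit.getKeyValues().size()
        + " KeyValues for region " + info.getEncodedName());
  }
}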

SequenceFileLogWriter

3. append

public void append(HLog.Entry entry) throws IOException {
  this.writer.append(entry.getKey(), entry.getEdit());
}

SequenceFile.Writer

4. append

Ultimately the data is persisted by calling Hadoop's SequenceFile.Writer.append.
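To make that last step concrete, here is a small, self-contained example of the Hadoop SequenceFile.Writer.append mechanism the WAL sits on. It uses plain Text/LongWritable key/value types and a made-up path purely for illustration; in HLog the key is an HLogKey and the value is a WALEdit.

// Illustrative Hadoop SequenceFile usage (made-up path; Text/LongWritable stand in
// for HLogKey/WALEdit): append writes key/value records sequentially to one file.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileAppendSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/tmp/wal-sketch.seq"); // made-up path
    SequenceFile.Writer writer =
        SequenceFile.createWriter(fs, conf, path, Text.class, LongWritable.class);
    try {
      // Each append persists one key/value pair, just as HLog persists
      // one (HLogKey, WALEdit) pair per edit
      writer.append(new Text("row-0001"), new LongWritable(1L));
      writer.append(new Text("row-0002"), new LongWritable(2L));
    } finally {
      writer.close();
    }
  }
}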

After a region's MemStore has been flushed, an entry is written to the HLog recording that the given region of the given table has been persisted up to the given sequenceId (a stand-alone sketch of this bookkeeping follows the listing below).

1. completeCacheFlush

public void completeCacheFlush(final byte[] encodedRegionName,
    final byte[] tableName, final long logSeqId, final boolean isMetaRegion)
  throws IOException {
  try {
    if (this.closed) {
      return;
    }
    synchronized (updateLock) {
      long now = System.currentTimeMillis();
      // Build a WALEdit that marks this entry as a "cache flush complete" record
      WALEdit edit = completeCacheFlushLogEdit();
      // The key records the table name, the encoded region name and the
      // log sequenceId the flush covers
      HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
        System.currentTimeMillis());
      this.writer.append(new Entry(key, edit)); // write the flush marker to the log file
      writeTime += System.currentTimeMillis() - now;
      writeOps++;
      this.numEntries.incrementAndGet();
      Long seq = this.lastSeqWritten.get(encodedRegionName);
      if (seq != null && logSeqId >= seq.longValue()) {
        // Drop the region's oldest unflushed sequenceId: the region no longer
        // has any data waiting to be persisted
        this.lastSeqWritten.remove(encodedRegionName);
      }
    }
    // sync txn to filesystem
    // A flush marker is important; it must be synced out to the other HDFS nodes
    this.sync();
  } finally {
    this.cacheFlushLock.unlock();
  }
}
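The interplay between append (putIfAbsent of the oldest unflushed sequence id) and completeCacheFlush (removing it once the flush covers it) is the heart of the bookkeeping. The following stand-alone sketch mimics that logic with a plain ConcurrentHashMap; every name in it is invented, and it is only an illustration of the idea, not HBase code.

// Stand-alone illustration of the lastSeqWritten bookkeeping (invented names,
// not HBase code): the map holds, per region, the oldest sequence id whose
// edit has not yet been flushed to an HFile.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class SeqBookkeepingSketch {
  private final AtomicLong logSeqNum = new AtomicLong(0);
  private final ConcurrentHashMap<String, Long> lastSeqWritten = new ConcurrentHashMap<>();

  long appendEdit(String encodedRegionName) {
    long seqNum = logSeqNum.incrementAndGet();
    // Only the region's first unflushed edit wins: the stored value stays
    // the oldest sequence id that is not yet persisted
    lastSeqWritten.putIfAbsent(encodedRegionName, seqNum);
    return seqNum;
  }

  void completeCacheFlush(String encodedRegionName, long flushedSeqId) {
    Long oldest = lastSeqWritten.get(encodedRegionName);
    if (oldest != null && flushedSeqId >= oldest) {
      // Everything up to flushedSeqId is now in an HFile; the region no longer
      // pins any WAL entries, so old log files can become eligible for cleanup
      lastSeqWritten.remove(encodedRegionName);
    }
  }
}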
