Hbase HLog源代码阅读笔记
HLog
当客户端往RegionServer上提交了一个更新操作后,会调用HLog的append方法往WAL上写一个节点,入口方法就是append
1.append
publicvoidappend(HRegionInfoinfo,byte[]tableName,WALEditedits,
finallongnow)
throwsIOException{
if(edits.isEmpty())return;
if(this.closed){
thrownewIOException("Cannotappend;logisclosed");
}
synchronized(this.updateLock){
longseqNum=obtainSeqNum();
byte[]hriKey=info.getEncodedNameAsBytes();
this.lastSeqWritten.putIfAbsent(hriKey,seqNum);//存的是一个最老的sqeNum,这是代表,比该值等于或大于的数据都是没有持久化的
HLogKeylogKey=makeKey(hriKey,tableName,seqNum,now);
doWrite(info,logKey,edits);//写数据,关键方法
this.numEntries.incrementAndGet();
}
//Syncifcatalogregion,andifnotthencheckifthattablesupports
//deferredlogflushing
if(info.isMetaRegion()||
!info.getTableDesc().isDeferredLogFlush()){
this.sync();//如果是Meta表或是表不允许延迟同步,则立即同步
}
}
2.doWrite
protectedvoiddoWrite(HRegionInfoinfo,HLogKeylogKey,WALEditlogEdit)
throwsIOException{
if(!this.enabled){
return;
}
if(!this.listeners.isEmpty()){
for(WALObserveri:this.listeners){
i.visitLogEntryBeforeWrite(info,logKey,logEdit);//观察者模式,以便调起其他需要通知的方法
}
}
try{
longnow=System.currentTimeMillis();
this.writer.append(newHLog.Entry(logKey,logEdit));//重要方法是这句
longtook=System.currentTimeMillis()-now;
writeTime+=took;
writeOps++;
if(took>1000){
longlen=0;
for(KeyValuekv:logEdit.getKeyValues()){
len+=kv.getLength();
}
LOG.warn(String.format(
"%stook%dmsappendinganedittohlog;editcount=%d,len~=%s",
Thread.currentThread().getName(),took,this.numEntries.get(),
StringUtils.humanReadableInt(len)));//记录用时,如果超一秒则警告
}
}catch(IOExceptione){
LOG.fatal("Couldnotappend.Requestingcloseofhlog",e);
requestLogRoll();//如果写出错日志会被截断
throwe;
}
}
SequenceFileLogWriter
3.append
publicvoidappend(HLog.Entryentry)throwsIOException{
this.writer.append(entry.getKey(),entry.getEdit());
}
SequenceFile.Writer
4.append
最终是调用hadoop的SequenceFile.Writer.append将数据持久化的。
当Region的memstoreflush之后,会往HLog里写一条日志,标明哪个表的哪个分区在哪个sequenceId这里持久化过一遍
1.completeCacheFlush
publicvoidcompleteCacheFlush(finalbyte[]encodedRegionName,
finalbyte[]tableName,finallonglogSeqId,finalbooleanisMetaRegion)
throwsIOException{
try{
if(this.closed){
return;
}
synchronized(updateLock){
longnow=System.currentTimeMillis();
WALEditedit=completeCacheFlushLogEdit();//这一句表名是Flush这种操作的日志
HLogKeykey=makeKey(encodedRegionName,tableName,logSeqId,
System.currentTimeMillis());//这一句表明该日志记录下了表名、分区名、当前的日志SequenceId
this.writer.append(newEntry(key,edit));//这一句写入日志文件
writeTime+=System.currentTimeMillis()-now;
writeOps++;
this.numEntries.incrementAndGet();
Longseq=this.lastSeqWritten.get(encodedRegionName);
if(seq!=null&&logSeqId>=seq.longValue()){
this.lastSeqWritten.remove(encodedRegionName);//每个Region最后更新SequenceId被删除,表明该Region没有数据需要持久化。
}
}
//synctxntofilesystem
this.sync();//这种flush操作很重要,一定要同步到hdfs的其他节点上
}finally{
this.cacheFlushLock.unlock();
}
}