Hbase bulkload源代码阅读笔记

1.LoadIncrementalHFiles.doBulkLoad(PathhfofDir,HTabletable)

首先用discoverLoadQueue方法扫描出hfofDir下有哪些fhile文件,再循环调用tryLoad方法把每个文件load进去,这是一个串行的过程。

Deque<LoadQueueItem>queue=null;

queue=discoverLoadQueue(hfofDir);

while(!queue.isEmpty()){

LoadQueueItemitem=queue.remove();

tryLoad(item,conn,table.getTableName(),queue);

}

2.LoadIncrementalHFiles.discoverLoadQueue(PathhfofDir)

hfofDir下是两层目录结构family-->hfile,因此二重循环遍历每个hfile文件,加到Deque里返回,hfofDir下以"_"开头的不是family目录。

LoadQueueItem的数据结构用于记录family和hfile

finalbyte[]family;

finalPathhfilePath;

FileStatus[]familyDirStatuses=fs.listStatus(hfofDir);//第一层目录

Deque<LoadQueueItem>ret=newLinkedList<LoadQueueItem>();

for(FileStatusstat:familyDirStatuses){

PathfamilyDir=stat.getPath();

//Skip_logs,etc

if(familyDir.getName().startsWith("_"))continue;//以"_"开头的不是family目录。

byte[]family=familyDir.getName().getBytes();

Path[]hfiles=FileUtil.stat2Paths(fs.listStatus(familyDir));//第二层目录

for(Pathhfile:hfiles){

if(hfile.getName().startsWith("_"))continue;

ret.add(newLoadQueueItem(family,hfile));

}

}

3.LoadIncrementalHFiles.tryLoad(finalLoadQueueItemitem,

HConnectionconn,finalbyte[]table,

finalDeque<LoadQueueItem>queue)

首先检查当前的hfile所属的region是否已经发生分裂,如果发生分裂,则将hfile分裂成匹配新region的两个hfile,并将这两个hfile放入deque;哪果没有发生分裂,则调用region所在server的bulkLoadHFile方法将hfile导入。重点是以下几句

if(!hri.containsRange(first,last)){//判断包含firstkey的当前region是否包含hfile的startkey和endkey,如果不包含说明当前region是分裂过的

LOG.info("HFileat"+hfilePath+"nolongerfitsinsideasingle"+

"region.Splitting...");

HColumnDescriptorfamilyDesc=hri.getTableDesc().getFamily(item.family);

PathbotOut=newPath(tmpDir,hri.getEncodedName()+".bottom");

PathtopOut=newPath(tmpDir,hri.getEncodedName()+".top");

splitStoreFile(getConf(),hfilePath,familyDesc,hri.getEndKey(),

botOut,topOut);//以当前region的endkey为中值分裂hfile为两个,文件存为.bottom和.top

//Addthesebackatthe*front*ofthequeue,sothere'salower

//chancethattheregionwilljustsplitagainbeforewegetthere.

//.bottom和.top重新放回queue

queue.addFirst(newLoadQueueItem(item.family,botOut));

queue.addFirst(newLoadQueueItem(item.family,topOut));

LOG.info("SuccessfullysplitintonewHFiles"+botOut+"and"+topOut);

returnnull;

}

byte[]regionName=location.getRegionInfo().getRegionName();

server.bulkLoadHFile(hfilePath.toString(),regionName,item.family);//如果包含,直接调用region所在server的bulkLoadHFile方法将hfile导入

returnnull;

4.LoadIncrementalHFiles.splitStoreFile(

Configurationconf,PathinFile,

HColumnDescriptorfamilyDesc,byte[]splitKey,

PathbottomOut,PathtopOut)

//以splitKey为中值,将inFile拷贝分裂为bottomOut和topOut两个文件

ReferencetopReference=newReference(splitKey,Range.top);

ReferencebottomReference=newReference(splitKey,Range.bottom);

copyHFileHalf(conf,inFile,topOut,topReference,familyDesc);

copyHFileHalf(conf,inFile,bottomOut,bottomReference,familyDesc);

5.HRegionServer.bulkLoadHFile(StringhfilePath,byte[]regionName,

byte[]familyName)

这是个HRegionInterface下的远程调用,是在regionserver中执行的。

checkOpen();//检查region是否已停,已经停了便不再导

HRegionregion=getRegion(regionName);//从regionserver中拿到region

region.bulkLoadHFile(hfilePath,familyName);//这步才开始导

6.HRegion.bulkLoadHFile(StringhfilePath,byte[]familyName)

throwsIOException{

startRegionOperation();//上读锁

try{

Storestore=getStore(familyName);

if(store==null){

thrownewDoNotRetryIOException(

"Nosuchcolumnfamily"+Bytes.toStringBinary(familyName));

}

store.bulkLoadHFile(hfilePath);//调store的同名方法

}finally{

closeRegionOperation();//解读锁

}

}

7.Store.bulkLoadHFile(StringsrcPathStr)

就三步:首先将hfile重命名到store的目录下;其次将hfile包装成StoreFile对象装载到Store的列表里。在这两步之前是再一次检查region的startkey和endkey是否跟hfile的匹配

//再次检查是否匹配region

HRegionInfohri=region.getRegionInfo();

if(!hri.containsRange(firstKey,lastKey)){

thrownewWrongRegionException(

"Bulkloadfile"+srcPathStr+"doesnotfitinsideregion"

+this.region);

}

//挪文件

PathsrcPath=newPath(srcPathStr);

PathdstPath=StoreFile.getRandomFilename(fs,homedir);

LOG.info("Renamingbulkloadfile"+srcPath+"to"+dstPath);

StoreFile.rename(fs,srcPath,dstPath);

//装载文件以提供在线服务

//Appendthenewstorefileintothelist

this.lock.writeLock().lock();//加store的写锁

try{

ArrayList<StoreFile>newFiles=newArrayList<StoreFile>(storefiles);

newFiles.add(sf);

this.storefiles=sortAndClone(newFiles);

notifyChangedReadersObservers();

}finally{

this.lock.writeLock().unlock();//解store的写锁

}

相关推荐