Hbase bulkload源代码阅读笔记
1.LoadIncrementalHFiles.doBulkLoad(PathhfofDir,HTabletable)
首先用discoverLoadQueue方法扫描出hfofDir下有哪些fhile文件,再循环调用tryLoad方法把每个文件load进去,这是一个串行的过程。
Deque<LoadQueueItem>queue=null;
queue=discoverLoadQueue(hfofDir);
while(!queue.isEmpty()){
LoadQueueItemitem=queue.remove();
tryLoad(item,conn,table.getTableName(),queue);
}
2.LoadIncrementalHFiles.discoverLoadQueue(PathhfofDir)
hfofDir下是两层目录结构family-->hfile,因此二重循环遍历每个hfile文件,加到Deque里返回,hfofDir下以"_"开头的不是family目录。
LoadQueueItem的数据结构用于记录family和hfile
finalbyte[]family;
finalPathhfilePath;
FileStatus[]familyDirStatuses=fs.listStatus(hfofDir);//第一层目录
Deque<LoadQueueItem>ret=newLinkedList<LoadQueueItem>();
for(FileStatusstat:familyDirStatuses){
PathfamilyDir=stat.getPath();
//Skip_logs,etc
if(familyDir.getName().startsWith("_"))continue;//以"_"开头的不是family目录。
byte[]family=familyDir.getName().getBytes();
Path[]hfiles=FileUtil.stat2Paths(fs.listStatus(familyDir));//第二层目录
for(Pathhfile:hfiles){
if(hfile.getName().startsWith("_"))continue;
ret.add(newLoadQueueItem(family,hfile));
}
}
3.LoadIncrementalHFiles.tryLoad(finalLoadQueueItemitem,
HConnectionconn,finalbyte[]table,
finalDeque<LoadQueueItem>queue)
首先检查当前的hfile所属的region是否已经发生分裂,如果发生分裂,则将hfile分裂成匹配新region的两个hfile,并将这两个hfile放入deque;哪果没有发生分裂,则调用region所在server的bulkLoadHFile方法将hfile导入。重点是以下几句
if(!hri.containsRange(first,last)){//判断包含firstkey的当前region是否包含hfile的startkey和endkey,如果不包含说明当前region是分裂过的
LOG.info("HFileat"+hfilePath+"nolongerfitsinsideasingle"+
"region.Splitting...");
HColumnDescriptorfamilyDesc=hri.getTableDesc().getFamily(item.family);
PathbotOut=newPath(tmpDir,hri.getEncodedName()+".bottom");
PathtopOut=newPath(tmpDir,hri.getEncodedName()+".top");
splitStoreFile(getConf(),hfilePath,familyDesc,hri.getEndKey(),
botOut,topOut);//以当前region的endkey为中值分裂hfile为两个,文件存为.bottom和.top
//Addthesebackatthe*front*ofthequeue,sothere'salower
//chancethattheregionwilljustsplitagainbeforewegetthere.
//.bottom和.top重新放回queue
queue.addFirst(newLoadQueueItem(item.family,botOut));
queue.addFirst(newLoadQueueItem(item.family,topOut));
LOG.info("SuccessfullysplitintonewHFiles"+botOut+"and"+topOut);
returnnull;
}
byte[]regionName=location.getRegionInfo().getRegionName();
server.bulkLoadHFile(hfilePath.toString(),regionName,item.family);//如果包含,直接调用region所在server的bulkLoadHFile方法将hfile导入
returnnull;
4.LoadIncrementalHFiles.splitStoreFile(
Configurationconf,PathinFile,
HColumnDescriptorfamilyDesc,byte[]splitKey,
PathbottomOut,PathtopOut)
//以splitKey为中值,将inFile拷贝分裂为bottomOut和topOut两个文件
ReferencetopReference=newReference(splitKey,Range.top);
ReferencebottomReference=newReference(splitKey,Range.bottom);
copyHFileHalf(conf,inFile,topOut,topReference,familyDesc);
copyHFileHalf(conf,inFile,bottomOut,bottomReference,familyDesc);
5.HRegionServer.bulkLoadHFile(StringhfilePath,byte[]regionName,
byte[]familyName)
这是个HRegionInterface下的远程调用,是在regionserver中执行的。
checkOpen();//检查region是否已停,已经停了便不再导
HRegionregion=getRegion(regionName);//从regionserver中拿到region
region.bulkLoadHFile(hfilePath,familyName);//这步才开始导
6.HRegion.bulkLoadHFile(StringhfilePath,byte[]familyName)
throwsIOException{
startRegionOperation();//上读锁
try{
Storestore=getStore(familyName);
if(store==null){
thrownewDoNotRetryIOException(
"Nosuchcolumnfamily"+Bytes.toStringBinary(familyName));
}
store.bulkLoadHFile(hfilePath);//调store的同名方法
}finally{
closeRegionOperation();//解读锁
}
}
7.Store.bulkLoadHFile(StringsrcPathStr)
就三步:首先将hfile重命名到store的目录下;其次将hfile包装成StoreFile对象装载到Store的列表里。在这两步之前是再一次检查region的startkey和endkey是否跟hfile的匹配
//再次检查是否匹配region
HRegionInfohri=region.getRegionInfo();
if(!hri.containsRange(firstKey,lastKey)){
thrownewWrongRegionException(
"Bulkloadfile"+srcPathStr+"doesnotfitinsideregion"
+this.region);
}
//挪文件
PathsrcPath=newPath(srcPathStr);
PathdstPath=StoreFile.getRandomFilename(fs,homedir);
LOG.info("Renamingbulkloadfile"+srcPath+"to"+dstPath);
StoreFile.rename(fs,srcPath,dstPath);
//装载文件以提供在线服务
//Appendthenewstorefileintothelist
this.lock.writeLock().lock();//加store的写锁
try{
ArrayList<StoreFile>newFiles=newArrayList<StoreFile>(storefiles);
newFiles.add(sf);
this.storefiles=sortAndClone(newFiles);
notifyChangedReadersObservers();
}finally{
this.lock.writeLock().unlock();//解store的写锁
}