Hadoop学习十三:Hadoop-Hdfs DataStorage源码
一.DataStorage
- DataStorage继承Storage。
- DataStorage对应Hdfs的dfs.data.dir目录。
- DataNode启动时,会对dfs.data.dir下所有的StorageDirectory依次进行状态检查(分析)、恢复和状态转换。
二.DataStorage类图
三.详细描述
- org.apache.hadoop.hdfs.server.protocol.NamespaceInfo:NamespaceInfo is returned by the name-node in reply to a data-node handshake. 配置完整个Hadoop系统后,要做的第一件事就是格式化NameNode:
$bin/hadoop namenode -format
format后的NameNode会产生一个namespaceID。DataNode启动时会与NameNode进行握手(handshake),NameNode在应答中返回包含该namespaceID的NamespaceInfo:NamespaceInfo nsInfo = handshake();
- DataStorage:
- 参考代码中的步骤阅读。状态分析-----恢复操作-----状态转换
- 关于“一致性”的说明:参考http://zy19982004.iteye.com/blog/1876706第三张图。执行/upgrade、/rollback、/finalize指令后,StorageDirectory下会存在不同的文件夹;当这些文件夹停留在某个中间状态时,就是“不一致”。举例:/upgrade开始后,“一致”状态指的是current和previous都存在,也就是/upgrade的1、2、3步都已完成;如果其中某一步出了问题或者尚未执行,导致previous.tmp存在而current不存在,就是“不一致”;previous.tmp和current同时存在也是“不一致”。这些情况都需要doRecover。
四.DataStorage源码时序图
五.DataStorage源码
package org.apache.hadoop.hdfs.server.datanode;

/**
 * Data-node storage: a {@code Storage} subclass that adds a {@code storageID}
 * to the persisted fields and drives the analyze / recover / transition
 * sequence over a collection of data directories.
 *
 * NOTE(review): this listing is an abridged excerpt. Part of
 * {@code doTransition} is elided ("..."), and {@code curDir}, {@code prevDir},
 * {@code tmpDir}, {@code rename}, {@code deleteDir} and {@code linkBlocks} are
 * not declared anywhere in the visible code.
 */
public class DataStorage extends Storage {
  // Field specific to DataStorage (written to / read from the properties
  // in setFields / getFields below).
  private String storageID;

  /** Creates a storage of node type DATA_NODE with an empty storageID. */
  DataStorage() {
    super(NodeType.DATA_NODE);
    storageID = "";
  }

  /**
   * Analyzes every directory in {@code dataDirs}, formats or recovers it as
   * needed, then runs the state transition on each accepted directory and
   * finally calls {@code writeAll()}.
   *
   * Triggered from DataNode.start (per the original author's note).
   *
   * @param nsInfo    namespace information passed on to format / doTransition
   * @param dataDirs  candidate data directories; entries whose analyzed state
   *                  is NON_EXISTENT are removed from this collection
   * @param startOpt  startup option passed to analyzeStorage / doTransition
   * @throws IOException rethrown from analysis, format or recovery after the
   *                     directory has been unlocked
   */
  void recoverTransitionRead(NamespaceInfo nsInfo, Collection<File> dataDirs,
      StartupOption startOpt) throws IOException {
    // 1. Before doTransition, analyze every StorageDirectory and check whether
    //    it is in a "consistent state".
    //    If it is in an "inconsistent state": format or doRecover.
    this.storageID = "";
    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
    // NOTE(review): dataDirStates is filled below but never read in the
    // visible code.
    ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(
        dataDirs.size());
    for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
      File dataDir = it.next();
      StorageDirectory sd = new StorageDirectory(dataDir);
      StorageState curState;
      try {
        // 1.1 Call Storage.analyzeStorage to analyze the state.
        curState = sd.analyzeStorage(startOpt);
        // 1.2 Call format or Storage.doRecover to perform recovery.
        switch (curState) {
        case NORMAL:
          break;
        case NON_EXISTENT:
          // ignore this storage
          it.remove();
          continue;
        case NOT_FORMATTED: // format
          format(sd, nsInfo);
          break;
        default: // recovery part is common
          sd.doRecover(curState);
        }
      } catch (IOException ioe) {
        // Unlock the directory before propagating the failure.
        sd.unlock();
        throw ioe;
      }
      // 1.3 Only a StorageDirectory in a "consistent state" is added to the
      //     Storage.
      addStorageDir(sd);
      dataDirStates.add(curState);
    }
    // 2. doTransition: state transition.
    //    Upgrade or rollback according to the DataNode startup option;
    //    otherwise regular startup.
    for (int idx = 0; idx < getNumStorageDirs(); idx++) {
      doTransition(getStorageDir(idx), nsInfo, startOpt);
    }
    // 3. Update the VERSION of every StorageDirectory.
    this.writeAll();
  }

  /**
   * Formats a storage directory: clears it, sets layoutVersion to
   * FSConstants.LAYOUT_VERSION, namespaceID to the value from {@code nsInfo}
   * and cTime to 0, then writes the directory.
   *
   * @param sd     directory to format
   * @param nsInfo source of the namespace ID
   * @throws IOException if clearing or writing the directory fails
   */
  void format(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
    sd.clearDirectory(); // create directory
    this.layoutVersion = FSConstants.LAYOUT_VERSION;
    this.namespaceID = nsInfo.getNamespaceID();
    this.cTime = 0;
    // store storageID as it currently is
    sd.write();
  }

  // Override: the DataNode additionally stores storageID in VERSION.
  protected void setFields(Properties props, StorageDirectory sd)
      throws IOException {
    super.setFields(props, sd);
    props.setProperty("storageID", storageID);
  }

  // Override: the DataNode additionally reads storageID from VERSION.
  // NOTE(review): the property value is assigned without any null or
  // consistency check; storageID becomes null when the property is absent.
  protected void getFields(Properties props, StorageDirectory sd)
      throws IOException {
    super.getFields(props, sd);
    String ssid = props.getProperty("storageID");
    storageID = ssid;
  }

  /**
   * Analyze which and whether a transition of the fs state is required and
   * perform it if necessary. Rollback if previousLV >= LAYOUT_VERSION &&
   * prevCTime <= namenode.cTime Upgrade if this.LV > LAYOUT_VERSION ||
   * this.cTime < namenode.cTime Regular startup if this.LV = LAYOUT_VERSION
   * && this.cTime = namenode.cTime
   */
  // 2 doTransition: state transition.
  // Upgrade or rollback according to the DataNode startup option; otherwise
  // regular startup.
  private void doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
      StartupOption startOpt) throws IOException {
    if (startOpt == StartupOption.ROLLBACK)
      // 2.1 doRollback
      doRollback(sd, nsInfo); // rollback if applicable
    sd.read();
    // NOTE(review): the article elides code at this point (the "..." below);
    // whatever checks run between read() and the upgrade test are not shown.
    ...
    if (this.layoutVersion > FSConstants.LAYOUT_VERSION
        || this.cTime < nsInfo.getCTime()) {
      // 2.2 doUpgrade
      doUpgrade(sd, nsInfo); // upgrade
      return;
    }
  }

  // 2.1 doRollback
  // 1. current -----> removed.tmp
  // 2. previous ----> current
  // 3. delete removed.tmp
  // NOTE(review): curDir / prevDir / tmpDir are not declared in this excerpt;
  // presumably they are the current, previous and removed.tmp directories of
  // sd -- confirm against the full source.
  void doRollback(StorageDirectory sd, NamespaceInfo nsInfo)
      throws IOException {
    rename(curDir, tmpDir);
    rename(prevDir, curDir);
    deleteDir(tmpDir);
  }

  // 2.2 doUpgrade
  // 1. current -----> previous.tmp
  // 2. rebuild current: create hard links from previous.tmp into current,
  //    then write VERSION
  // 3. previous.tmp -----> previous
  // NOTE(review): curDir / prevDir / tmpDir are not declared in this excerpt;
  // presumably current, previous and previous.tmp of sd -- confirm.
  void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo)
      throws IOException {
    HardLink hardLink = new HardLink();
    rename(curDir, tmpDir);
    linkBlocks(tmpDir, curDir, this.getLayoutVersion(), hardLink);
    sd.write();
    rename(tmpDir, prevDir);
  }

  /** Calls doFinalize on every directory in storageDirs. */
  void finalizeUpgrade() throws IOException {
    for (Iterator<StorageDirectory> it = storageDirs.iterator(); it
        .hasNext();) {
      doFinalize(it.next());
    }
  }

  // 1. previous ----> finalized.tmp
  // 2. delete finalized.tmp
  // The deletion is handed to a started Daemon wrapping a Runnable, so it
  // presumably runs asynchronously to the caller -- confirm Daemon semantics.
  void doFinalize(StorageDirectory sd) throws IOException {
    rename(prevDir, tmpDir);
    new Daemon(new Runnable() {
      public void run() {
        deleteDir(tmpDir);
      }
    }).start();
  }
}