Hadoop Datanode支持磁盘故障代码hack
背景
Hadoop当中的每一个datanode上,都会保存一些HDFS中文件 的blocks,而这些blocks实际上都是按照一定的格式保存在 datanode这台机器的某些本地目录中的,当通过hadoop向HDFS中保存文件的时候,这些文件就会被hadoop按照blocksize切分成多个blocks,并按照一定的负载和调度算法和配置文件中设置 的每个block的副本数分配到集群的某些datanode上去。而且hadoop最近的版本还支持datanode上将HDFS对应的本地目录设置成多个。这个功能 非常有价值,因为通过这个配置,可以在集群的datanode上挂载多个磁盘,每个磁盘挂载在不同的目录下,然后在 hadoop-site.xml中将datanode的dfs.data.dir配置成由逗号分开的多个目录,这些目录分别对应了多个挂载的磁盘。这样可以在集群的io非常高的时候将io操作分配到各个磁盘上去,减少磁盘的压力和出错的几率。但是,根据对目前 0.19.0的版本中的源代码的研究发现,当多个磁盘中的任何一个crash后,整个datanode就会shutdown它自己,而不管其他的磁盘是不是仍然可以工作。这样其实就产生了问题:因为如果仅仅是一个磁盘发生错误就把整个datanode给停掉,那么namenode就势必会在一定的时间间隔后将这台datanode上保存的所有的blocks转存到其他的datanode上去,以保持blocks的副本数不低于hadoop配置文件中 dfs.replication配置项指定的数目。但是这种情况下其实出现问题的那台datanode上仅仅是一个磁盘上blocks需要被转存,其他好的磁盘上的blocks是仍然可用的,这样就会增加集群中的网络 负载和集群中所有机器的负载,而且还会造成资源的浪费和集群资源的不合理使用。如果 datanode上有好几十TB的数据 ,那么这个过程的代价将会更加严重。而更加合理的处理应该是:即使datanode上有某一个磁盘或者多个磁盘 crash了,只要不是全部crash,datanode应该仍然保持工作,并期待坏掉的磁盘能够在一定的时间内被repaired,然后重新插入机器中并重新开始工作。这些磁盘上原本保存的blocks能恢复最好,即使不能恢复,由于namenode有replication机制,也可以保持这些 blocks的副本数不会低于dfs.replication的配置。因此,可行的办法就是研究hadoop这个部分的代码,对它进行修改,以支持以上所说的这种功能。
代码Hack
hadoop中关于这部分的代码被pack在了 org.apache.hadoop.hdfs.server.datanode中,主要的三个类是 DataNode,FSDataSet和DataBlockScanner,其中Datanode类就是一个datanode运行instance的抽象,FSDataSet用来表示datanode节点上关于磁盘配置的信息和一些处理接口,而DataBlockScanner是一个线程,用来不断的检查该datanode上的blocks信息。运行的机制是这样的:
- 当datanode启动的时候,会初始化许多的信息,如和 namenode通信的socket信息,从hadoop配置文件中读取的配置信息,并利用这些配置信息初始化该datanode instance
- 每个datanode中都会有多个内部线程在轮询的作一些操作,其中有一个为DataTransfer,用来向其他datanode传输block数据。 datanode启动过程中同样会把这个线程启动。
- 启动datanode时,同样还会将DataBlockScanner线程启动,这个线程用来keep track datanode上的block和更新信息。
- datanode中还保存了一个FSDataset的实例 ,它用来记录当前 datanode上关于磁盘的配置信息,以及这些磁盘或者路径下中保存的 HDFS分布式文件系统 中的信息。通过对 hadoop配置文件的读取,datanode也会初始化这个FSDataset类的instance。
- datanode本身也是一个线程类,它的run()中会间歇的调用一个服务方法:offerService(),这个方法里记录处理的就是datanode的核心处理逻辑。这当中的处理包括:
- 每隔3秒钟向namenode发送一次自己的heartbeat信息,这些信息被namenode接收到以后会根据对该 heartbeat的分析向 datanode返回一个datanode需要的操作(DatanodeCommand),并根据从namenode返回的这个 DatanodeCommand来作自己相应的操作。
- 然后会检查本datanode是否有接收到新的block,并作相应的处理
- 然后检查上一次向namenode进行block report的时间,如果超过一定的时间(默认为1小时).就向namenode发送一次block report,以便让namenode上记录的信息保持更新。
- 每一次接收到来自namenode的操作信息 (DatanodeCommand),datanode都会作相应的操作。
在datanode操作hdfs时,它会先从它内部保存的FSDataSet实例中得到下一个轮转到的FSVolume,这么一个FSVolume代表了dfs.data.dir的配置项中用逗号分隔开的某一个本地磁盘目录,然后FSDataSet实例会试着在这个FSVolume中的FSDir实例的checkDirTree()方法:
public void checkDirTree() throws DiskErrorException {
DiskChecker.checkDir(dir);
if (children != null) {
for (int i = 0; i < children.length; i++) {
children[i].checkDirTree();
}
}
}
从程序中可以看出,实际上它是首先用一个DiskChecker类来check这个dir是否是合法的,然后再check 这个dir的子目录,而判断这个dir是否合法的逻辑如下:
if (!mkdirsWithExistsCheck(dir))
throw new DiskErrorException("can not create directory: "
+ dir.toString());
if (!dir.isDirectory())
throw new DiskErrorException("not a directory: "
+ dir.toString());
if (!dir.canRead())
throw new DiskErrorException("directory is not readable: "
+ dir.toString());
if (!dir.canWrite())
throw new DiskErrorException("directory is not writable: "
+ dir.toString());
从程序中可以看出,实际上,datanode首先尝试在这个dir中创建一个子目录,然后判断这个目录是否是一个合法的目录,是否可写,是否可读,一旦这几个判断的任何一个发生错误,datanode就认为这个目录出现了问题,于是抛出 DiskErrorException,0.19.0的hadoop此时会把这个异常连续的向上的调用抛出,直到FSVolumeSet实例的 checkDir(),此时datanode发现磁盘错误,然后shutdown()它自己,datanode退出集群。这就是目前datanode处理磁盘的逻辑。但是想想可以发现,这样的逻辑其实不是最好的,因为就如上面开头描述的那样,此时如果datanode上配置了多磁盘,很有可能其他的磁盘都是好的,可以继续工作,需要修复或者copy副本到其他datanode的blocks仅仅是这块坏掉的磁盘上的blocks。
既然明白了 datanode处理磁盘错误的逻辑,就可以自己修改datanode的实现代码,来满足自己的需要。
由于datanode关于磁盘的检错的调用流程为DataNode.checkDiskError( ) -> FSDataSet.checkDataDir() -> FSVolume.checkDirs(),就在这一步,一旦任何一块磁盘发生异常,就把一场抛给了Datanode,datanode于是 shutdown(),并等待管理人员的修复,并在一段时间之后开始拷贝这个datanode上的副本到其他的datanode上去。
所以,在FSVolume的checkDirs()方法中,可以做如下修改:
List<FSVolume> goodVolumes = new ArrayList<FSVolume>();
for (int idx = 0; idx < volumes.length; idx++) {
try {
volumes[idx].checkDirs();
goodVolumes.add(volumes[idx]);
} catch (DiskErrorException e) {
synchronized(crashedVolumes){
crashedVolumes.add(volumes[idx]);
}
}
}
if(goodVolumes.size() == 0) {
throw new AllDiskErrorException("All " + volumes.length + " disk(s) error: ");
} else if (volumes.length - goodVolumes.size() > 0) {
volumes = goodVolumes.toArray(new FSVolume[0]);
throw new DataNodeDiskErrorException(sb.toString());
}
程序的逻辑为:创建一个新的队列,用来保存在遍历每一个FSVolume,如果当前的FSVolume是好的,就加入到这个新的goodVolumes队列中去,而一旦出现坏的磁盘或者dir,就把它加入到crashedVolumes队列中,最后遍历完成后,将goodVolumes中的FSVolume保存为队列重新赋予给 volumes。
同时,在datanode中create一个线程,让它没过一段时间去check,看是否crashedVolumes 的队列中是否有FSVolume的实例,如果有是否已经repaired,如果没有就继续等待下一次check,代码如下:
class CrashVolumeChecker implements Runnable {
public void run() {
while (true) {
if (data.checkCrashedVolumes()) {
try {
data.checkDataDir();
reBlockReport();
} catch (DataNodeDiskErrorException de) {
handleDiskError(de.getMessage());
} catch (AllDiskErrorException de) {
handleAllDiskError(de.getMessage());
}
}
try {
Thread.sleep(CRASH_VOLUME_CHECK_INTERVAL);
} catch (InterruptedException ie) {
}
}
}
}
然后再在datanode的run()中将这个线程启动,就可以了。