第七章:小朱笔记hadoop之源码分析-hdfs分析 第六节:SecondaryNameNode分析
第七章:小朱笔记hadoop之源码分析-hdfs分析
第六节:SecondaryNameNode分析
6.1 架构分析
6.2 SecondaryNameNode启动过程分析
6.3 Checkpoint分析
6.4 Import Checkpoint恢复数据
6.1 架构分析
SecondaryNameNode(snn)不是NameNode(nn)的热备进程。snn是HDFS架构中的一个组成部分,但是经常由于名字而被人误解它真正的用途,其实它真正的用途,是用来保存namenode中对HDFS metadata的信息的备份,并减少namenode重启的时间。对于hadoop进程中 ,要配置好并正确的使用 snn,还是需要做一些工作的。
hadoop的默认配置中让snn进程默认运行在了namenode的那台机器上,但是这样的话,如果这台机器出错,宕机,对恢复HDFS文件系统是很大的灾难,更好的方式是:将snn的进程配置在另外一台机器上运行。
在hadoop中,namenode负责对HDFS的metadata的持久化存储,并且处理来自客户端的对HDFS的各种操作的交互反馈。为了保证交互速度,HDFS文件系统的metadata是被load到namenode机器的内存中的,并且会将内存中的这些数据保存到磁盘进行持久化存储。为了保证这个持久化过程不会成为HDFS操作的瓶颈,hadoop采取的方式是:没有对任何一次的当前文件系统的snapshot进行持久化,对 HDFS最近一段时间的操作list会被保存到namenode中的一个叫Editlog的文件中去。当重启namenode时,除了load fsImage意外,还会对这个EditLog文件中记录的HDFS操作进行replay,以恢复HDFS重启之前的最终状态。
SecondaryNameNode,会周期性的从NameNode节点上下载元数据信息(元数据镜像fsimage 和元数据库操作日志edits),然后将fsimage和edits进行合并,生成新的fsimage(该fsimage就是Secondary NameNode下载时刻的元数据的Checkpoint),在本地保存,并将其推送到NameNode,同时重置NameNode上的edits。所以namenode的重启就会Load最新的一个checkpoint,并replay EditLog中 记录的hdfs操作,由于EditLog中记录的是从上一次checkpoint以后到现在的操作列表,所以就会比较小。如果没有snn的这个周期性的合 并过程,那么当每次重启namenode的时候,就会 花费很长的时间。而这样周期性的合并就能减少重启的时间。同时也能保证HDFS系统的完整性。在namenode发生故障无法启动时,可以使用snn准备的checkpoint文件,在namenode启动时带上-importCheckpoint参数来进行恢复。
这就是SecondaryNameNode所做的事情。所以snn并不能分担namenode上对HDFS交互性操作的压力。尽管如此,当 namenode机器宕机或者namenode进程出问题时,namenode的daemon进程可以通过人工的方式从snn上拷贝一份metadata 来恢复HDFS文件系统。
至于为什么要将SNN进程运行在一台非NameNode的 机器上,这主要出于两点考虑:
可扩展性: 创建一个新的HDFS的snapshot需要将namenode中load到内存的metadata信息全部拷贝一遍,这样的操作需要的内存就需要 和namenode占用的内存一样,由于分配给namenode进程的内存其实是对HDFS文件系统的限制,如果分布式文件系统非常的大,那么 namenode那台机器的内存就可能会被namenode进程全部占据。
容错性: 当snn创建一个checkpoint的时候,它会将checkpoint拷贝成metadata的几个拷贝。将这个操作运行到另外一台机器,还可以提供分布式文件系统的容错性。
优点
Hadoop自带机制,成熟可靠,使用简单方便,无需开发,配置即可。
Secondaryary NameNode定期做Checkpoint,可保证各个Checkpoint阶段的元数据的可靠性,同时,进行fsimage与edits的合并,可以有效限制edits的大小,防止其无限制增长。
缺点
没有做到热备,当NameNode无法提供服务时,需要重启NameNode,服务恢复时间与文件系统规模大小成正比。
Secondary NameNode保存的只是Checkpoint时刻的元数据,因此,一旦NameNode上的元数据损坏,通过Checkpoint恢复的元数据并不是HDFS此刻的最新数据,存在一致性问题。
NameNode 上实现了接口NamenodeProtocol就是用亍 NameNode 和 Secondary NameNode 间的命令通信。 NameNode 和 Secondary NameNode 间数据的通信,使用的是 HTTP 协议,HTTP 的容器用的是 jetty,TransferFsImage 是文件传输的辅劣类。
GetImageServlet的doGet方法目前支持FSImage(getimage),日志(getedit)和存 FSImage(putimage)。
例如:
http://×××××:50070/getimage?getimage 可以获取 FSImage。
http://×××××:50070/getimage?getedit 可以获取日志文件。
Secondary NameNode 发送一个 HTTP 请求到 NameNode,NameNode 上一个 HTTP 客户端到 Secondary NameNode 上去下载 FSImage。
6.2 SecondaryNameNode启动过程分析
(1)main方法
public static void main(String[] argv) throws Exception { StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG); Configuration tconf = new Configuration(); if (argv.length >= 1) { SecondaryNameNode secondary = new SecondaryNameNode(tconf); int ret = secondary.processArgs(argv); System.exit(ret); } // Create a never ending deamon // 不带参数启动的时候,会启动一个不结束的进程,用于执行定时checkpoint Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf)); //线程方式运行SecondaryNameNode.run checkpointThread.start(); }
(2)使用nn之前建立的RpcServer(dfs.namenode.servicerpc-address),建立同namenode之间的连接进行通信
//使用nn之前建立的RpcServer(dfs.namenode.servicerpc-address),建立同namenode之间的连接进行通信 nameNodeAddr = NameNode.getServiceAddress(conf, true); this.namenode =(NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class,NamenodeProtocol.versionID, nameNodeAddr, conf);
(3)初始化checkpoint的目录以及进行checkpoint的频率(fs.checkpoint.period和fs.checkpoint.size
fsName = getInfoServer(); //初始化checkpoint的目录以及进行checkpoint的频率(fs.checkpoint.period和fs.checkpoint.size) // initialize checkpoint directories ////读取fs.checkpoint.dir配置项作为ckp目录,默认为/tmp/hadoop/dfs/namesecondary checkpointDirs = FSImage.getCheckpointDirs(conf, "/tmp/hadoop/dfs/namesecondary"); //读取fs.checkpoint.edits.dir配置项作为ckp edit目录,默认为/tmp/hadoop/dfs/namesecondary checkpointEditsDirs = FSImage.getCheckpointEditsDirs(conf, "/tmp/hadoop/dfs/namesecondary"); //初始化checkpoint和checkpoint.edits目录,如果不存在就创造相应目录 checkpointImage = new CheckpointStorage(); checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs); //默认执行checkpoint时间间隔为1小时,edit文件大小为4M // Initialize other scheduling parameters from the configuration checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600); checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
(4)启动Http服务
//启动Http服务 try { infoServer = httpUGI.doAs(new PrivilegedExceptionAction<HttpServer>() { @Override public HttpServer run() throws IOException, InterruptedException { LOG.info("Starting web server as: " + UserGroupInformation.getCurrentUser().getUserName()); int tmpInfoPort = infoSocAddr.getPort(); infoServer = new HttpServer("secondary", infoBindAddress, tmpInfoPort, tmpInfoPort == 0, conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN)); if(UserGroupInformation.isSecurityEnabled()) { System.setProperty("https.cipherSuites", Krb5AndCertsSslSocketConnector.KRB5_CIPHER_SUITES.get(0)); InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(infoBindAddress + ":"+ conf.get( "dfs.secondary.https.port", infoBindAddress + ":" + 0)); imagePort = secInfoSocAddr.getPort(); infoServer.addSslListener(secInfoSocAddr, conf, false, true); } infoServer.setAttribute("name.system.image", checkpointImage); infoServer.setAttribute(JspHelper.CURRENT_CONF, conf); infoServer.addInternalServlet("getimage", "/getimage",GetImageServlet.class, true); infoServer.start(); return infoServer; } }); } catch (InterruptedException e) { throw new RuntimeException(e); }
完成初始化操作后,会单独启动线程,循环执行SecondaryNameNode.run,run()调用了SecondaryNameNode.doWork()方法。doWork默认每5分钟会进行一次检查,如果editlog的大小超过checkpointSize大小或者距离上一次checkpoint时间超出checkpointPeriod时间,则执行SecondaryNameNode.doCheckpoint:
6.3 Checkpoint分析
SecondaryNameNode节点启动之后会不断的对NameNode节点保存的元数据进行备份(定时备份),具体的说来就是:SecondaryNameNode的run方法每隔一段时间就会执行doCheckpoint()方法,SecondaryNameNode的主要工 作都在这个方法里。这个方法会从NameNode上取下FSImage和操作日志(当然也包括版本文件和fstime),然后在本地合并,然后再把合并后 的FSImage传回NameNode。这样既可以保存一个NameNode上的数据备份,又可以为NameNode节点分担一部分压力。
具体的流程如 下:
(1)调用startCheckpoint,为接下来的工作准备空间。首先存放FSImage和EditsLog的目录分别由配置文件中的fs.checkpoint.dir项和fs.checkpoint.edits.dir项来设置,然后会分别对这两类目录进行检查和恢复,对于已经存在的chechpoint要将它们设置成为lastCheckpoint;
(2) 创建RPC客户端,用于和NameNode节点通信;
(3)在SecondaryNameNode节点上开启Http服务,主要用来向NameNode节点传输合并好的元数据文件FSImage;
(4)远程调用NameNode的rollEditLog方法,让NameNode停止向edits上写操作日志,而是将新产生的日志转写到临时日志文件 edits.new上。同时,NameNode端的FSImage检查点状态要设置为ROLLED_EDITS。最后会返回一个检查点签名 CheckpointSignature;
(5)通过NameNode开启的Http服务从NameNode上下载FSImage和对应的操作日志,之后设置本地的检查点状态设置为UPLOAD_DONE;
(6)加载下载的FSImage和操作日志,从而合并成一个新的FSImage;
(7)通知NameNode新的FSImage文件已经合并好了,然后NameNode节点通过SecondaryNameNode节点的http服务来下载新的FSImage;
(8)远程调用NameNode的rollFsImage,来根据下载的最新FSImage替换原来的FSImage,临时日志文件edits.new重命名为edits;
(9)调用本地endCheckpoint方法,结束一次doCheckpoint流程。
void doCheckpoint() throws IOException { // Do the required initialization of the merge work area. // 开始Checkpoint前的初始化工作主要包括: // 1.unlock所有的checkpoint目录 // 2.关闭checkpoint的editlog文件 // 3.检查checkpoint目录和checkpoint edit目录是否正常 // 4.腾出checkpoint目录下的current目录,原current目录更名为lastcheckpoint.tmp startCheckpoint();//初始化 // Tell the namenode to start logging transactions in a new edit file // Retuns a token that would be used to upload the merged image. //通知namenode开始checkpoint,拿到namenode上的checkpoint标记,打开edits.new的文件流 CheckpointSignature sig = (CheckpointSignature)namenode.rollEditLog(); // error simulation code for junit test if (ErrorSimulator.getErrorSimulation(0)) { throw new IOException("Simulating error0 " + "after creating edits.new"); } //从namenode上下载fsimage文件与editlog文件 downloadCheckpointFiles(sig); // Fetch fsimage and edits //合并fsimage与editlog文件(将image和Editlog都加载到内存合并后再savenamespace) doMerge(sig); // Do the merge // // Upload the new image into the NameNode. Then tell the Namenode // to make this new uploaded image as the most current image. // // 将合并好的checkpoint image上传给namenode putFSImage(sig); // error simulation code for junit test if (ErrorSimulator.getErrorSimulation(1)) { throw new IOException("Simulating error1 " + "after uploading new image to NameNode"); } //将合并后的数据文件恢复为工作状态 //1.fsImage.ckpt重命名为fsImage,原fsImage删除 //2.edits.new重命名为edits,原edits删除 //3.打开editlog文件 namenode.rollFsImage(); //删除原有的previous.checkpoint //将lastcheckpoint.tmp更名为previous.checkpoint checkpointImage.endCheckpoint(); LOG.info("Checkpoint done. New Image Size: " + checkpointImage.getFsImageName().length()); }
6.4 Import Checkpoint恢复数据
如果主节点挂掉了,硬盘数据需要时间恢复或者不能恢复了,现在又想立刻恢复HDFS,这个时候就可以import checkpoint。步骤如下:
拿一台和原来机器一样的机器,包括配置和文件,一般来说最快的是拿你节点机器中的一台,立马能用(部分配置要改成NameNode的配置)
创建一个空的文件夹,该文件夹就是配置文件中dfs.name.dir所指向的文件夹。
拷贝你的secondary NameNode checkpoint出来的文件,到某个文件夹,该文件夹为fs.checkpoint.dir指向的文件夹
执行命令bin/hadoop namenode -importCheckpoint
这样NameNode会读取checkpoint文件,保存到dfs.name.dir。但是如果你的dfs.name.dir包含合法的 fsimage,是会执行失败的。因为NameNode会检查fs.checkpoint.dir目录下镜像的一致性,但是不会去改动它。