第七章:小朱笔记hadoop之源码分析-hdfs分析 第六节:SecondaryNameNode分析

第七章:小朱笔记hadoop之源码分析-hdfs分析

第六节:SecondaryNameNode分析 

6.1 架构分析

6.2 SecondaryNameNode启动过程分析

6.3 Checkpoint分析

6.4 Import Checkpoint恢复数据

6.1 架构分析

        SecondaryNameNode(snn)不是NameNode(nn)的热备进程。snn是HDFS架构中的一个组成部分,但是经常由于名字而被人误解它真正的用途,其实它真正的用途,是用来保存namenode中对HDFS metadata的信息的备份,并减少namenode重启的时间。对于hadoop进程中 ,要配置好并正确的使用 snn,还是需要做一些工作的。

       hadoop的默认配置中让snn进程默认运行在了namenode的那台机器上,但是这样的话,如果这台机器出错,宕机,对恢复HDFS文件系统是很大的灾难,更好的方式是:将snn的进程配置在另外一台机器上运行。

      在hadoop中,namenode负责对HDFS的metadata的持久化存储,并且处理来自客户端的对HDFS的各种操作的交互反馈。为了保证交互速度,HDFS文件系统的metadata是被load到namenode机器的内存中的,并且会将内存中的这些数据保存到磁盘进行持久化存储。为了保证这个持久化过程不会成为HDFS操作的瓶颈,hadoop采取的方式是:没有对任何一次的当前文件系统的snapshot进行持久化,对 HDFS最近一段时间的操作list会被保存到namenode中的一个叫Editlog的文件中去。当重启namenode时,除了load fsImage意外,还会对这个EditLog文件中记录的HDFS操作进行replay,以恢复HDFS重启之前的最终状态。

      SecondaryNameNode,会周期性的从NameNode节点上下载元数据信息(元数据镜像fsimage 和元数据库操作日志edits),然后将fsimage和edits进行合并,生成新的fsimage(该fsimage就是Secondary NameNode下载时刻的元数据的Checkpoint),在本地保存,并将其推送到NameNode,同时重置NameNode上的edits。所以namenode的重启就会Load最新的一个checkpoint,并replay EditLog中 记录的hdfs操作,由于EditLog中记录的是从上一次checkpoint以后到现在的操作列表,所以就会比较小。如果没有snn的这个周期性的合 并过程,那么当每次重启namenode的时候,就会 花费很长的时间。而这样周期性的合并就能减少重启的时间。同时也能保证HDFS系统的完整性。在namenode发生故障无法启动时,可以使用snn准备的checkpoint文件,在namenode启动时带上-importCheckpoint参数来进行恢复。

        这就是SecondaryNameNode所做的事情。所以snn并不能分担namenode上对HDFS交互性操作的压力。尽管如此,当 namenode机器宕机或者namenode进程出问题时,namenode的daemon进程可以通过人工的方式从snn上拷贝一份metadata 来恢复HDFS文件系统。

       至于为什么要将SNN进程运行在一台非NameNode的 机器上,这主要出于两点考虑:

可扩展性: 创建一个新的HDFS的snapshot需要将namenode中load到内存的metadata信息全部拷贝一遍,这样的操作需要的内存就需要 和namenode占用的内存一样,由于分配给namenode进程的内存其实是对HDFS文件系统的限制,如果分布式文件系统非常的大,那么 namenode那台机器的内存就可能会被namenode进程全部占据。

容错性: 当snn创建一个checkpoint的时候,它会将checkpoint拷贝成metadata的几个拷贝。将这个操作运行到另外一台机器,还可以提供分布式文件系统的容错性。

优点

Hadoop自带机制,成熟可靠,使用简单方便,无需开发,配置即可。

Secondaryary NameNode定期做Checkpoint,可保证各个Checkpoint阶段的元数据的可靠性,同时,进行fsimage与edits的合并,可以有效限制edits的大小,防止其无限制增长。

缺点

没有做到热备,当NameNode无法提供服务时,需要重启NameNode,服务恢复时间与文件系统规模大小成正比。

       Secondary NameNode保存的只是Checkpoint时刻的元数据,因此,一旦NameNode上的元数据损坏,通过Checkpoint恢复的元数据并不是HDFS此刻的最新数据,存在一致性问题。

       NameNode 上实现了接口NamenodeProtocol就是用亍 NameNode 和 Secondary NameNode 间的命令通信。 NameNode 和 Secondary NameNode 间数据的通信,使用的是 HTTP 协议,HTTP 的容器用的是 jetty,TransferFsImage 是文件传输的辅劣类。 

       GetImageServlet的doGet方法目前支持FSImage(getimage),日志(getedit)和存 FSImage(putimage)。

例如: 

http://×××××:50070/getimage?getimage 可以获取 FSImage。 

http://×××××:50070/getimage?getedit 可以获取日志文件。 

Secondary NameNode 发送一个 HTTP 请求到 NameNode,NameNode 上一个 HTTP 客户端到 Secondary NameNode 上去下载 FSImage。

6.2 SecondaryNameNode启动过程分析

(1)main方法

   

public static void main(String[] argv) throws Exception {
    StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG);
    Configuration tconf = new Configuration();
    if (argv.length >= 1) {
      SecondaryNameNode secondary = new SecondaryNameNode(tconf);
      int ret = secondary.processArgs(argv);
      System.exit(ret);
    }

    // Create a never ending deamon
    // 不带参数启动的时候,会启动一个不结束的进程,用于执行定时checkpoint  
    Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf)); 
    //线程方式运行SecondaryNameNode.run  
    checkpointThread.start();
  }

(2)使用nn之前建立的RpcServer(dfs.namenode.servicerpc-address),建立同namenode之间的连接进行通信

//使用nn之前建立的RpcServer(dfs.namenode.servicerpc-address),建立同namenode之间的连接进行通信
    nameNodeAddr = NameNode.getServiceAddress(conf, true);
    this.namenode =(NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class,NamenodeProtocol.versionID, nameNodeAddr, conf);

(3)初始化checkpoint的目录以及进行checkpoint的频率(fs.checkpoint.period和fs.checkpoint.size

fsName = getInfoServer();
    
    //初始化checkpoint的目录以及进行checkpoint的频率(fs.checkpoint.period和fs.checkpoint.size)
    
    
    // initialize checkpoint directories
    ////读取fs.checkpoint.dir配置项作为ckp目录,默认为/tmp/hadoop/dfs/namesecondary 
    checkpointDirs = FSImage.getCheckpointDirs(conf, "/tmp/hadoop/dfs/namesecondary");
    //读取fs.checkpoint.edits.dir配置项作为ckp edit目录,默认为/tmp/hadoop/dfs/namesecondary  
    checkpointEditsDirs = FSImage.getCheckpointEditsDirs(conf, "/tmp/hadoop/dfs/namesecondary");    
    
    //初始化checkpoint和checkpoint.edits目录,如果不存在就创造相应目录 
    checkpointImage = new CheckpointStorage();
    checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);

    //默认执行checkpoint时间间隔为1小时,edit文件大小为4M  
    // Initialize other scheduling parameters from the configuration
    checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
    checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);

(4)启动Http服务

//启动Http服务
    try {
      infoServer = httpUGI.doAs(new PrivilegedExceptionAction<HttpServer>() {

        @Override
        public HttpServer run() throws IOException, InterruptedException {
          LOG.info("Starting web server as: " +
              UserGroupInformation.getCurrentUser().getUserName());

          int tmpInfoPort = infoSocAddr.getPort();
          infoServer = new HttpServer("secondary", infoBindAddress, tmpInfoPort,
              tmpInfoPort == 0, conf, 
              SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN));
          
          if(UserGroupInformation.isSecurityEnabled()) {
            System.setProperty("https.cipherSuites", 
                Krb5AndCertsSslSocketConnector.KRB5_CIPHER_SUITES.get(0));
            InetSocketAddress secInfoSocAddr = 
              NetUtils.createSocketAddr(infoBindAddress + ":"+ conf.get(
                "dfs.secondary.https.port", infoBindAddress + ":" + 0));
            imagePort = secInfoSocAddr.getPort();
            infoServer.addSslListener(secInfoSocAddr, conf, false, true);
          }
          
          infoServer.setAttribute("name.system.image", checkpointImage);
          infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
          
          infoServer.addInternalServlet("getimage", "/getimage",GetImageServlet.class, true);
          infoServer.start();
          return infoServer;
        }
      });
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }

         完成初始化操作后,会单独启动线程,循环执行SecondaryNameNode.run,run()调用了SecondaryNameNode.doWork()方法。doWork默认每5分钟会进行一次检查,如果editlog的大小超过checkpointSize大小或者距离上一次checkpoint时间超出checkpointPeriod时间,则执行SecondaryNameNode.doCheckpoint:

6.3 Checkpoint分析

       SecondaryNameNode节点启动之后会不断的对NameNode节点保存的元数据进行备份(定时备份),具体的说来就是:SecondaryNameNode的run方法每隔一段时间就会执行doCheckpoint()方法,SecondaryNameNode的主要工 作都在这个方法里。这个方法会从NameNode上取下FSImage和操作日志(当然也包括版本文件和fstime),然后在本地合并,然后再把合并后 的FSImage传回NameNode。这样既可以保存一个NameNode上的数据备份,又可以为NameNode节点分担一部分压力。

具体的流程如 下:

(1)调用startCheckpoint,为接下来的工作准备空间。首先存放FSImage和EditsLog的目录分别由配置文件中的fs.checkpoint.dir项和fs.checkpoint.edits.dir项来设置,然后会分别对这两类目录进行检查和恢复,对于已经存在的chechpoint要将它们设置成为lastCheckpoint;

(2) 创建RPC客户端,用于和NameNode节点通信;

(3)在SecondaryNameNode节点上开启Http服务,主要用来向NameNode节点传输合并好的元数据文件FSImage;

(4)远程调用NameNode的rollEditLog方法,让NameNode停止向edits上写操作日志,而是将新产生的日志转写到临时日志文件 edits.new上。同时,NameNode端的FSImage检查点状态要设置为ROLLED_EDITS。最后会返回一个检查点签名 CheckpointSignature;

(5)通过NameNode开启的Http服务从NameNode上下载FSImage和对应的操作日志,之后设置本地的检查点状态设置为UPLOAD_DONE;

(6)加载下载的FSImage和操作日志,从而合并成一个新的FSImage;

(7)通知NameNode新的FSImage文件已经合并好了,然后NameNode节点通过SecondaryNameNode节点的http服务来下载新的FSImage;

(8)远程调用NameNode的rollFsImage,来根据下载的最新FSImage替换原来的FSImage,临时日志文件edits.new重命名为edits;

(9)调用本地endCheckpoint方法,结束一次doCheckpoint流程。

void doCheckpoint() throws IOException {

    // Do the required initialization of the merge work area.
    // 开始Checkpoint前的初始化工作主要包括:  
    // 1.unlock所有的checkpoint目录  
    // 2.关闭checkpoint的editlog文件  
    // 3.检查checkpoint目录和checkpoint edit目录是否正常  
    // 4.腾出checkpoint目录下的current目录,原current目录更名为lastcheckpoint.tmp  
    startCheckpoint();//初始化

    // Tell the namenode to start logging transactions in a new edit file
    // Retuns a token that would be used to upload the merged image.
    
    //通知namenode开始checkpoint,拿到namenode上的checkpoint标记,打开edits.new的文件流  
    CheckpointSignature sig = (CheckpointSignature)namenode.rollEditLog();

    // error simulation code for junit test
    if (ErrorSimulator.getErrorSimulation(0)) {
      throw new IOException("Simulating error0 " +
                            "after creating edits.new");
    }

    //从namenode上下载fsimage文件与editlog文件  
    downloadCheckpointFiles(sig);   // Fetch fsimage and edits
    
    //合并fsimage与editlog文件(将image和Editlog都加载到内存合并后再savenamespace)  
    doMerge(sig);                   // Do the merge
  
    //
    // Upload the new image into the NameNode. Then tell the Namenode
    // to make this new uploaded image as the most current image.
    //
    // 将合并好的checkpoint image上传给namenode
    putFSImage(sig);

    // error simulation code for junit test
    if (ErrorSimulator.getErrorSimulation(1)) {
      throw new IOException("Simulating error1 " +
                            "after uploading new image to NameNode");
    }

    //将合并后的数据文件恢复为工作状态  
    //1.fsImage.ckpt重命名为fsImage,原fsImage删除  
    //2.edits.new重命名为edits,原edits删除  
    //3.打开editlog文件  
    namenode.rollFsImage();
    
    //删除原有的previous.checkpoint  
    //将lastcheckpoint.tmp更名为previous.checkpoint  
    checkpointImage.endCheckpoint();

    LOG.info("Checkpoint done. New Image Size: "
              + checkpointImage.getFsImageName().length());
  }

6.4 Import Checkpoint恢复数据

      如果主节点挂掉了,硬盘数据需要时间恢复或者不能恢复了,现在又想立刻恢复HDFS,这个时候就可以import checkpoint。步骤如下: 

拿一台和原来机器一样的机器,包括配置和文件,一般来说最快的是拿你节点机器中的一台,立马能用(部分配置要改成NameNode的配置) 

创建一个空的文件夹,该文件夹就是配置文件中dfs.name.dir所指向的文件夹。 

      拷贝你的secondary NameNode checkpoint出来的文件,到某个文件夹,该文件夹为fs.checkpoint.dir指向的文件夹 

      执行命令bin/hadoop namenode -importCheckpoint 

     这样NameNode会读取checkpoint文件,保存到dfs.name.dir。但是如果你的dfs.name.dir包含合法的 fsimage,是会执行失败的。因为NameNode会检查fs.checkpoint.dir目录下镜像的一致性,但是不会去改动它。 

相关推荐