第七章:小朱笔记hadoop之源码分析-hdfs分析 第四节:namenode-ReplicationMonitor

第四节:namenode分析

4.3 namenode 副本监控分析ReplicationMonitor

ReplicationMonitor主要有两个作用:

(1)负责为副本不足的数据块选择source 数据节点,选择冗余的target节点,等待DN节点下次心跳将这些工作带回给相应的DN执行块冗余操作。
(2)将各个数据节点上无效的数据块副本加入无效集合,等待下次心跳将这些工作带回给相应的DataNode执行删除无效块操作。
默认每3s执行一次,可以通过修改dfs.replication.interval来调整执行间隔
(1)computeDatanodeWork
    计算datanode需要处理的replication数量,主要包括当前超时挂起的replication,需要进行复制的replication,计 划处理的replication,损坏的replication。将这些数据记录下来,在下次heartbeat时候通知给datanode处理。
(2)processPendingReplications
    处理超时挂起的replication,将超时的replication加入到需要进行replication操作的队列中
    重要的控制参数:

//限制每一次处理无效Blocks的数据节点数量  
static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;   
//限制每一次处理冗余Blocks的数据节点数量  
static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
重要的数据结构:
(1)UnderReplicatedBlocks neededReplications
     需要进行复制的数据块。UnderReplicatedBlocks是一个数组,数组的下标是优先级(0的优先级最高,如果数据块只有一个副本,它的优先 级是0), 数组的内容是一个Block集合。UnderReplicatedBlocks提供一些方法,对Block进行增加,修改,查找和删除。类 UnderReplicatedBlocks的一个实例,负责存储需要冗余副本的块集合, 副本可能被冗余1份或多份;在数据块报告的时候,检查到块副本数量不够,就会将数据块加入该实例存储;在ReplicationMonitor中,将 block及其target加入replicateBlocks队列后,将该块从neededReplications中删除;

(2)PendingReplicationBlocks pendingReplications

     描述当前尚未完成块副本复制的块的列表。因为HDFS支持块的流水线复制,当文件系统客户端开始向第一个Datanode复制数据块的时候,会在第一个 Datanode上启动复制进程,将该结点上接收到的(部分),数据块开始向第二个Datanode上传输复制,而第二个Datanode结点又向第三个 Datanode结点进行流水线复制,……,直到满足副本因子要求。所以,在执行流水线复制的过程中,因为这种可并行的复制方式使得某些块副本的复制完成 时间呈现阶梯状,也就是使用一个数据结构来保存这些尚未复制完成的块副本pendingReplications保存了所有正在进行复制的数据块,使用 Map是需要一些附加的信息PendingBlockInfo。这些信息包括时间戳, 用于检测是否已经超时,和现在进行复制的数目numReplicasInProgress。timedOutItems是超时的复制项,超时的复制项在 FSNamesystem的processPendingReplications方法中被删除,并从新复制。timerThread是用于检测复制超时 的线程的句柄,对应的线程是PendingReplicationMonitor的一个实例,它的run方法每隔一段会检查是否有超时的复制项,如果有, 将该数据块加到timedOutItems中。Timeout是run方法的检查间隔,defaultRecheckInterval是缺省值。在 ReplicationMonitor中,又会将timedOutItems中的数据块重新加入neededReplications;

我们分两个方面一一分析:
(1)块冗余处理机制
          块冗余处理机制分为四个步骤:选择需要冗余的数据块、为数据块选择源节点、为数据块选择目标节点、将块和目标节点加入待冗余的队列。下面一一分析:

  第一步:获取一定量需要冗余的数据块
    当DN节点报告块的时候,NN会判断块的冗余是否足够,如果不足可能有几种情况:副本可能1份或者2份,如果副本仅一份,则需要冗余的优先级就相对更高, 则会将该块放入优先级更高的队列,副本已经有2份的块则会放入优先级更低点的队列,编号越低,一共分为3个优先级。
UnderReplicatedBlocks 主要的成员是一个优先队列,List<TreeSet<Block>> priorityQueues,保存副本数没有达到期望值的block。根据当前副本数低于期望值的程度决定优先级,差的越远,优先级越大(0~2,0最 大)。每个优先级别对应一个TreeSet,getPriority获得优先级后决定放入哪个treeset中。
      对于冗余工作,根据DN节点数量不同,每次处理的数据块也会不同,比如当前集群有10个DN 节点,则默认情况下,ReplicationMonitor一个周期只会从neededReplications中取出20个数据块进行处理,之所以 HDFS要这样设计,主要是考虑到如果一次冗余的数据块太多,势必会对DN节点的网络造成比较大的影响,因此根据不同的DN集群规模决定一次冗余的数据块 数量。在具体冗余的过程中,首先按照优先级从优先级队列中拿一部分数据(如前面提及的20)块到replicateBlocks中,并且每次记录下读取的 优先级队列的偏移量,下次从该偏移指针处真正读取块信息。

      以上工作均在 chooseUnderReplicatedBlocks中分四步完成:

synchronized List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {  
        // initialize data structure for the return value  
        //1 初始化三级数组  
        List<List<Block>> blocksToReplicate =  new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL);  
        for (int i=0; i<UnderReplicatedBlocks.LEVEL; i++) {  
          blocksToReplicate.add(new ArrayList<Block>());  
        }  
          
        synchronized(neededReplications) {  
          if (neededReplications.size() == 0) {  
            missingBlocksInCurIter = 0;  
            missingBlocksInPrevIter = 0;  
            return blocksToReplicate;  
          }  
            
          //2.跳过已经处理的数据 replIndex为游标  
          // Go through all blocks that need replications.  
          BlockIterator neededReplicationsIterator = neededReplications.iterator();  
          // skip to the first unprocessed block, which is at replIndex   
          for(int i=0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {  
            neededReplicationsIterator.next();  
          }  
          // # of blocks to process equals either twice the number of live   
          // data-nodes or the number of under-replicated blocks whichever is less  
          //要取的block 和 neededReplications 数量 取较小值  
          blocksToProcess = Math.min(blocksToProcess, neededReplications.size());  
      
          //3.迭代获取需要冗余的Block,根据优先级放入不同的数据  
          for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {  
            if( ! neededReplicationsIterator.hasNext()) {  
              // start from the beginning  
              replIndex = 0;  
              missingBlocksInPrevIter = missingBlocksInCurIter;  
              missingBlocksInCurIter = 0;  
              blocksToProcess = Math.min(blocksToProcess, neededReplications.size());  
              if(blkCnt >= blocksToProcess)  
                break;  
              neededReplicationsIterator = neededReplications.iterator();  
              assert neededReplicationsIterator.hasNext() :   
                                      "neededReplications should not be empty.";  
            }  
      
            Block block = neededReplicationsIterator.next();  
            int priority = neededReplicationsIterator.getPriority();  
            if (priority < 0 || priority >= blocksToReplicate.size()) {  
              LOG.warn("Unexpected replication priority: " + priority + " " + block);  
            } else {  
              blocksToReplicate.get(priority).add(block);  
            }  
          } // end for  
        } // end synchronized  
        return blocksToReplicate;  
     }

第二步:选择源节点

     

private DatanodeDescriptor chooseSourceDatanode(Block block,List<DatanodeDescriptor> containingNodes,NumberReplicas numReplicas);  
      
            依赖的数据结构  
        (a)块(Block)到其元数据的映射表,元数据信息包括块所属的inode、存储块的Datanode  
        BlocksMap blocksMap   
      
        (b)保存损坏(如:校验没通过)的数据块到对应DataNode的关系   
          CorruptReplicasMap corruptReplicas   
      
            (c)保存Datanode上有效但需要删除的数据块(StorageID -> TreeSet<Block>)   
        Map<String, Collection<Block>> excessReplicateMap

 主要逻辑:
    (a)判断在某DataNode上副本是否损坏,如果损坏,遍历下个DataNode节点;
    (b)判断在某DataNode对于的replicateBlocks中,是否已经有大于等于2个数据块了,如果是这样,遍历下一个DataNode节点,之所以这样实现,防止单DataNode流量过高;
    (c)如果某DataNode节点已经退役,遍历下一个DataNode节点;
    (d)如果某DataNode节点正在退役,将该DataNode作为source,而这也是HDFS设计的目的,这样的节点不会让出现因为读写数据块而 网络拥 塞,因此也不会让网络那么忙,当然即使是正在退役的节点,replicateBlocks中的块数量也不能超过2个(防止网络拥塞);如果有多个节点正在 退役,会选择最后一个退役节点作为该块的source节点;
    (e)如果没有正在退役的节点,随意选择一个DN节点作为source节点;

第三步:选择目标节点       目前存储了该块副本的DN节点不会再作为target节点,至于选择target节点的数目,根据需要再冗余的块副本数量而定,比如还需要冗余2个副 本,则会选择2个target节点,选择的方式,依然采用机架感知相关算法。该算法我会专门用一节来详细讲述。

第四步:将块和目标节点加入待冗余的队列

srcNode.addBlockToBeReplicated(block, targets);
注意:DatanodeDescriptor中的BlockQueue replicateBlocks = new BlockQueue();

(2)块无效处理机制

  这种类型数据块对于集群没有作用,如果不删除,会占用磁盘空间。在几种情况下会产生无效块:
(a)执行删除文件的时候,会首先快速地删除元数据,文件所对应的数据块就变得无效;
(b)Client上传数据块到DataNode的时候,可能由于网络原因等,数据块上传一部分后失败,这些块成了无效块;
(c) 在HDFS做rebalance操作的时候,会将负载较重的DataNode上部分块文件复制到其他负载较轻的DataNode上,数据块复制成功后,负 载较轻的DataNode上的该数据块就成了无效块;处理的无效块数量也是有限制的:数量为活着的dn的32%, 即 nodesToProcess = (int)Math.ceil((double)heartbeats.size() * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100); 在该流程中,也并不是无效块集合中有多少块,NameNode就一次性让DataNode把相应的无效块删除完,而是每次均删除一部分,这样的话不会造成 DataNode因为删除块而带宽,CPU资源等占用过大。
依赖数据结构: Map<String, Collection<Block>> recentInvalidateSets ,保存了每个DataNode上有效,但需要删除的数据块(StorageID >> TreeSet<Block>)。


流程分析:
(a)周期性调用FSNameSystem的computeDatanodeWork方法
(b)computeDatanodeWork调用computeInvalidateWork方法
(c)循环调用invalidateWorkForOneNode方法,直到达到预设值或全部处理完
(d)invalidateWorkForOneNode方法中,从recentInvalidateSets结构中获得第一个datanode节点,找到 对应的DatanodeDescriptro,同时将其所有对应invalidate block遍历出
(e)将遍历出的invalidate block封装调用addBlocksToBeInvalidated方法,在下一次心跳汇报时返回给datanode进行相应处理