第七章:小朱笔记hadoop之源码分析-hdfs分析 DataNode 数据读写分析

第七章:小朱笔记hadoop之源码分析-hdfs分析

第五节:Datanode 分析

5.5 DataNode 数据读写分析

    DataNode上数据块的接受、发送并没有采用RPC机制,因为RPC是一个命令式的接口,而DataNode的数据处理是一种流式机制。
    DataXceiverServer和DataXceiver 就是这个机制的实现。其中,DataXceiver依赖于两个辅助:BlockSender和 BlockReceiver。
    DataXceviverServer:
    用于监听来自客户或其他DataNodes的请求。监听块传输连接请求,同时控制进行的块传输请求数(同一时刻的传输数不能超过maxXceiverCount)和带宽耗费情况(块传输时耗费带宽不能超过预定值BlockTransferThrottler.bytesPerPeriod)。
创建一个ServerSocket来接受请求,每接受一个连接,就创建一个DataXceiver用于处理请求,并将Socket存在一个名为childSockets的Map中。此外,还创建一个BlockBalanceThrottler对象控制数据负载均衡时流量带宽。当在run里监听到一个块传输请求时,开启一个DataXceiver线程处理块传输。系统关闭时,会关闭用于监听的连接的ServerSocket同时将DataXceiver所产生的线程关闭,使得DataXceiver因为出现错误而退出。

重要源码:

class DataXceiverServer implements Runnable, FSConstants {
  ServerSocket ss;
  DataNode datanode;
  // Record all sockets opend for data transfer
  Map<Socket, Socket> childSockets = Collections.synchronizedMap(new HashMap<Socket, Socket>());
  static final int MAX_XCEIVER_COUNT = 256;
  int maxXceiverCount = MAX_XCEIVER_COUNT;
  //默认值是256,每个node最多可以起多少个DataXceiver,如果太多的话可能会导致内存不足	
   BlockBalanceThrottler balanceThrottler;//在使用start-balancer.sh的时候,各个节点之间拷贝block的网络带宽,默认是1M/s
  long estimateBlockSize;//在这里需要block的大小,是因为需要预估本分区是否还有足够的空间 
 

 public void run() {
    while (datanode.shouldRun) {
        Socket s = ss.accept();
        s.setTcpNoDelay(true);
        //每接受一个连接,就创建一个DataXceiver用于处理请求
        new Daemon(datanode.threadGroup,new DataXceiver(s, datanode, this)).start();
}

 DataXceiver:
 DataXceiver继承自runnable,用于实现块的发送和接收。在run里进行版本和当前连接总数的校验,然后从客户端读取要进行的操作码。根据这个操作调用相应的方法进行处理。处理后关闭流,关闭socket。
    操作码:

写道
OP_WRITE_BLOCK (80):写数据块
OP_READ_BLOCK (81):读数据块
OP_REPLACE_BLOCK (83):替换一个数据块
OP_COPY_BLOCK (84):拷贝一个数据块
OP_BLOCK_CHECKSUM (85):读数据块检验码

 

首先进行版本校验,然后读取到操作命令,接着调用不同方法处理。

写道
+----------------------------------------------+
| 2 bytes version | 1 byte OP |
+----------------------------------------------+

 BlockSender:  
    构造方法:

BlockSender(Block block, long startOffset, long length,boolean corruptChecksumOk, boolean chunkOffsetOK, boolean verifyChecksum, DataNode datanode, String clientTraceFmt)

 

写道
注意参数:
block:目标块
startOffset:读取开始位置
length:读取长度
corruptChecksumOk:是否忽略读取校验信息的出错
chunkOffsetOK:是否要在发送数据之前先发送读取数据的其实位置信息
verifyChecksum:是否需要在发送数据之前先验证数据校验和

 

写道
(1) 当用户向HDFS读取某一个文件时,客户端会根据数据所在的位置转向到具体的DataNode节点请求对应数据块的数据,此时DataNode节点会用BlockSender向该客户端发送数据;
(2) 当NameNode节点发现某个Block的副本不足时,它会要求某一个存储了该Block的DataNode节点向其它DataNode节点复制该Block,当然此时仍然会采用流水线的复制方式,只不过数据来源变成了一个DataNode节点;
(3) HDFS开了一个调节DataNode负载均衡的工具Balacer,当它发现某一个DataNode节点存储的Block过多时,就会让这个DataNode节点转移一部分Blocks到新添加到集群的DataNode节点或者存储负载轻的DataNode节点上;
(4) DataNode节点在后台开启了一个用于对存储的所有Block进行扫描验证的后台线程,它会定期的利用BlockSender来检查一个Block的数据是否损坏了。DataNode节点在发送数据时不会对数据进行校验和验证,而是交给了接收端来验证数据的可靠性,这是因为即使在发送端验证正确,但经过网络传输也会发送错误,所以由接收端来验证;   DataBlockScanner在Block进行扫描验证时根本就没有接收端,必须在发送数据之前对其进行校验和的验证。

 

主要属性:

private Block block; // the block to read from //待发送的数据块  
  private InputStream blockIn; // data stream   //Block的数据读取流  
  private long blockInPosition = -1; // updated while using transferTo().
  private DataInputStream checksumIn; // checksum datastream//Block的校验和读取流  
  private DataChecksum checksum; // checksum stream //数据校验器  
  private long offset; // starting position to read//待读取的数据在Block中的开始位置  
  private long endOffset; // ending position //待读取的数据在Block中的结束位置  
  private long blockLength;//Block的大小  
  private int bytesPerChecksum; // chunk size //数据校验块的大小  
  private int checksumSize; // checksum size //数据校验块对应的校验和大小  
  private boolean corruptChecksumOk; // if need to verify checksum //是否要进行校验和
  private boolean chunkOffsetOK; // if need to send chunk offset //是否要在发送数据之前先发送读取数据的其实位置信息
  private long seqno; // sequence number of packet //数据包的编号  

  private boolean transferToAllowed = true;
  private boolean blockReadFully; //set when the whole block is read
  private boolean verifyChecksum; //if true, check is verified while reading //是否需要在发送数据之前先验证数据校验和
  private BlockTransferThrottler throttler;//流量控制器
  private final String clientTraceFmt; // format of client trace log message
  private final MemoizedBlock memoizedBlock;

 

构造初始化:

创建Block对应的数据和校验数据的读取流;创建校验器;根据校验器确定待读取数据的开始位置和结束位置。

//定位到待去读数据的开始位置  
      blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
      memoizedBlock = new MemoizedBlock(blockIn, blockLength, datanode.data, block);

  特别注意:
   真正需要读取数据的开始位置和结束位置可能并不恰好包含完整的若干个数据校验块,而是开始位置和结束位置都位于数据校验块的中间,为了客户端能够利用校验和来验证数据的有效性就需要多传输一些数据。

//根据校验器调整读取的开始位置和结束位置  往前退到完整的数据校验块 多读一些数据
      offset = (startOffset - (startOffset % bytesPerChecksum));
      if (length >= 0) {
        // Make sure endOffset points to end of a checksumed chunk.
        long tmpLen = startOffset + length;
        if (tmpLen % bytesPerChecksum != 0) {
          tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
        }
        if (tmpLen < endOffset) {
          endOffset = tmpLen; //往后退到完整的数据校验块 多读一些数据
        }
      }

 

发送数据:

先发送checksum.header给请求者,然后将块文件划分成最多maxChunksPerPacket个数据包进行发送,每个数据包大小为pktSize,然后多次调研sendChunks将数据包发送出去。发送数据包结束后发送一个0表示块的发送完成。然后BlockReader.close()关闭各种资源。在发送包的过程中会统计已发送的流量。

sendBlock

long sendBlock(DataOutputStream out, OutputStream baseStream, 
                 BlockTransferThrottler throttler) throws IOException {
    if( out == null ) {
      throw new IOException( "out stream is null" );
    }
    this.throttler = throttler;

    long initialOffset = offset;
    long totalRead = 0;
    OutputStream streamForSendChunks = out;
    
    final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; 
    try {
      try {
        ////发送校验器信息  
        checksum.writeHeader(out);
        if ( chunkOffsetOK ) {
          out.writeLong( offset );
        }
        out.flush();
      } catch (IOException e) { //socket error
        throw ioeToSocketException(e);
      }
      
      int maxChunksPerPacket;
      int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
      
      if (transferToAllowed && !verifyChecksum && 
          baseStream instanceof SocketOutputStream && 
          blockIn instanceof FileInputStream) {
        
        FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
        
        // blockInPosition also indicates sendChunks() uses transferTo.
        blockInPosition = fileChannel.position();
        streamForSendChunks = baseStream;
        
        // //计算一个数据包最多包含多少个数据校验快块  
        // assure a mininum buffer size.
        maxChunksPerPacket = (Math.max(BUFFER_SIZE,
                                       MIN_BUFFER_WITH_TRANSFERTO)
                              + bytesPerChecksum - 1)/bytesPerChecksum;
        
        // packet buffer has to be able to do a normal transfer in the case
        // of recomputing checksum
        // //计算一个数据包的大小   
        pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
      } else {
        // //计算一个数据包最多包含多少个数据检验块  
        maxChunksPerPacket = Math.max(1,
                 (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
        // //计算一个数据包的大小  
        pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
      }

      ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);

      while (endOffset > offset) {
        long len = sendChunks(pktBuf, maxChunksPerPacket, streamForSendChunks);
        offset += len;
        totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
                            checksumSize);
        seqno++;
      }
      try {
        //发送数据包结束后发送一个0表示块的发送完成
        out.writeInt(0); // mark the end of block        
        out.flush();
      } catch (IOException e) { //socket error
        throw ioeToSocketException(e);
      }
    }
    catch (RuntimeException e) {
      LOG.error("unexpected exception sending block", e);
      
      throw new IOException("unexpected runtime exception", e);
    } 
    finally {
      if (clientTraceFmt != null) {
        final long endTime = System.nanoTime();
        ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime));
      }
      close();
    }

    blockReadFully = (initialOffset == 0 && offset >= blockLength);

    return totalRead;
  }

 

sendChunks:
计算一个数据包能发送的最多个chunks,然后将packetLen数据包长度、offset数据包开始位置在块中的偏移、seqno数据包的编号、是不是最后一个数据包写到pkt,然后将块元数据文件中的校验数据读出到pkt中。计算出块数据在pkt中的位置(存放在校验数据之后)。然后根据BlockReader.blockInPosition变量值判断是否可以用FileChannel将数据发给块请求者。如果不可以,就将块数据读取到buf中,然后利用checksum对读出来的数据生成校验和,并与先前读出的块元数据文件进行校验,校验成功后将buf发送给请求者。如果可以,就先将buf中的校验和部分发送出去,然后再利用fileChannel将块文件中的数据发送出去。最后调用BlockTransferThrottler.throttle进行节流控制。

写道
packetLen:数据包长度;
offset:数据包中的数据在Block中的开始位置;
seqno:数据包的编号;
endFlag:是否没有数据包标志(0/1);
len:数据包中数据的长度;
chunksum:一个校验和;
datachunk:一个校验数据块;

 

//计算数据包的长度 
    int len = Math.min((int) (endOffset - offset), bytesPerChecksum * maxChunks);
	 //数据包头部信息写入缓存 
    // write packet header
    pkt.putInt(packetLen);
    pkt.putLong(offset);
    pkt.putLong(seqno);
    pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
               //why no ByteBuf.putBoolean()?
    pkt.putInt(len);

      //发送数据包
      //first write the packet
      sockOut.write(buf, 0, dataOff);
      // no need to flush. since we know out is not a buffered stream.
      sockOut.transferToFully(fileChannel, blockInPosition, len);

	//调整发送速度  
    if (throttler != null) { // rebalancing so throttle
      throttler.throttle(packetLen);
    }

 

..........................正在写