HBASE 代码阅读笔记-1 - PUT-3-提交任务2(基于0.96-hadoop2)

下面来看 MultiServerCallable 的核心方法 call(),以及它所调用的 RequestConverter.buildRegionAction:

/**
 * Builds one {@code MultiRequest} from the per-region actions held in {@code multiAction},
 * sends it through {@code getStub().multi(...)} and converts the reply.
 *
 * @return the converted results, or the value of {@code createAllFailedResponse(...)} when
 *         the stub call raises a {@code ServiceException}
 * @throws DoNotRetryIOException if {@code multiAction.size()} is zero or negative
 * @throws IOException declared by the signature; propagated from the converters
 */
public MultiResponse call() throws IOException {
    final int actionCount = this.multiAction.size();
    if (actionCount <= 0) {
      throw new DoNotRetryIOException("No Actions");
    }
    final MultiRequest.Builder requestBuilder = MultiRequest.newBuilder();
    // Stays null unless the cellblock path below is taken at least once.
    List<CellScannable> cellPayload = null;
    // multiAction.actions is keyed by region name: emit one RegionAction per entry.
    for (Map.Entry<byte[], List<Action<R>>> perRegion : this.multiAction.actions.entrySet()) {
      final byte[] region = perRegion.getKey();
      final List<Action<R>> regionActions = perRegion.getValue();
      // Blog note [0]: this flag decides whether the data is sent as cellblocks; the author
      // marks the details as unclear.
      if (!this.cellBlock) {
        // Blog note [2]: build the region action from the actions alone.
        // Blog note [3]: append the built region action to the request.
        requestBuilder.addRegionAction(
            RequestConverter.buildRegionAction(region, regionActions).build());
        continue;
      }
      // Presize on first use, assuming at least one entry per action (likely more).
      if (cellPayload == null) {
        cellPayload = new ArrayList<CellScannable>(actionCount);
      }
      // Blog note [1]: "no data" variant; the cell list is passed in alongside the actions.
      // Blog note [3]: append the built region action to the request.
      requestBuilder.addRegionAction(
          RequestConverter.buildNoDataRegionAction(region, regionActions, cellPayload).build());
    }
    // The controller is constructed with the collected cells (possibly null) and is also
    // the source of the cell scanner used when converting the response.
    final PayloadCarryingRpcController rpcController =
        new PayloadCarryingRpcController(cellPayload);
    rpcController.setPriority(getTableName());
    final ClientProtos.MultiRequest request = requestBuilder.build();
    final ClientProtos.MultiResponse response;
    try {
      response = getStub().multi(rpcController, request);
    } catch (ServiceException serviceFailure) {
      // A failure of the RPC itself is reported through createAllFailedResponse.
      return createAllFailedResponse(request, ProtobufUtil.getRemoteException(serviceFailure));
    }
    return ResponseConverter.getResults(request, response, rpcController.cellScanner());
}
/**
 * Creates a {@code RegionAction} builder for {@code regionName} and adds one protobuf
 * action per entry of {@code actions}, each carrying that entry's original index.
 *
 * <p>{@code Get} is converted with {@code ProtobufUtil.toGet}; {@code Put}, {@code Delete},
 * {@code Append} and {@code Increment} are converted with {@code ProtobufUtil.toMutation}.
 *
 * @param regionName name of the region the actions are addressed to
 * @param actions the actions to convert
 * @return the populated builder (not yet built)
 * @throws UnsupportedOperationException if an action wraps a {@code RowMutations}
 * @throws DoNotRetryIOException if an action wraps any other unhandled {@code Row} type
 * @throws IOException declared by the signature; propagated from the converters
 */
public static <R> RegionAction.Builder buildRegionAction(final byte[] regionName,
    final List<Action<R>> actions) throws IOException {
  final RegionAction.Builder regionAction = getRegionActionBuilderWithRegion(regionName);
  for (final Action<R> item : actions) {
    final Row operation = item.getAction();
    final ClientProtos.Action.Builder protoAction = ClientProtos.Action.newBuilder();
    protoAction.setIndex(item.getOriginalIndex());

    // Fill in the payload according to the concrete Row type; unsupported types throw
    // before anything is added to the region action.
    if (operation instanceof Get) {
      protoAction.setGet(ProtobufUtil.toGet((Get) operation));
    } else if (operation instanceof Put) {
      protoAction.setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put) operation));
    } else if (operation instanceof Delete) {
      protoAction.setMutation(
          ProtobufUtil.toMutation(MutationType.DELETE, (Delete) operation));
    } else if (operation instanceof Append) {
      protoAction.setMutation(
          ProtobufUtil.toMutation(MutationType.APPEND, (Append) operation));
    } else if (operation instanceof Increment) {
      protoAction.setMutation(ProtobufUtil.toMutation((Increment) operation));
    } else if (operation instanceof RowMutations) {
      throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow");
    } else {
      throw new DoNotRetryIOException("Multi doesn't support " + operation.getClass().getName());
    }

    regionAction.addAction(protoAction);
  }
  return regionAction;
}

 /**
  * Returns a new {@code RegionAction} builder whose region field is set to a
  * {@code REGION_NAME} specifier built from {@code regionName}.
  *
  * @param regionName the region name passed to {@code buildRegionSpecifier}
  * @return a fresh builder with only the region set
  */
 private static RegionAction.Builder getRegionActionBuilderWithRegion(final byte [] regionName) {
   return RegionAction.newBuilder().setRegion(
       buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
 }

相关推荐