Hadoop custom OutputFormat source code

In Hadoop, the OutputFormat is an important part of the ReduceTask.
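Before walking through the framework source, here is a minimal sketch of what a custom OutputFormat typically looks like (the class name MyOutputFormat, the key/value types and the record layout are hypothetical, not taken from the source below). It only supplies a RecordWriter; checkOutputSpecs is inherited from FileOutputFormat.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical custom OutputFormat: checkOutputSpecs comes from
// FileOutputFormat, so only the RecordWriter needs to be supplied.
public class MyOutputFormat extends FileOutputFormat<Text, IntWritable> {

  @Override
  public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    // one output file per task attempt, under the committer's work directory
    Path file = getDefaultWorkFile(context, ".txt");
    FileSystem fs = file.getFileSystem(context.getConfiguration());
    final FSDataOutputStream out = fs.create(file, false);

    return new RecordWriter<Text, IntWritable>() {
      @Override
      public void write(Text key, IntWritable value) throws IOException {
        // custom record layout: key, tab, value, newline
        out.writeBytes(key.toString() + "\t" + value.get() + "\n");
      }

      @Override
      public void close(TaskAttemptContext context) throws IOException {
        out.close();
      }
    };
  }
}

In the driver such a class is wired in with job.setOutputFormatClass(MyOutputFormat.class); the two hooks, checkOutputSpecs at submit time and getRecordWriter at task time, are exactly what the rest of this walkthrough traces.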

1. Instantiating the OutputFormat and validating the output directory

In JobClient's submitJobInternal, the OutputFormat is instantiated via reflection:

// Check the output specification
          if (reduces == 0 ? jobCopy.getUseNewMapper() : 
            jobCopy.getUseNewReducer()) {
            org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
              ReflectionUtils.newInstance(context.getOutputFormatClass(),
                  jobCopy); // instantiate the configured OutputFormat
            output.checkOutputSpecs(context);
          } else {
            jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
          }

Here is the checkOutputSpecs method of the most commonly used FileOutputFormat:

// Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job); // read the output directory from mapred.output.dir
    if (outDir == null) {
      throw new InvalidJobConfException("Output directory not set.");
    }
    // get delegation token for outDir's file system
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), 
                                        new Path[] {outDir}, 
                                        job.getConfiguration());
    if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) { // get the job's FileSystem and check whether the directory already exists
      throw new FileAlreadyExistsException("Output directory " + outDir + 
                                           " already exists");
    }

Writing out the key and value

1. Creating the OutputFormat and RecordWriter

Task's initialize method creates the OutputFormat and obtains its OutputCommitter, so this happens for both map and reduce tasks.
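A simplified paraphrase of that new-API branch (not the verbatim Task.initialize source) looks roughly like this:

// Paraphrased from Task.initialize (new-API branch, simplified): the
// OutputFormat is reflected from the job configuration and asked for its
// OutputCommitter, which is why both MapTask and ReduceTask hold an instance.
org.apache.hadoop.mapreduce.OutputFormat<?, ?> outputFormat =
    ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
org.apache.hadoop.mapreduce.OutputCommitter committer =
    outputFormat.getOutputCommitter(taskContext);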

The OutputFormat is used mainly in ReduceTask: in its runNewReducer method the RecordWriter is obtained:

// make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
    // make a reducer
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
       new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(reduceOutputCounter,
         job, reporter, taskContext); // NewTrackingRecordWriter is likewise a proxy around the real RecordWriter
    job.setBoolean("mapred.skip.on", isSkipping());

2. Writing out the key and value

The custom Reducer's run method invokes reduce to perform the business logic:

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context); // invoke the user's reduce for each key
    }
    cleanup(context);
  }
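For reference, a typical user reduce implementation emits its results through context.write; this SumReducer is a hypothetical word-count style example, not part of the Hadoop source:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical word-count style reducer: context.write is the call that
// reaches the RecordWriter proxy described below.
public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    context.write(key, new IntWritable(sum)); // goes through Reducer$Context.write
  }
}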

Inside the user's reduce method, each written pair goes through Reducer$Context to the proxy of the custom RecordWriter.

Reducer$Context code:

/**
   * Generate an output key/value pair.
   */
  public void write(KEYOUT key, VALUEOUT value
                    ) throws IOException, InterruptedException {
    output.write(key, value);
  }

NewTrackingRecordWriter code:

@Override
    public void write(K key, V value) throws IOException, InterruptedException {
      long bytesOutPrev = getOutputBytes(fsStats);
      real.write(key,value);
      long bytesOutCurr = getOutputBytes(fsStats);
      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
      outputRecordCounter.increment(1);
    }

Finally, ReduceTask closes the writer:

reducer.run(reducerContext);
trackedRW.close(reducerContext);
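To tie the pieces together, a hypothetical driver (Driver, MyOutputFormat and SumReducer are the sketches from above; mapper configuration is omitted) wires the custom OutputFormat into the job. The class set here is what getOutputFormatClass() returns when JobClient and the tasks reflect the OutputFormat:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver for the sketches above.
public class Driver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "custom-outputformat-demo"); // 1.x-style constructor
    job.setJarByClass(Driver.class);
    // mapper configuration omitted for brevity
    job.setReducerClass(SumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setOutputFormatClass(MyOutputFormat.class);          // the custom OutputFormat
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // must not already exist
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}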
