Hadoop 源码解析之-TextOutputFormat

因为需要自定义实现输出文件的格式,现在来分析一下TextOutputFormat的源码;

源码如下,注释会直接放在源码之中

  1. package org.apache.Hadoop.mapreduce.lib.output;  
  2.   
  3. import java.io.DataOutputStream;  
  4. import java.io.IOException;  
  5. import java.io.UnsupportedEncodingException;  
  6.   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FileSystem;  
  9. import org.apache.hadoop.fs.Path;  
  10. import org.apache.hadoop.fs.FSDataOutputStream;  
  11.   
  12. import org.apache.hadoop.io.NullWritable;  
  13. import org.apache.hadoop.io.Text;  
  14. import org.apache.hadoop.io.compress.CompressionCodec;  
  15. import org.apache.hadoop.io.compress.GzipCodec;  
  16. import org.apache.hadoop.mapreduce.OutputFormat;  
  17. import org.apache.hadoop.mapreduce.RecordWriter;  
  18. import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  19. import org.apache.hadoop.util.*;  
  20.   
  21. /** An {@link OutputFormat} that writes plain text files. */  
  22. public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {//TextInputFormat是默认的输出文件格式   
  23.   protected static class LineRecordWriter<K, V>//默认   
  24.     extends RecordWriter<K, V> {  
  25.     private static final String utf8 = "UTF-8";  
  26.     private static final byte[] newline;//行结束符?   
  27.     static {  
  28.       try {  
  29.         newline = "\n".getBytes(utf8);  
  30.       } catch (UnsupportedEncodingException uee) {  
  31.         throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
  32.       }  
  33.     }  
  34.   
  35.     protected DataOutputStream out;  
  36.     private final byte[] keyValueSeparator;//key和value的分隔符,默认的好像是Tab   
  37.   
  38.     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {//构造函数,初始化输出流及分隔符    
  39.       this.out = out;  
  40.       try {  
  41.         this.keyValueSeparator = keyValueSeparator.getBytes(utf8);  
  42.       } catch (UnsupportedEncodingException uee) {  
  43.         throw new IllegalArgumentException("can't find " + utf8 + " encoding");  
  44.       }  
  45.     }  
  46.   
  47.     public LineRecordWriter(DataOutputStream out) {//默认的分隔符   
  48.       this(out, "\t");  
  49.     }  
  50.   
  51.     /** 
  52.      * Write the object to the byte stream, handling Text as a special输出流是byte格式的 
  53.      * case. 
  54.      * @param o the object to print是要输出的对象 
  55.      * @throws IOException if the write throws, we pass it on 
  56.      */  
  57.     private void writeObject(Object o) throws IOException {//应该是一行一行的写 key keyValueSeparator value \n   
  58.       if (o instanceof Text) {//如果o是Text的实例   
  59.         Text to = (Text) o;  
  60.         out.write(to.getBytes(), 0, to.getLength());//写出   
  61.       } else {  
  62.         out.write(o.toString().getBytes(utf8));  
  63.       }  
  64.     }  
  65.   
  66.     public synchronized void write(K key, V value)//给写线程加锁,写是互斥行为   
  67.       throws IOException {  
  68. <span style="white-space:pre">    </span>//下面是为了判断key和value是否为空值   
  69.       boolean nullKey = key == null || key instanceof NullWritable;//这语句太牛了   
  70.       boolean nullValue = value == null || value instanceof NullWritable;  
  71.       if (nullKey && nullValue) {//   
  72.         return;  
  73.       }  
  74.       if (!nullKey) {  
  75.         writeObject(key);  
  76.       }  
  77.       if (!(nullKey || nullValue)) {  
  78.         out.write(keyValueSeparator);  
  79.       }  
  80.       if (!nullValue) {  
  81.         writeObject(value);  
  82.       }  
  83.       out.write(newline);  
  84.     }  
  85.   
  86.     public synchronized   
  87.     void close(TaskAttemptContext context) throws IOException {  
  88.       out.close();  
  89.     }  
  90.   }  
  91.   
  92.   public RecordWriter<K, V>    getRecordWriter(TaskAttemptContext job//获得writer实例   
  93.                          ) throws IOException, InterruptedException {  
  94.     Configuration conf = job.getConfiguration();  
  95.     boolean isCompressed = getCompressOutput(job);//   
  96.     String keyValueSeparator= conf.get("mapred.textoutputformat.separator",  
  97.                                        "\t");  
  98.     CompressionCodec codec = null;//压缩格式 还是?   
  99.     String extension = "";  
  100.     if (isCompressed) {  
  101.       Class<? extends CompressionCodec> codecClass =   
  102.         getOutputCompressorClass(job, GzipCodec.class);  
  103.       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  
  104.       extension = codec.getDefaultExtension();  
  105.     }  
  106.     Path file = getDefaultWorkFile(job, extension);//这个是获取缺省的文件路径及名称,在FileOutput中有对其的实现   
  107.     FileSystem fs = file.getFileSystem(conf);  
  108.     if (!isCompressed) {  
  109.       FSDataOutputStream fileOut = fs.create(file, false);  
  110.       return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);  
  111.     } else {  
  112.       FSDataOutputStream fileOut = fs.create(file, false);  
  113.       return new LineRecordWriter<K, V>(new DataOutputStream  
  114.                                         (codec.createOutputStream(fileOut)),  
  115.                                         keyValueSeparator);  
  116.     }  
  117.   }  
  118. }  
更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

相关推荐