Hadoop Source Code Analysis: TextOutputFormat
Since we need to implement a custom output file format, let's walk through the source code of TextOutputFormat.
The source is shown below; the commentary is placed directly in the code as comments.
package org.apache.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.*;
/** An {@link OutputFormat} that writes plain text files. */
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { // TextOutputFormat is the default output format for MapReduce jobs

  protected static class LineRecordWriter<K, V> // the default RecordWriter implementation
      extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline; // line terminator: the UTF-8 bytes of "\n"
    static {
      try {
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator; // separator between key and value; a tab by default

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { // constructor: set the output stream and the separator
      this.out = out;
      try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    public LineRecordWriter(DataOutputStream out) { // use the default tab separator
      this(out, "\t");
    }
    /**
     * Write the object to the byte stream, handling Text as a special
     * case (the underlying output stream is byte-oriented).
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException { // each record is written as: key, keyValueSeparator, value, newline
      if (o instanceof Text) { // if o is a Text instance, write its backing bytes directly
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(utf8)); // otherwise fall back to toString() encoded as UTF-8
      }
    }
    public synchronized void write(K key, V value) // synchronized: concurrent writes are mutually exclusive
        throws IOException {
      // decide whether the key and the value should be treated as empty
      boolean nullKey = key == null || key instanceof NullWritable; // neat trick: a NullWritable key counts as null too
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) { // nothing to write
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline);
    }
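    // Illustration added for this analysis (not part of the original source), assuming the default "\t" separator:
    //   write(new Text("word"), new IntWritable(7)) produces the bytes of "word\t7\n";
    //   write(NullWritable.get(), new Text("7")) produces the bytes of "7\n" (key and separator are skipped).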
    public synchronized
    void close(TaskAttemptContext context) throws IOException {
      out.close();
    }
  }
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) // builds the RecordWriter for this task
      throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job); // is output compression enabled?
    String keyValueSeparator = conf.get("mapred.textoutputformat.separator",
                                        "\t");
    CompressionCodec codec = null; // compression codec to use, if any
    String extension = "";
    if (isCompressed) {
      Class<? extends CompressionCodec> codecClass =
        getOutputCompressorClass(job, GzipCodec.class); // defaults to gzip
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension();
    }
    Path file = getDefaultWorkFile(job, extension); // default work file path and name for this task attempt; implemented in FileOutputFormat
    FileSystem fs = file.getFileSystem(conf);
    if (!isCompressed) {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(new DataOutputStream
                                        (codec.createOutputStream(fileOut)),
                                        keyValueSeparator);
    }
  }
}
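Tying this back to how the class is used: judging from the getRecordWriter logic above, a driver can change the key/value separator and switch on compressed output roughly as in the sketch below. This is a minimal illustrative example added here, not code from the original article (the class name TextOutputDemo and the input/output arguments are made up); note that newer Hadoop releases read the separator from mapreduce.output.textoutputformat.separator rather than the legacy mapred.textoutputformat.separator key seen in this source.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Hypothetical demo driver: map-only identity job whose output goes through TextOutputFormat.
public class TextOutputDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Separator read by getRecordWriter above; newer releases use
    // "mapreduce.output.textoutputformat.separator" instead.
    conf.set("mapred.textoutputformat.separator", ",");

    Job job = Job.getInstance(conf, "text-output-demo");
    job.setJarByClass(TextOutputDemo.class);
    job.setMapperClass(Mapper.class);   // identity mapper, map-only job
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Enable compressed output; getRecordWriter then wraps the stream with the codec.
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

With compression turned on, getRecordWriter wraps the task's FSDataOutputStream in the codec stream, so the part files carry the codec's default extension (".gz" for GzipCodec) while each record is still written as key, separator, value, newline.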
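Coming back to the goal stated at the top, a custom output file format usually follows exactly the pattern shown in this class: subclass FileOutputFormat and hand back your own RecordWriter from getRecordWriter. The sketch below is my own illustration rather than code from this article (the class name KeyEqualsValueOutputFormat is invented, and compression handling is left out); it writes key=value lines instead of the tab-separated default.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical example: emit "key=value" lines instead of the tab-separated default.
public class KeyEqualsValueOutputFormat extends FileOutputFormat<Text, IntWritable> {

  @Override
  public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    // Same pattern as TextOutputFormat.getRecordWriter, minus compression handling.
    Path file = getDefaultWorkFile(job, ".txt");
    FileSystem fs = file.getFileSystem(job.getConfiguration());
    final FSDataOutputStream out = fs.create(file, false);

    return new RecordWriter<Text, IntWritable>() {
      @Override
      public void write(Text key, IntWritable value) throws IOException {
        // Mirrors LineRecordWriter.write, but with a fixed '=' separator.
        out.write(key.getBytes(), 0, key.getLength());
        out.write('=');
        out.write(value.toString().getBytes("UTF-8"));
        out.write('\n');
      }

      @Override
      public void close(TaskAttemptContext context) throws IOException {
        out.close();
      }
    };
  }
}

A job would then pick it up with job.setOutputFormatClass(KeyEqualsValueOutputFormat.class), just as TextOutputFormat is selected by default.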