Hadoop 0.20+ custom MultipleOutputFormat

Hadoop 0.20.2 does not provide MultipleOutputFormat, the multi-file output mechanism, in its new API. The old 0.19.2 class, org.apache.hadoop.mapred.lib.MultipleOutputFormat, can still be used in 0.20.2, but everything under org.apache.hadoop.mapred is marked as deprecated and may be removed in a future Hadoop release. Hadoop 0.20.2 recommends Configuration in place of JobConf, yet the old org.apache.hadoop.mapred.lib.MultipleOutputFormat still depends on JobConf; in other words, there is no replacement for it in the new API yet.

Moreover, Hadoop 0.20.2 is only an intermediate release and not all of its APIs have been migrated to the new style, so anything that is missing has to be written by hand.

Re-implementing MultipleOutputFormat for the new API requires two classes:

LineRecordWriter

MultipleOutputFormat

PartitionByFilenameOutputFormat is the custom output format needed in my experiment; it writes the records belonging to each file into a separate output file. A driver-side usage sketch follows the three listings.

LineRecordWriter:

package cn.xmu.dm;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// A public re-implementation of the LineRecordWriter nested inside TextOutputFormat,
// so that MultipleOutputFormat below can reuse it for each per-file writer.
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {
	private static final String utf8 = "UTF-8";

	protected DataOutputStream out;
	private final byte[] keyValueSeparator;

	public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
		this.out = out;
		try {
			this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
		} catch (UnsupportedEncodingException uee) {
			throw new IllegalArgumentException("can't find " + utf8
					+ " encoding");
		}
	}

	// Default to a tab separator, matching TextOutputFormat's behaviour.
	public LineRecordWriter(DataOutputStream out) {
		this(out, "\t");
	}

	private void writeObject(Object o) throws IOException {
		if (o instanceof Text) {
			Text to = (Text) o;
			out.write(to.getBytes(), 0, to.getLength());
		} else {
			out.write(o.toString().getBytes(utf8));
		}
	}

	// Skip null/NullWritable keys and values; terminate each record with "\r\n".
	public synchronized void write(K key, V value) throws IOException {
		boolean nullKey = key == null || key instanceof NullWritable;
		boolean nullValue = value == null || value instanceof NullWritable;
		if (nullKey && nullValue) {
			return;
		}
		if (!nullKey) {
			writeObject(key);
		}
		if (!(nullKey || nullValue)) {
			out.write(keyValueSeparator);
		}
		if (!nullValue) {
			writeObject(value);
		}
		out.write("\r\n".getBytes());
	}

	public synchronized void close(TaskAttemptContext context)
			throws IOException {
		out.close();
	}
}

MultipleOutputFormat:

package cn.xmu.dm;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Abstract FileOutputFormat for the new (org.apache.hadoop.mapreduce) API that routes
 * each record to an output file whose name is derived from the key/value pair.
 */
public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
		extends FileOutputFormat<K, V> {
	private MultiRecordWriter writer = null;
	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException,
			InterruptedException {
		if (writer == null) {
			writer = new MultiRecordWriter(job, getTaskOutputPath(job));
		}
		return writer;
	}
	private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
		Path workPath = null;
		OutputCommitter committer = super.getOutputCommitter(conf);
		if (committer instanceof FileOutputCommitter) {
			workPath = ((FileOutputCommitter) committer).getWorkPath();
		} else {
			Path outputPath = super.getOutputPath(conf);
			if (outputPath == null) {
				throw new IOException("Undefined job output-path");
			}
			workPath = outputPath;
		}
		return workPath;
	}
	
	/** Returns the file name (relative to the task's work path) that this key/value pair is written to. */
	protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);
	/** Lazily creates and caches one LineRecordWriter per distinct file name. */
	public class MultiRecordWriter extends RecordWriter<K, V> {
		
		private HashMap<String, RecordWriter<K, V>> recordWriters = null;
		private TaskAttemptContext job = null;
		
		private Path workPath = null;
		public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
			super();
			this.job = job;
			this.workPath = workPath;
			recordWriters = new HashMap<String, RecordWriter<K, V>>();
		}
		@Override
		public void close(TaskAttemptContext context) throws IOException, InterruptedException {
			Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
			while (values.hasNext()) {
				values.next().close(context);
			}
			this.recordWriters.clear();
		}
		@Override
		public void write(K key, V value) throws IOException, InterruptedException {
		
			String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
			RecordWriter<K, V> rw = this.recordWriters.get(baseName);
			if (rw == null) {
				rw = getBaseRecordWriter(job, baseName);
				this.recordWriters.put(baseName, rw);
			}
			rw.write(key, value);
		}
	
		private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
				throws IOException, InterruptedException {
			Configuration conf = job.getConfiguration();
			boolean isCompressed = getCompressOutput(job);
			String keyValueSeparator = ","; // separator written between key and value in each output file
			RecordWriter<K, V> recordWriter = null;
			if (isCompressed) {
				Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
						GzipCodec.class);
				CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
				Path file = new Path(workPath, baseName + codec.getDefaultExtension());
				FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
				recordWriter = new LineRecordWriter<K, V>(new DataOutputStream(codec
						.createOutputStream(fileOut)), keyValueSeparator);
			} else {
				Path file = new Path(workPath, baseName);
				FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
				recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
			}
			return recordWriter;
		}
	}
}

PartitionByFilenameOutputFormat:

package cn.xmu.dm;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

/**
 * Concrete MultipleOutputFormat used in the experiment: the text before the first tab
 * of each value is taken as the name of the file the record is written to.
 */
public class PartitionByFilenameOutputFormat extends MultipleOutputFormat<Text, Text> {

	@Override
	protected String generateFileNameForKeyValue(Text key, Text value,
			Configuration conf) {
		return value.toString().substring(0, value.toString().indexOf("\t"));
	}
}
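
For completeness, here is a minimal driver sketch showing how PartitionByFilenameOutputFormat could be wired into a job on Hadoop 0.20.2. The driver class, the PassThroughMapper, and the assumption that each input line starts with the target file name followed by a tab are my own illustration, not part of the original code:

package cn.xmu.dm;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PartitionByFilenameDriver {

	// Hypothetical mapper: the text before the first tab of each line becomes the map
	// output key, and the whole line is passed through as the value, which is the layout
	// PartitionByFilenameOutputFormat.generateFileNameForKeyValue() expects.
	public static class PassThroughMapper extends Mapper<LongWritable, Text, Text, Text> {
		@Override
		protected void map(LongWritable offset, Text line, Context context)
				throws IOException, InterruptedException {
			String s = line.toString();
			int tab = s.indexOf('\t');
			if (tab > 0) {
				context.write(new Text(s.substring(0, tab)), line);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "partition by filename"); // Job.getInstance(conf) in later releases
		job.setJarByClass(PartitionByFilenameDriver.class);

		job.setMapperClass(PassThroughMapper.class);
		job.setReducerClass(Reducer.class); // the base Reducer is an identity reducer
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// Replace the default TextOutputFormat with the custom multi-file output format.
		job.setOutputFormatClass(PartitionByFilenameOutputFormat.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

With this setup the task's work directory ends up containing one file per distinct prefix instead of the usual part-r-NNNNN files, and FileOutputCommitter promotes those files to the job output directory when the task commits.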
