使用MultipleOutputFormat将数据写到不同文件中

0 引子:

目的: 将文件内容在进行map-reduce处理后,将结果写到不同的文件中(可以给不同文件重命名,取代默认生成的文件名)

1 代码:  使用老API实现上述目的(新API下可以改用 org.apache.hadoop.mapreduce.lib.output.MultipleOutputs 达到类似效果,本文暂不涉及)

package outputformat;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Progressable;

/**
 * Word count written against the old ({@code org.apache.hadoop.mapred}) API whose
 * reduce output is split across several files, each named after the record key
 * instead of the default generated file name.
 *
 * @author zm
 */
public class MyMultipleOutputFormatApp {
	/** Default input location, used when no command-line argument is given. */
	private static final String INPUT_PATH = "hdfs://master:9000/hello";
	/** Default output directory, used when fewer than two command-line arguments are given. */
	private static final String OUT_PATH = "hdfs://master:9000/out";

	/**
	 * Configures the job and runs it through {@link JobClient#runJob(JobConf)}.
	 * Any existing output directory is deleted recursively before the job starts.
	 *
	 * @param args optional overrides: {@code args[0]} is the input path and
	 *             {@code args[1]} is the output directory; missing entries fall
	 *             back to the built-in defaults
	 * @throws Exception if the file system cannot be reached or the job fails
	 */
	public static void main(String[] args) throws Exception{
		final String inputPath = (args != null && args.length > 0) ? args[0] : INPUT_PATH;
		final String outputPath = (args != null && args.length > 1) ? args[1] : OUT_PATH;
		final Path outDir = new Path(outputPath);

		// Build the configuration and clear any previous output directory
		Configuration conf = new Configuration();
		final FileSystem filesystem = FileSystem.get(outDir.toUri(), conf);
		if(filesystem.exists(outDir)){
			filesystem.delete(outDir, true);
		}
		// Define the job with the old API
		final JobConf job = new JobConf(conf , MyMultipleOutputFormatApp.class);
		job.setJarByClass(MyMultipleOutputFormatApp.class);
		// Input location
		FileInputFormat.setInputPaths(job, inputPath);
		// Map side
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		// Reduce side
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		// Output format that picks the file name per record
		// NOTE(review): file names are derived from keys only, so two reduce tasks
		// could produce the same file name (e.g. "hello") - confirm the job is run
		// with a single reduce task.
		job.setOutputFormat(MyMultipleFilesTextOutputFormat.class);
		FileOutputFormat.setOutputPath(job, outDir);
		
		JobClient.runJob(job);
	}
	
	/**
	 * Splits every input line on tab characters and emits {@code <token, 1>} for
	 * each resulting token.
	 */
	public static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable>{
		// Reused across calls to avoid allocating two objects per token, following
		// the pattern of the official old-API WordCount example.
		private final Text word = new Text();
		private final LongWritable one = new LongWritable(1);

		/**
		 * @param key      input key supplied by the input format (unused)
		 * @param value    one line of input text
		 * @param output   collector receiving one {@code <token, 1>} pair per token
		 * @param reporter progress reporter (unused)
		 * @throws IOException if the collector fails to accept a record
		 */
		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<Text, LongWritable> output, Reporter reporter)
				throws IOException {
			final String line = value.toString();
			for (String token : line.split("\t")) {
				word.set(token);
				output.collect(word, one);
			}
		}
	}
	
	/**
	 * Sums the counts collected for one key and emits {@code <key, total>}.
	 * The transfer of map output {@code <k,v>} pairs to the reducers is called shuffle.
	 */
	public static class MyReducer extends MapReduceBase implements  Reducer<Text, LongWritable, Text, LongWritable>{
		/**
		 * @param key      the token being counted
		 * @param values   all counts emitted for {@code key}
		 * @param output   collector receiving the single {@code <key, total>} pair
		 * @param reporter progress reporter (unused)
		 * @throws IOException if the collector fails to accept the record
		 */
		@Override
		public void reduce(Text key, Iterator<LongWritable> values,
				OutputCollector<Text, LongWritable> output, Reporter reporter)
				throws IOException {
			long count = 0L;
			// The old API hands over an Iterator, so iterate with while
			while(values.hasNext()) {
				count += values.next().get();
			}
			output.collect(key, new LongWritable(count));
		}
	}
	
	/**
	 * Extends {@link MultipleOutputFormat} mainly to override how the output file
	 * name is generated. Unlike a fully custom OutputFormat, which has to provide
	 * its own RecordWriter, this class only overrides the parent's hooks and
	 * delegates the actual writing to {@link TextOutputFormat}.
	 */
	public static class MyMultipleFilesTextOutputFormat extends MultipleOutputFormat<Text, LongWritable>{

		/**
		 * Chooses the record writer used for each output file: the one created by
		 * {@link TextOutputFormat}.
		 *
		 * @param fs       file system passed through to the delegate
		 * @param job      job configuration
		 * @param name     file name the writer should write to
		 * @param progress progress callback
		 * @return the text record writer for {@code name}
		 * @throws IOException if the delegate cannot create the writer
		 */
		@Override
		protected org.apache.hadoop.mapred.RecordWriter<Text, LongWritable> getBaseRecordWriter(
				FileSystem fs, JobConf job, String name, Progressable progress)
				throws IOException {
			final TextOutputFormat<Text, LongWritable> textOutputFormat = new TextOutputFormat<Text, LongWritable>();
			return textOutputFormat.getRecordWriter(fs, job, name, progress);
		}
		
		
		/**
		 * Derives the output file name from the record key.
		 *
		 * @param key   the reduce output key
		 * @param value the reduce output value (unused)
		 * @param name  the file name proposed by the framework
		 * @return {@code "hello"} for every key starting with "hello"; {@code name}
		 *         for an empty key; otherwise the key text itself
		 */
		@Override
		protected String generateFileNameForKeyValue(Text key,
				LongWritable value, String name) {
			final String keyString = key.toString();
			// An empty key (produced by blank lines or adjacent tabs in the input)
			// cannot be used as a file name, so keep the framework's proposed name.
			if(keyString.isEmpty()) {
				return name;
			}
			// Group every key with the "hello" prefix into a single file
			if(keyString.startsWith("hello")) {
				return "hello";
			}
			// Otherwise the file is named exactly after the key
			return keyString;
		}
		
	}
}

2 说明:

和自定义OutputFormat 文章相比,

 自定义OutputFormat 需要重写自身,并且需要重写 RecordWriter,在自定义OutputFormat中显式指定输出文件名

而本例中,通过继承MultipleOutputFormat,仅仅需要将父类的方法generateFileNameForKeyValue进行重写,

其余写出工作仍交给hadoop的输出部门来做。

3 输出结果如下:

[root@master hadoop]# hadoop fs -lsr /

-rw-r--r--   3 zm supergroup          8 2014-12-04 05:10 /out/hello
-rw-r--r--   3 zm supergroup          5 2014-12-04 05:10 /out/me
-rw-r--r--   3 zm supergroup          6 2014-12-04 05:10 /out/you


[root@master hadoop]# hadoop fs -text /out/me
Warning: $HADOOP_HOME is deprecated.

me      1
[root@master hadoop]# hadoop fs -text /out/you
Warning: $HADOOP_HOME is deprecated.

you     1
[root@master hadoop]# hadoop fs -text /out/hello
Warning: $HADOOP_HOME is deprecated.

hello   2

相关推荐