Writing Data to Different Files with MultipleOutputFormat
0 Introduction:
Goal: after map-reduce processing, write the results into different output files, choosing the file names ourselves instead of accepting the default generated names (part-00000 and so on).
1 Code: achieving the goal above with the old API (I'm not sure how to write this with the new one... a sketch of one new-API approach is added after section 2)
package outputformat;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.hadoop.util.Progressable;

/**
 * Word count written against the old (org.apache.hadoop.mapred) API.
 * @author zm
 */
public class MyMultipleOutputFormatApp {
    private static final String INPUT_PATH = "hdfs://master:9000/hello";
    private static final String OUT_PATH = "hdfs://master:9000/out";

    public static void main(String[] args) throws Exception {
        // Set up the configuration and remove the output directory if it already exists
        Configuration conf = new Configuration();
        final FileSystem filesystem = FileSystem.get(new URI(OUT_PATH), conf);
        if (filesystem.exists(new Path(OUT_PATH))) {
            filesystem.delete(new Path(OUT_PATH), true);
        }

        // Define the job with the old API
        final JobConf job = new JobConf(conf, MyMultipleOutputFormatApp.class);
        job.setJarByClass(MyMultipleOutputFormatApp.class);

        // Input path
        FileInputFormat.setInputPaths(job, INPUT_PATH);

        // Map phase
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Reduce phase
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // OutputFormat: our subclass of MultipleOutputFormat decides the file names
        job.setOutputFormat(MyMultipleFilesTextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        JobClient.runJob(job);
    }

    public static class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException {
            // Emit <word, 1> for every tab-separated word on the line
            final String line = value.toString();
            final String[] splited = line.split("\t");
            for (String word : splited) {
                output.collect(new Text(word), new LongWritable(1));
            }
        }
    }

    // The distribution of the map's <k,v> pairs to the reducers is called the shuffle
    public static class MyReducer extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
            long count = 0L;
            // Under the old API the values arrive as an Iterator, so iterate with a while loop
            while (values.hasNext()) {
                LongWritable times = values.next();
                count += times.get();
            }
            output.collect(key, new LongWritable(count));
        }
    }

    // We extend MultipleOutputFormat solely to override the method that generates the
    // output file name. Unlike a fully custom OutputFormat, which must implement both
    // the format itself and a RecordWriter, this subclass only extends one parent
    // method; the rest of the writing is still handled by Hadoop's output machinery.
    public static class MyMultipleFilesTextOutputFormat extends MultipleOutputFormat<Text, LongWritable> {
        // State explicitly which RecordWriter does the actual writing
        @Override
        protected RecordWriter<Text, LongWritable> getBaseRecordWriter(FileSystem fs, JobConf job,
                String name, Progressable progress) throws IOException {
            final TextOutputFormat<Text, LongWritable> textOutputFormat = new TextOutputFormat<Text, LongWritable>();
            return textOutputFormat.getRecordWriter(fs, job, name, progress);
        }

        // Override the file-name generation: the output file name is the value of k3
        @Override
        protected String generateFileNameForKeyValue(Text key, LongWritable value, String name) {
            final String keyString = key.toString();
            if (keyString.startsWith("hello")) {
                return "hello";
            } else {
                return keyString;
            }
        }
    }
}
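One variant worth knowing about (my addition, not part of the original program): generateFileNameForKeyValue returns a path relative to the job output directory, so it can also route keys into per-key subdirectories, and folding in the name argument (the default leaf name such as part-00000) keeps the reducer's partition suffix so that no two reducers can produce the same file name. The /out/hello/part-00000 layout shown in the comments is what I would expect under that assumption, not the post's actual output.

    // Hedged sketch: per-key subdirectories that preserve the partition suffix.
    @Override
    protected String generateFileNameForKeyValue(Text key, LongWritable value, String name) {
        final String keyString = key.toString();
        // "name" is the default leaf name (e.g. part-00000); a "/" in the
        // returned string creates a subdirectory under the job output path.
        if (keyString.startsWith("hello")) {
            return "hello/" + name;    // e.g. /out/hello/part-00000
        }
        return keyString + "/" + name; // e.g. /out/you/part-00000
    }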
2 Notes:
Compared with the earlier article on writing a custom OutputFormat:
a custom OutputFormat has to implement both the format itself and its RecordWriter, and the output file name is specified explicitly inside that OutputFormat.
In this example, by subclassing MultipleOutputFormat we only need to override the parent's generateFileNameForKeyValue method;
the rest of the writing is still handled by Hadoop's output machinery.
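As for the new-API question left open in section 1: below is a minimal sketch of how the same per-key file naming can be done with the new (org.apache.hadoop.mapreduce) API, assuming a mapper and driver wired up like the old-API job above. It uses MultipleOutputs, whose write(key, value, baseOutputPath) overload treats the third argument as a base name relative to the job output directory. Note one difference: the framework appends a partition suffix, so the files come out as hello-r-00000 rather than hello. The class name NewApiWordCountReducer is my own, not from the post.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Sketch only: a new-API reducer that writes each key's count
// to a file named after the key.
public class NewApiWordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private MultipleOutputs<Text, LongWritable> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<Text, LongWritable>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0L;
        for (LongWritable v : values) {
            count += v.get();
        }
        final String keyString = key.toString();
        // The third argument is a base output path relative to the job output
        // directory; the framework appends the suffix, e.g. hello-r-00000.
        mos.write(key, new LongWritable(count),
                keyString.startsWith("hello") ? "hello" : keyString);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close(); // flushes and closes all side files
    }
}

In the driver, calling LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class) instead of job.setOutputFormatClass(...) keeps the now-empty default part-r-* files from being created.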
3 Output:
[root@master hadoop]# hadoop fs -lsr /
-rw-r--r-- 3 zm supergroup 8 2014-12-04 05:10 /out/hello
-rw-r--r-- 3 zm supergroup 5 2014-12-04 05:10 /out/me
-rw-r--r-- 3 zm supergroup 6 2014-12-04 05:10 /out/you
[root@master hadoop]# hadoop fs -text /out/me
Warning: $HADOOP_HOME is deprecated.
me 1
[root@master hadoop]# hadoop fs -text /out/you
Warning: $HADOOP_HOME is deprecated.
you 1
[root@master hadoop]# hadoop fs -text /out/hello
Warning: $HADOOP_HOME is deprecated.
hello 2