hadoop combine 规约
0 简介:
a) combine发生在map流程中
b) 一般combine代码和自定义reduce代码相同,如果需要不相同,只需要继承hadoop.mapreduce.Reducer
在其内写自己代码即可。
c) combine不是hadoop标配,其有使用局限性,
不会对多个map的输出做处理,不能跨map任务执行,不能代替reduce的作用,
在进行运算时,运算结果于运算总个数有关系时,就不可以使用,eg:
求平均值计算,源文件内容为:
1,1,1,1
2,2,2
不使用combine下,reduce会接受到7个数字, 总和为 10,在除以7 结果为 10/7 = XXX
如果使用combine,在combine处 1,1,1,1 得到的平均值为1, 2,2,2的平均值为2,
然后1,2在传入reduce后在做平均值,结果为1.5
d) 而在非依赖总数计算时,combine的出现会减少map端的输出,
这样shuffle时传输的数据量减少,网络开销减少。 eg:
源文件内容如下,进行单词计数:
hello you
hello me me me
默认不使用combine下的结果为:组是3组 但是记录仍旧是6条 ,
因此在执行job操作时打印为:Map output records=6 Reduce input records=6 Combine input records=0 Combine output records=0
hello, {1,1}
you, {1}
me, {1,1,1}
增加了 combine后,结果就会变成:
因此在执行job操作时打印为:Map output records=6 Combine input records=6 Combine output records=3 Reduce input records=3 Reduce input groups=3 Reduce output records=3
hello, {2}
you, {1}
me, {3}
1 代码
package combine; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /** * 实现单词计数功能 * 增加自定义计数器功能 * 增加自定义规约combine,使得提高map-->reduce操作传输效率 * 测试文件 hello内容为: * hello you * hello me me me * @author zm * * 问:为什么使用Combiner? * 答:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短。 * * 问:为什么Combiner不作为MR运行的标配,而是可选步骤哪? * 答:因为不是所有的算法都适合使用Combiner处理,例如求平均数. * * 问:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作哪? * 答:combiner操作发生在map端的,处理一个任务所接收的文件中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据。 * * 输出如下: Mapper输出<hello,1> Mapper输出<you,1> Mapper输出<hello,1> Mapper输出<me,1> Mapper输出<me,1> Mapper输出<me,1> Combine输入分组<hello,...> Combine输入键值对<hello,1> Combine输入键值对<hello,1> Combiner输出键值对<hello,2> Combine输入分组<me,...> Combine输入键值对<me,1> Combine输入键值对<me,1> Combine输入键值对<me,1> Combiner输出键值对<me,3> Combine输入分组<you,...> Combine输入键值对<you,1> Combiner输出键值对<you,1> MyReducer输入分组<hello,...> MyReducer输入键值对<hello,2> MyReducer输入分组<me,...> MyReducer输入键值对<me,3> MyReducer输入分组<you,...> MyReducer输入键值对<you,1> */ public class MyWordCounterCombile { static String FILE_ROOT = "hdfs://master:9000/"; static String FILE_INPUT = "hdfs://master:9000/hello"; static String FILE_OUTPUT = "hdfs://master:9000/out"; public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT),conf); Path outpath = new Path(FILE_OUTPUT); if(fileSystem.exists(outpath)){ fileSystem.delete(outpath, true); } // 0 定义干活的人 Job job = new Job(conf); // 1.1 告诉干活的人 输入流位置 读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数 FileInputFormat.setInputPaths(job, FILE_INPUT); // 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 job.setInputFormatClass(TextInputFormat.class); //1.2 指定自定义的map类 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //1.3 分区 job.setNumReduceTasks(1); //1.4 TODO 排序、分组 目前按照默认方式执行 //1.5 规约 job.setCombinerClass(MyCombile.class); //2.2 指定自定义reduce类 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //2.3 指定写出到哪里 FileOutputFormat.setOutputPath(job, outpath); job.setOutputFormatClass(TextOutputFormat.class); // 让干活的人干活 job.waitForCompletion(true); } } /** * 继承mapper 覆盖map方法,hadoop有自己的参数类型 * 读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数, * 这样,对于文件hello而言,调用MyMapper方法map后得到结果: * <hello,1>,<you,1>,<hello,1>,<me,1> * 方法后,得到结果为: * KEYIN, 行偏移量 * VALUEIN, 行文本内容(当前行) * KEYOUT, 行中出现的单词 * VALUEOUT 行中出现单词次数,这里固定写为1 * */ class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { Counter helloCounter = context.getCounter("Sensitive Words", "hello"); String line = v1.toString(); if(line.contains("hello")){ helloCounter.increment(1); } String[] v1s = v1.toString().split("\t"); for(String word : v1s){ context.write(new Text(word), new LongWritable(1)); System.out.println("Mapper输出<" + word +"," + 1 + ">"); // <hello,1> <hello,1> <you,1> <me,1> } } } //自定义规约器 class MyCombile extends Reducer<Text, LongWritable, Text, LongWritable>{ protected void reduce(Text k2, Iterable<LongWritable> v2s, Context ctx) throws IOException, InterruptedException { // 显示k2 在上面执行完mapper和自动合并分组后,有多少组 System.out.println("Combine输入分组<" + k2.toString() + ",...>"); long times = 0L; for(LongWritable l : v2s){ times += l.get(); System.out.println("Combine输入键值对<" + k2.toString() + "," + l.get() + ">"); } ctx.write(k2, new LongWritable(times)); System.out.println("Combiner输出键值对<"+k2.toString()+","+times+">"); } } /** * <hello,{1,1}>,<me,{1}>,<you,{1}>, 每个分组调用一次 reduce方法 * * KEYIN, 行中出现单词 * VALUEIN, 行中出现单词个数 * KEYOUT, 文件中出现不同单词 * VALUEOUT 文件中出现不同单词总个数 */ class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ protected void reduce(Text k2, Iterable<LongWritable> v2s, Context ctx) throws IOException, InterruptedException { System.out.println("MyReducer输入分组<"+k2.toString()+",...>"); long times = 0L; for(LongWritable l : v2s){ times += l.get(); System.out.println("MyReducer输入键值对<"+k2.toString()+","+l.get()+">"); } ctx.write(k2, new LongWritable(times)); } }