hadoop combine 规约

0 简介:

a) combine发生在map流程中

b) 一般combine代码和自定义reduce代码相同,如果需要不相同,只需要继承hadoop.mapreduce.Reducer

    在其内写自己代码即可。

c) combine不是hadoop标配,其有使用局限性,

    不会对多个map的输出做处理,不能跨map任务执行,不能代替reduce的作用,

    在进行运算时,运算结果于运算总个数有关系时,就不可以使用,eg:

    求平均值计算,源文件内容为:

    1,1,1,1
    2,2,2

不使用combine下,reduce会接受到7个数字, 总和为 10,在除以7 结果为 10/7 = XXX
如果使用combine,在combine处 1,1,1,1 得到的平均值为1, 2,2,2的平均值为2, 

 然后1,2在传入reduce后在做平均值,结果为1.5

d)  而在非依赖总数计算时,combine的出现会减少map端的输出,

   这样shuffle时传输的数据量减少,网络开销减少。 eg:

  源文件内容如下,进行单词计数:

   hello you
   hello me me me

默认不使用combine下的结果为:组是3组 但是记录仍旧是6条 ,
因此在执行job操作时打印为:Map output records=6  Reduce input records=6  Combine input records=0   Combine output records=0
hello, {1,1}
you, {1}
me, {1,1,1}

增加了 combine后,结果就会变成:
因此在执行job操作时打印为:Map output records=6  Combine input records=6   Combine output records=3   Reduce input records=3 Reduce input groups=3  Reduce output records=3

hello, {2}
you, {1}
me, {3}

1 代码

package combine;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * 实现单词计数功能
 * 增加自定义计数器功能
 * 增加自定义规约combine,使得提高map-->reduce操作传输效率
 * 测试文件 hello内容为:
 * hello	you
 * hello	me	me	me
 * @author zm
 *
 * 问:为什么使用Combiner?
 * 答:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短。
 * 
 * 问:为什么Combiner不作为MR运行的标配,而是可选步骤哪?
 * 答:因为不是所有的算法都适合使用Combiner处理,例如求平均数.
 *
 * 问:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作哪?
 * 答:combiner操作发生在map端的,处理一个任务所接收的文件中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据。
 * 
 * 输出如下:
Mapper输出<hello,1>
Mapper输出<you,1>
Mapper输出<hello,1>
Mapper输出<me,1>
Mapper输出<me,1>
Mapper输出<me,1>
Combine输入分组<hello,...>
Combine输入键值对<hello,1>
Combine输入键值对<hello,1>
Combiner输出键值对<hello,2>
Combine输入分组<me,...>
Combine输入键值对<me,1>
Combine输入键值对<me,1>
Combine输入键值对<me,1>
Combiner输出键值对<me,3>
Combine输入分组<you,...>
Combine输入键值对<you,1>
Combiner输出键值对<you,1>

MyReducer输入分组<hello,...>
MyReducer输入键值对<hello,2>
MyReducer输入分组<me,...>
MyReducer输入键值对<me,3>
MyReducer输入分组<you,...>
MyReducer输入键值对<you,1>
 */
public class MyWordCounterCombile {

	static String FILE_ROOT = "hdfs://master:9000/";
	static String FILE_INPUT = "hdfs://master:9000/hello";
	static String FILE_OUTPUT = "hdfs://master:9000/out";
	public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
		
		Configuration conf = new Configuration();
		FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT),conf);
		Path outpath = new Path(FILE_OUTPUT);
		if(fileSystem.exists(outpath)){
			fileSystem.delete(outpath, true);
		}
		
		// 0 定义干活的人
		Job job = new Job(conf);
		// 1.1 告诉干活的人 输入流位置     读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数
		FileInputFormat.setInputPaths(job, FILE_INPUT);
		// 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2 指定自定义的map类
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		//1.3 分区
		job.setNumReduceTasks(1);
		
		//1.4 TODO 排序、分组    目前按照默认方式执行
		//1.5  规约
		job.setCombinerClass(MyCombile.class);
		
		//2.2 指定自定义reduce类
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		//2.3 指定写出到哪里
		FileOutputFormat.setOutputPath(job, outpath);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		// 让干活的人干活
		job.waitForCompletion(true);
		
	}
	
}

/**
 * 继承mapper 覆盖map方法,hadoop有自己的参数类型
 * 读取hdfs中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数,
 * 这样,对于文件hello而言,调用MyMapper方法map后得到结果:
 * <hello,1>,<you,1>,<hello,1>,<me,1>
 * 方法后,得到结果为: 
 * KEYIN,      行偏移量
 * VALUEIN,    行文本内容(当前行)
 * KEYOUT,     行中出现的单词
 * VALUEOUT    行中出现单词次数,这里固定写为1
 *
 */
class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

	@Override
	protected void map(LongWritable k1, Text v1, Context context)
			throws IOException, InterruptedException {
		
		Counter helloCounter = context.getCounter("Sensitive Words", "hello");
		String line = v1.toString();
		if(line.contains("hello")){
			helloCounter.increment(1);
		}
		
		String[] v1s = v1.toString().split("\t");
		for(String word : v1s){
			context.write(new Text(word), new LongWritable(1));
			System.out.println("Mapper输出<" + word +"," + 1 + ">"); // <hello,1>  <hello,1> <you,1>  <me,1>
		}
	}
}

//自定义规约器
class MyCombile extends Reducer<Text, LongWritable, Text, LongWritable>{
	
	protected void reduce(Text k2, Iterable<LongWritable> v2s, Context ctx) throws IOException, InterruptedException {
		// 显示k2 在上面执行完mapper和自动合并分组后,有多少组
		System.out.println("Combine输入分组<" + k2.toString() + ",...>");
		long times = 0L;
		for(LongWritable l : v2s){
			times += l.get();
			System.out.println("Combine输入键值对<" + k2.toString() + "," + l.get() + ">");
		}
		ctx.write(k2, new LongWritable(times));
		System.out.println("Combiner输出键值对<"+k2.toString()+","+times+">");
	}
	
}

/**
 * <hello,{1,1}>,<me,{1}>,<you,{1}>, 每个分组调用一次 reduce方法
 * 
 * KEYIN,     行中出现单词
 * VALUEIN,   行中出现单词个数
 * KEYOUT,    文件中出现不同单词
 * VALUEOUT   文件中出现不同单词总个数
 */
class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

	protected void reduce(Text k2, Iterable<LongWritable> v2s, Context ctx)
			throws IOException, InterruptedException {
		System.out.println("MyReducer输入分组<"+k2.toString()+",...>");
		long times = 0L;
		for(LongWritable l : v2s){
			times += l.get();
			System.out.println("MyReducer输入键值对<"+k2.toString()+","+l.get()+">");
		}
		ctx.write(k2, new LongWritable(times));
	}
	
}