Hadoop reduce-side join ---> tagging

0 Introduction:

Read two files:

hello:

1,zhangsan
2,lisi
3,wangwu

hello1:

1,45
2,56
3,89

The final output should be:

zhangsan,45
lisi,56
wangwu,89

0.1) The data comes from two files. On the map side each record is tagged according to the file it came from, and the joined result is produced in the reducer. Because the matching records live in different files, the join must, and can only, happen on the reduce side, and it depends on the source-file tag that the map side attaches beforehand, as illustrated below.
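
To make the tagging concrete, this is what the intermediate records look like (using the "#" and "*" tags from the code below): the mapper keys every record by the shared id and prefixes the value with a marker for its source file, and the shuffle then groups both sides under the same key for the reducer.

Map output:
    from hello : <1, "#zhangsan">  <2, "#lisi">  <3, "#wangwu">
    from hello1: <1, "*45">        <2, "*56">    <3, "*89">

Reduce input (grouped by key after the shuffle):
    <1, ["#zhangsan", "*45"]>
    <2, ["#lisi", "*56"]>
    <3, ["#wangwu", "*89"]>

The reducer strips the tags: the "#" value becomes the output key and the "*" value becomes the output value.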

1 The code is below; the key parts are the custom map and reduce implementations

package join;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Reduce-side join driver: despite the class name, the join itself happens in the
 * reducer; the mapper only tags each record with the file it came from.
 */
public class MapJoinApp {

	static String FILE_ROOT = "hdfs://master:9000/";
	static String FILE_INPUT = "hdfs://master:9000/files";
	static String FILE_OUTPUT = "hdfs://master:9000/out";
	public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
		
		Configuration conf = new Configuration();
		FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT),conf);
		Path outpath = new Path(FILE_OUTPUT);
		if(fileSystem.exists(outpath)){
			fileSystem.delete(outpath, true);
		}
		
		// 0 create the job
		Job job = new Job(conf);
		job.setJarByClass(MapJoinApp.class); // ship the mapper/reducer classes with the job jar
		// 1.1 tell the job where the input is: the files are read from HDFS, every line is
		// parsed into a <k,v> pair, and map() is called once per pair
		FileInputFormat.setInputPaths(job, FILE_INPUT);
		// specify how the input files are parsed: every line becomes one key/value pair
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2 specify the custom mapper class
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		
		//1.3 partitioning
		job.setNumReduceTasks(1);
		
		//1.4 TODO sorting and grouping: currently left at the defaults
		//1.5 TODO combiner
		
		//2.2 specify the custom reducer class
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		//2.3 specify where the output goes
		FileOutputFormat.setOutputPath(job, outpath);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		// run the job and wait for it to finish
		job.waitForCompletion(true);
		
	}
	
}

/**
 * Mapper: tags every record with its source file.
 * Lines from "hello" (id,name) are emitted as <id, "#name">,
 * lines from "hello1" (id,score) are emitted as <id, "*score">.
 */
class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text>{

	@Override
	protected void map(LongWritable k1, Text v1, Context context)
			throws IOException, InterruptedException {
		FileSplit split = (FileSplit)context.getInputSplit();
		String filename = split.getPath().getName(); // "hello" or "hello1"
		//String pathStr = split.getPath().toString();// hdfs://master:9000/files/hello or hdfs://master:9000/files/hello1
		System.out.println(filename);
		String[] v1s = v1.toString().split(",");
		String v2Str = "";
		if("hello".equals(filename)){ // lines in hello look like: 1,zhangsan
			v2Str = "#" + v1s[1];
			System.out.println("hello : " +  v2Str);
		}
		if("hello1".equals(filename)){ // lines in hello1 look like: 1,45
			v2Str = "*" + v1s[1];
			System.out.println("hello1 : " +  v2Str);
		}
		// key = the shared id, value = the tagged field
		context.write(new LongWritable(Long.parseLong(v1s[0])), new Text(v2Str));
	}

}

/**
 * Reducer: for every id, the value tagged with "#" becomes the output key (the name)
 * and the value tagged with "*" becomes the output value (the score).
 */
class MyReducer extends Reducer<LongWritable, Text, Text, Text>{

	@Override
	protected void reduce(LongWritable k2, Iterable<Text> v2s, Context ctx)
			throws IOException, InterruptedException {
		System.out.println("reduce ...");

		String k3Str = "";
		String v3Str = "";

		for(Text v2 : v2s){
			if(v2.toString().startsWith("#")){
				k3Str = v2.toString().substring(1);
			}
			if(v2.toString().startsWith("*")){
				v3Str = v2.toString().substring(1);
			}
		}

		ctx.write(new Text(k3Str), new Text(v3Str));
	}

}
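
The reducer above assumes that each id appears at most once in each input file. As a minimal sketch of how the same tagging idea extends to the general one-to-many case (this class is not part of the original code; the name MyGeneralJoinReducer and the extra java.util imports are added here only for illustration), one can buffer both sides and emit the cross product:

import java.util.ArrayList;
import java.util.List;

class MyGeneralJoinReducer extends Reducer<LongWritable, Text, Text, Text>{

	@Override
	protected void reduce(LongWritable k2, Iterable<Text> v2s, Context ctx)
			throws IOException, InterruptedException {
		List<String> names  = new ArrayList<String>(); // values tagged "#" (from hello)
		List<String> scores = new ArrayList<String>(); // values tagged "*" (from hello1)

		for(Text v2 : v2s){
			String s = v2.toString();
			if(s.startsWith("#")){
				names.add(s.substring(1));
			}else if(s.startsWith("*")){
				scores.add(s.substring(1));
			}
		}

		// emit every name/score pair that shares this id
		for(String name : names){
			for(String score : scores){
				ctx.write(new Text(name), new Text(score));
			}
		}
	}

}

With the sample data above (one record per id in each file), this produces exactly the same output as MyReducer.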

2 Result:

[root@master local]# hadoop fs -text /out/part-r-00000
Warning: $HADOOP_HOME is deprecated.

zhangsan        45
lisi    56
wangwu  89