Hadoop reduce-side join ---> tagging records by source file
0 Introduction:
Read two input files:
hello:
1,zhangsan
2,lisi
3,wangwu
hello1:
1,45
2,56
3,89
and produce the following joined output:
zhangsan,45
lisi,56
wangwu,89
0.1) The records come from two different files. The mapper tags each record with the name of the file it was read from; the join and the final output are done in the reducer. Because the two halves of each output line live in different files, the join can only happen on the reduce side, and it relies on the source-file tag that the map side attaches beforehand.
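To make the tagging idea concrete before wading through the Hadoop boilerplate, here is a minimal, Hadoop-free sketch in plain Java (the class name TagJoinSketch and the hard-coded sample arrays are purely illustrative): the "map" step prefixes each value with a marker for its source file and groups by key, and the "reduce" step reassembles one output line per key.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: simulates the tag-and-join logic on the sample data above.
// In the real job the lines come from hdfs://master:9000/files/hello and .../hello1.
public class TagJoinSketch {
    public static void main(String[] args) {
        String[] hello  = {"1,zhangsan", "2,lisi", "3,wangwu"};   // key,name
        String[] hello1 = {"1,45", "2,56", "3,89"};               // key,score

        // "Map" phase: tag each value with its source ("#" = hello, "*" = hello1)
        // and group by key, which is what Hadoop's shuffle does in the real job.
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String line : hello) {
            String[] parts = line.split(",");
            grouped.computeIfAbsent(parts[0], k -> new ArrayList<>()).add("#" + parts[1]);
        }
        for (String line : hello1) {
            String[] parts = line.split(",");
            grouped.computeIfAbsent(parts[0], k -> new ArrayList<>()).add("*" + parts[1]);
        }

        // "Reduce" phase: for each key, the "#" value becomes the output key
        // and the "*" value becomes the output value.
        for (List<String> values : grouped.values()) {
            String name = "", score = "";
            for (String v : values) {
                if (v.startsWith("#")) name = v.substring(1);
                if (v.startsWith("*")) score = v.substring(1);
            }
            System.out.println(name + "\t" + score);   // zhangsan 45, lisi 56, wangwu 89
        }
    }
}

Running this prints the three joined lines shown above; the real job below does the same thing, except that the grouping is performed by Hadoop's shuffle between MyMapper and MyReducer.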
1 The full code is shown below; the parts to focus on are the custom Mapper and Reducer.
package join;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Reduce-side join: the mapper tags every record with its source file,
 * the reducer joins the tagged values that share a key.
 */
public class MapJoinApp {

    static String FILE_ROOT = "hdfs://master:9000/";
    static String FILE_INPUT = "hdfs://master:9000/files";
    static String FILE_OUTPUT = "hdfs://master:9000/out";

    public static void main(String[] args) throws IOException, URISyntaxException,
            InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(FILE_ROOT), conf);
        Path outpath = new Path(FILE_OUTPUT);
        if (fileSystem.exists(outpath)) {
            fileSystem.delete(outpath, true);
        }

        // 0 define the job
        Job job = new Job(conf);

        // 1.1 tell the job where the input is; every line of the HDFS files is
        //     parsed into a <k,v> pair and map() is called once per pair
        FileInputFormat.setInputPaths(job, FILE_INPUT);
        // how the input files are turned into key/value pairs
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 custom mapper
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // 1.3 partitioning
        job.setNumReduceTasks(1);
        // 1.4 TODO sorting and grouping: defaults are used for now
        // 1.5 TODO combiner

        // 2.2 custom reducer
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 2.3 where to write the result
        FileOutputFormat.setOutputPath(job, outpath);
        job.setOutputFormatClass(TextOutputFormat.class);

        // run the job
        job.waitForCompletion(true);
    }
}

/**
 * Tags each record with a marker for the file it came from:
 * "#" for hello, "*" for hello1.
 */
class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        String filename = split.getPath().getName(); // "hello" or "hello1"
        // split.getPath().toString() would give the full path,
        // e.g. hdfs://master:9000/files/hello or hdfs://master:9000/files/hello1

        String[] v1s = v1.toString().split(",");
        String v2Str = "";
        if ("hello".equals(filename)) {   // lines in hello look like: 1,zhangsan
            v2Str = "#" + v1s[1];
        }
        if ("hello1".equals(filename)) {  // lines in hello1 look like: 1,45
            v2Str = "*" + v1s[1];
        }
        context.write(new LongWritable(Long.parseLong(v1s[0])), new Text(v2Str));
    }
}

/**
 * Joins the tagged values that arrive under the same key.
 */
class MyReducer extends Reducer<LongWritable, Text, Text, Text> {

    @Override
    protected void reduce(LongWritable k2, Iterable<Text> v2s, Context ctx)
            throws IOException, InterruptedException {
        String k3Str = "";
        String v3Str = "";
        for (Text v2 : v2s) {
            if (v2.toString().startsWith("#")) {   // name from hello -> output key
                k3Str = v2.toString().substring(1);
            }
            if (v2.toString().startsWith("*")) {   // score from hello1 -> output value
                v3Str = v2.toString().substring(1);
            }
        }
        ctx.write(new Text(k3Str), new Text(v3Str));
    }
}
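Note that the whole join hinges on the one-character tags: in MyReducer, a value starting with "#" can only have come from hello and becomes the output key, while a value starting with "*" can only have come from hello1 and becomes the output value. If a key appeared in just one of the files, the other field would simply be written out empty.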
2 Result:
[root@master local]# hadoop fs -text /out/part-r-00000
Warning: $HADOOP_HOME is deprecated.
zhangsan	45
lisi	56
wangwu	89