MapReduce案例之单表关联
1 单表关联
1.1 单表关联
"单表关联"这个实例要求从给出的数据中寻找所关心的数据,它是对原始数据所包含信息的挖掘。
1.2 应用场景
实例中给出child-parent(孩子——父母)表,要求输出grandchild-grandparent(孙子——爷奶)表。
1.3 设计思路
分析这个实例,显然需要进行单表连接,连接的是左表的parent列和右表的child列,且左表和右表是同一个表。
连接结果中除去连接的两列就是所需要的结果——"grandchild--grandparent"表。要用MapReduce解决这个实例,首先应该考虑如何实现表的自连接;其次就是连接列的设置;最后是结果的整理。
考虑到MapReduce的shuffle过程会将相同的key会连接在一起,所以可以将map结果的key设置成待连接的列,然后列中相同的值就自然会连接在一起了。
要连接的是左表的parent列和右表的child列,且左表和右表是同一个表,所以在map阶段将读入数据分割成child和parent之后,会将parent设置成key,child设置成value进行输出,并作为左表;再将同一对child和parent中的child设置成key,parent设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。reduce接收到连接的结果,其中每个key的value-list就包含了"grandchild--grandparent"关系。取出每个key的value-list进行解析,将左表中的child放入一个数组,右表中的parent放入一个数组,然后对两个数组求笛卡尔积就是最后的结果了。
1.4 程序代码
程序代码如下所示。
import java.io.IOException; import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class STjoin { public static int time = 0; /* * map将输出分割child和parent,然后反序输出一次作为左表, * 正序输出一次作为右表,需要注意的是在输出的value中必须 * 加上左右表的区别标识。 */ public static class Map extends Mapper<Object, Text, Text, Text> { // 实现map函数 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String childname = new String();// 孩子名称 String parentname = new String();// 父母名称 String relationtype = new String();// 左右表标识 // 输入的一行预处理文本 StringTokenizer itr=new StringTokenizer(value.toString()); String[] values=new String[2]; int i=0; while(itr.hasMoreTokens()){ values[i]=itr.nextToken(); i++; } if (values[0].compareTo("child") != 0) { childname = values[0]; parentname = values[1]; // 输出左表 relationtype = "1"; context.write(new Text(values[1]), new Text(relationtype + "+"+ childname + "+" + parentname)); // 输出右表 relationtype = "2"; context.write(new Text(values[0]), new Text(relationtype + "+"+ childname + "+" + parentname)); } } } public static class Reduce extends Reducer<Text, Text, Text, Text> { // 实现reduce函数 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 输出表头 if (0 == time) { context.write(new Text("grandchild"), new Text("grandparent")); time++; } int grandchildnum = 0; String[] grandchild = new String[10]; int grandparentnum = 0; String[] grandparent = new String[10]; Iterator ite = values.iterator(); while (ite.hasNext()) { String record = ite.next().toString(); int len = record.length(); int i = 2; if (0 == len) { continue; } // 取得左右表标识 char relationtype = record.charAt(0); // 定义孩子和父母变量 String childname = new String(); String parentname = new String(); // 获取value-list中value的child while (record.charAt(i) != '+') { childname += record.charAt(i); i++; } i = i + 1; // 获取value-list中value的parent while (i < len) { parentname += record.charAt(i); i++; } // 左表,取出child放入grandchildren if ('1' == relationtype) { grandchild[grandchildnum] = childname; grandchildnum++; } // 右表,取出parent放入grandparent if ('2' == relationtype) { grandparent[grandparentnum] = parentname; grandparentnum++; } } // grandchild和grandparent数组求笛卡尔儿积 if (0 != grandchildnum && 0 != grandparentnum) { for (int m = 0; m < grandchildnum; m++) { for (int n = 0; n < grandparentnum; n++) { // 输出结果 context.write(new Text(grandchild[m]), new Text(grandparent[n])); } } } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("mapred.job.tracker", "192.168.1.2:9001"); String[] ioArgs = new String[] { "STjoin_in", "STjoin_out" }; String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: Single Table Join <in> <out>"); System.exit(2); } Job job = new Job(conf, "Single Table Join"); job.setJarByClass(STjoin.class); // 设置Map和Reduce处理类 job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); // 设置输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 设置输入和输出目录 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } |