MapReduce实现矩阵乘法--实现代码

为了让大家更直观的了解程序执行,今天编写了实现代码供大家参考。

编程环境:

  • java version "1.7.0_40"
  • Eclipse Kepler
  • Windows7 x64
  • Ubuntu 12.04 LTS
  • Hadoop2.2.0
  • Vmware 9.0.0 build-812388 

输入数据:

A矩阵存放地址:hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA/matrixa

A矩阵内容:
3 4 6
4 0 8

 B矩阵存放地址:hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB/matrixb

B矩阵内容:
2 3
3 0
4 1

实现代码:

一共三个类:

  • 驱动类MMDriver
  • Map类MMMapper
  • Reduce类MMReducer

大家可根据个人习惯合并成一个类使用。

MMDriver.java

package dataguru.matrixmultiply;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MMDriver {
 
 public static void main(String[] args) throws Exception {
 
  // set configuration
  Configuration conf = new Configuration();


  // create job
  Job job = new Job(conf,"MatrixMultiply");
  job.setJarByClass(dataguru.matrixmultiply.MMDriver.class);
 
        //  specify Mapper & Reducer
  job.setMapperClass(dataguru.matrixmultiply.MMMapper.class);
  job.setReducerClass(dataguru.matrixmultiply.MMReducer.class);
 
  // specify output types of mapper and reducer
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
 
  // specify input and output DIRECTORIES
  Path inPathA = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA");
  Path inPathB = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB");
  Path outPath = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixC");
  FileInputFormat.addInputPath(job, inPathA);
  FileInputFormat.addInputPath(job, inPathB);
        FileOutputFormat.setOutputPath(job,outPath);


  // delete output directory
  try{
   FileSystem hdfs = outPath.getFileSystem(conf);
   if(hdfs.exists(outPath))
    hdfs.delete(outPath);
   hdfs.close();
  } catch (Exception e){
   e.printStackTrace();
   return ;
  }
 
  //  run the job
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

MMMapper.java

package dataguru.matrixmultiply;


import java.io.IOException;
import java.util.StringTokenizer;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;


public class MMMapper extends Mapper<Object, Text, Text, Text> {
 private String tag;  //current matrix
 
    private int crow = 2;// 矩阵A的行数
    private int ccol = 2;// 矩阵B的列数
   
    private static int arow = 0; //current arow
    private static int brow = 0; //current brow
 
 @Override
 protected void setup(Context context) throws IOException,
   InterruptedException {
  // TODO get inputpath of input data, set to tag
  FileSplit fs = (FileSplit)context.getInputSplit();
  tag = fs.getPath().getParent().getName();
 }


 /**
  * input data include two matrix files
  */
 public void map(Object key, Text value, Context context)
   throws IOException, InterruptedException {
  StringTokenizer str = new StringTokenizer(value.toString());
 
  if ("matrixA".equals(tag)) {          //left matrix,output key:x,y
   int col = 0;
   while (str.hasMoreTokens()) {
    String item = str.nextToken();  //current x,y = line,col
    for (int i = 0; i < ccol; i++) {
     Text outkey = new Text(arow+","+i);
     Text outvalue = new Text("a,"+col+","+item);
     context.write(outkey, outvalue);
     System.out.println(outkey+" | "+outvalue);
    }
    col++;
   }
   arow++;
   
  }else if ("matrixB".equals(tag)) {
   int col = 0;
   while (str.hasMoreTokens()) {
    String item = str.nextToken();  //current x,y = line,col
    for (int i = 0; i < crow; i++) {
     Text outkey = new Text(i+","+col);
     Text outvalue = new Text("b,"+brow+","+item);
     context.write(outkey, outvalue);
     System.out.println(outkey+" | "+outvalue);
    }
    col++;
   }
   brow++;
   
  }
 }
}

MMReducer.java

package dataguru.matrixmultiply;


import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;


public class MMReducer extends Reducer<Text, Text, Text, Text> {


 public void reduce(Text key, Iterable<Text> values, Context context)
   throws IOException, InterruptedException {


  Map<String,String> matrixa = new HashMap<String,String>();
  Map<String,String> matrixb = new HashMap<String,String>();
 
  for (Text val : values) {  //values example : b,0,2  or  a,0,4
   StringTokenizer str = new StringTokenizer(val.toString(),",");
   String sourceMatrix = str.nextToken();
   if ("a".equals(sourceMatrix)) {
    matrixa.put(str.nextToken(), str.nextToken());  //(0,4)
   }
   if ("b".equals(sourceMatrix)) {
    matrixb.put(str.nextToken(), str.nextToken());  //(0,2)
   }
  }
 
  int result = 0;
  Iterator<String> iter = matrixa.keySet().iterator();
  while (iter.hasNext()) {
   String mapkey = iter.next();
   result += Integer.parseInt(matrixa.get(mapkey)) * Integer.parseInt(matrixb.get(mapkey));
  }


  context.write(key, new Text(String.valueOf(result)));
 }
}

相关推荐