第一个Map-Reduce程序

1. 开发环境

=========

Hadoop:

Hadoop 1.1.2
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1 -r 1440782
Compiled by hortonfo on Thu Jan 31 02:08:44 UTC 2013
From source with checksum c720ddcf4b926991de7467d253a79b8b

java:

java version "1.6.0_27"
OpenJDK Runtime Environment (IcedTea6 1.12.5) (6b27-1.12.5-0Ubuntu0.12.04.1)
OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)

os:

Distributor ID: Ubuntu
Description: Ubuntu 12.04.2 LTS
Release: 12.04
Codename: precise

eclipse:

Eclipse Platform

Version: 3.7.2
Build id: I20110613-1736

2. 数据准备

=========

要做map-reduce,得先准备点数据。

在这个例子中,我产生了1亿条,1000个学生的考试成绩数据,学生的学号是从1 - 1000,考试成绩的范围是0 - 100

用php搞个小脚本来搞定

<?php
 $i = 0;
 $nRows = 10000000;
 $fileData = fopen("Score.data", "a+");
 if(!$fileData)
  die("Can't open file!\n");
 for(; $i < $nRows; $i++)
 {
  $nNumber = rand(1, 1000);
  $nScore = rand(0, 100);
  $strLine = $nNumber."\t".$nScore."\n";
  fputs($fileData, $strLine);
 }
 fclose($fileData);
?>

3. 配置eclipse

=========

用hadoop搞map-reduce自然要用java,用java自然要用eclipse

eclipse是有hadoop map-reduce插件的,但是那个插件兼容的版本已经太老了,因此如果你用了新版的eclipse,还得自己配一下hadoop的包。

hadoop的包在ubuntu上安装在

/usr/share/hadoop

该路径下有一系列jar包(包括lib的子目录里等),导入到工程中,其中一般有用的是hadoop-core-1.1.2.jar这个包,基本上都要导入

相关阅读:

4. 写个mapper

=========

mapper在我们求平均分的例子中是将每一行数据拆分成<键:学号,值:分数>的对以交给reducer使用

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;


public class CAvgScoreMapper implements Mapper<LongWritable, Text, Text, IntWritable>
{
 //Mapper<LongWritable, Text, Text, IntWritable>是映射器接口,模板参数:
 //LongWritable是输入给映射器的Key(一般是文件的偏移量)
 //Text是输入给映射器的键值对中的值,一般是每一行文件的内容,
 //Text是输出给归约器的键, 在此学号我们认为是字符串数据
 //IntWritable是输出给归约器的值,在此是成绩,因此是整数
 @Override
 public void configure(JobConf objJobConfig)
 {
  //这个函数将作业启动时的job配置对象传入
 
 }

 @Override
 public void close() throws IOException
 {return;
  //目前没啥用处,不用管
 
 }

 @Override
 public void map(LongWritable lKey, Text txtLine, OutputCollector<Text, IntWritable> ReducerInput, Reporter objReporter)
   throws IOException
 {
  //这是映射器的映射函数,在此将数据的每一行拆成学号:成绩的对
  try
  {
   String strLine = txtLine.toString(); //这个函数中txtLine的输入就恰是每一行数据.
   if(strLine.length() < 3) return; //正常数据至少有3个字符:学号一个字符,制表符一个字符,分数一个字符
   StringTokenizer objToken = new StringTokenizer(strLine, "\t");
   String strPair[] = new String[2];
   int i = 0;
   for(; (i < 2) && objToken.hasMoreTokens(); i++) //分解每一行的数据,构成键值对
   {
     strPair[i] = objToken.nextToken();
   }
   if(i < 2) return; //行数据分解的有问题
   int nScore = Integer.parseInt(strPair[1]);
   ReducerInput.collect(new Text(strPair[0]), new IntWritable(nScore));
  }
  catch(Exception expError)
  {
   return;
  }
 }
}

相关推荐