第一个Map-Reduce程序
1. 开发环境
=========
Hadoop:
Hadoop 1.1.2
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1 -r 1440782
Compiled by hortonfo on Thu Jan 31 02:08:44 UTC 2013
From source with checksum c720ddcf4b926991de7467d253a79b8b
java:
java version "1.6.0_27"
OpenJDK Runtime Environment (IcedTea6 1.12.5) (6b27-1.12.5-0Ubuntu0.12.04.1)
OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)
os:
Distributor ID: Ubuntu
Description: Ubuntu 12.04.2 LTS
Release: 12.04
Codename: precise
eclipse:
Eclipse Platform
Version: 3.7.2
Build id: I20110613-1736
2. 数据准备
=========
要做map-reduce,得先准备点数据。
在这个例子中,我产生了1亿条,1000个学生的考试成绩数据,学生的学号是从1 - 1000,考试成绩的范围是0 - 100
用php搞个小脚本来搞定
<?php
$i = 0;
$nRows = 10000000;
$fileData = fopen("Score.data", "a+");
if(!$fileData)
die("Can't open file!\n");
for(; $i < $nRows; $i++)
{
$nNumber = rand(1, 1000);
$nScore = rand(0, 100);
$strLine = $nNumber."\t".$nScore."\n";
fputs($fileData, $strLine);
}
fclose($fileData);
?>
3. 配置eclipse
=========
用hadoop搞map-reduce自然要用java,用java自然要用eclipse
eclipse是有hadoop map-reduce插件的,但是那个插件兼容的版本已经太老了,因此如果你用了新版的eclipse,还得自己配一下hadoop的包。
hadoop的包在ubuntu上安装在
/usr/share/hadoop
该路径下有一系列jar包(包括lib的子目录里等),导入到工程中,其中一般有用的是hadoop-core-1.1.2.jar这个包,基本上都要导入
相关阅读:
4. 写个mapper
=========
mapper在我们求平均分的例子中是将每一行数据拆分成<键:学号,值:分数>的对以交给reducer使用
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class CAvgScoreMapper implements Mapper<LongWritable, Text, Text, IntWritable>
{
//Mapper<LongWritable, Text, Text, IntWritable>是映射器接口,模板参数:
//LongWritable是输入给映射器的Key(一般是文件的偏移量)
//Text是输入给映射器的键值对中的值,一般是每一行文件的内容,
//Text是输出给归约器的键, 在此学号我们认为是字符串数据
//IntWritable是输出给归约器的值,在此是成绩,因此是整数
@Override
public void configure(JobConf objJobConfig)
{
//这个函数将作业启动时的job配置对象传入
}
@Override
public void close() throws IOException
{return;
//目前没啥用处,不用管
}
@Override
public void map(LongWritable lKey, Text txtLine, OutputCollector<Text, IntWritable> ReducerInput, Reporter objReporter)
throws IOException
{
//这是映射器的映射函数,在此将数据的每一行拆成学号:成绩的对
try
{
String strLine = txtLine.toString(); //这个函数中txtLine的输入就恰是每一行数据.
if(strLine.length() < 3) return; //正常数据至少有3个字符:学号一个字符,制表符一个字符,分数一个字符
StringTokenizer objToken = new StringTokenizer(strLine, "\t");
String strPair[] = new String[2];
int i = 0;
for(; (i < 2) && objToken.hasMoreTokens(); i++) //分解每一行的数据,构成键值对
{
strPair[i] = objToken.nextToken();
}
if(i < 2) return; //行数据分解的有问题
int nScore = Integer.parseInt(strPair[1]);
ReducerInput.collect(new Text(strPair[0]), new IntWritable(nScore));
}
catch(Exception expError)
{
return;
}
}
}