MapReduce Development and Deployment
The previous posts in this series gave me a rough picture of Hadoop's new YARN framework, so today I get back to my old trade: coding.
Since deploying to a Linux system is involved, the first step today was installing the lrzsz file-transfer utility on Linux.
Download and unpack lrzsz-0.12.20.tar.gz.
Before installing, check that gcc is available on the system; if not, install it with yum install gcc.
Install lrzsz: ./configure && make && make install
The steps above install lrz and lsz into /usr/local/bin/ by default (rz receives a file sent from the local machine, sz sends one back). Next, create symlinks named rz and sz:
# cd /usr/bin
# ln -s /usr/local/bin/lrz rz
# ln -s /usr/local/bin/lsz sz
Now for the code. First, add the required jars to the project:
commons-beanutils-1.7.0.jar
commons-beanutils-core-1.8.0.jar
commons-cli-1.2.jar
commons-codec-1.4.jar
commons-collections-3.2.1.jar
commons-compress-1.4.1.jar
commons-configuration-1.6.jar
commons-digester-1.8.jar
commons-el-1.0.jar
commons-httpclient-3.1.jar
commons-io-2.4.jar
commons-lang-2.6.jar
commons-logging-1.0.4.jar
commons-logging.jar
guava-11.0.2.jar
hadoop-common-2.5.2.jar
hadoop-mapreduce-client-core-2.5.2.jar
log4j-1.2.14.jar
mockito-all-1.8.5.jar
mrunit-1.1.0-hadoop2.jar
powermock-mockito-1.4.9-full.jar
We will now write a job that finds the highest temperature for each year. The raw temperature records look like this:
1901 01 01 06 -38 -9999 10200 270 159 8 -9999 -9999
where 1901 is the year, the following 01 01 are the month and day, and -38 is the temperature; -9999 marks a missing value.
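To pin down the field positions, here is a minimal standalone sketch (the class name RecordFormatDemo is made up for illustration) that splits the sample record on whitespace exactly as the mapper below will:

package com.snwz.mapreduce;

public class RecordFormatDemo {
    public static void main(String[] args) {
        String line = "1901 01 01 06 -38 -9999 10200 270 159 8 -9999 -9999";
        // fields are separated by runs of whitespace
        String[] fields = line.trim().split("\\s+");
        System.out.println("year        = " + fields[0]); // 1901
        System.out.println("month, day  = " + fields[1] + " " + fields[2]); // 01 01
        System.out.println("temperature = " + fields[4]); // -38
    }
}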
We start with the mapper; the code is as follows:
package com.snwz.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper {

    private static final Log logger = LogFactory.getLog(MyMapper.class);

    public static class myMapper extends Mapper<Object, Text, IntWritable, IntWritable> {

        private final IntWritable year = new IntWritable();
        private final IntWritable record = new IntWritable();

        /**
         * key     the byte offset of the line in the input
         * value   one line of raw data
         * context the job context
         *
         * Note: since we want the highest temperature per year, the year
         * becomes the output key and the temperature the output value,
         * both as integers.
         */
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line == null || line.trim().isEmpty()) {
                return;
            }
            // fields are separated by runs of whitespace
            String[] array = line.trim().split("\\s+");
            if (array.length < 12) {
                logger.info("line : " + key + " array length error " + line);
                return;
            }
            // -9999 marks a missing temperature reading; skip the record
            if ("-9999".equals(array[4])) {
                logger.info("line : " + key + " temperature error -9999");
                return;
            }
            year.set(Integer.parseInt(array[0]));
            record.set(Integer.parseInt(array[4]));
            context.write(year, record);
        }
    }
}

The reducer code is as follows:
package com.snwz.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class MyReducer {

    public static class myReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // highest temperature for the year; start below any valid
            // reading so years with all-negative readings are handled
            int maxTem = Integer.MIN_VALUE;
            for (IntWritable i : values) {
                maxTem = Math.max(maxTem, i.get());
            }
            context.write(key, new IntWritable(maxTem));
        }
    }
}

Once both classes are in place, we can check them with MRUnit, a handy unit-testing framework for MapReduce:
package com.snwz.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

import com.snwz.mapreduce.MyMapper.myMapper;
import com.snwz.mapreduce.MyReducer.myReducer;

public class MpTest {

    MapDriver<Object, Text, IntWritable, IntWritable> mapDriver;
    ReduceDriver<IntWritable, IntWritable, IntWritable, IntWritable> reduceDriver;
    // the first two type parameters must match the mapper's input key/value types
    MapReduceDriver<Object, Text, IntWritable, IntWritable, IntWritable, IntWritable> mapReduceDriver;

    @Before
    public void setUp() {
        System.setProperty("hadoop.home.dir", "E:\\hadoop\\hadoop-2.5.2");
        myMapper mapper = new myMapper();
        myReducer reducer = new myReducer();
        mapDriver = MapDriver.newMapDriver(mapper);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }

    @Test
    public void testMapper() throws IOException {
        mapDriver.withInput(new LongWritable(),
                new Text("1901 01 01 06 -78 -9999 10200 270 159 8 -9999 -9999"));
        mapDriver.withOutput(new IntWritable(1901), new IntWritable(-78));
        mapDriver.runTest();
    }

    @Test
    public void testReducer() throws IOException {
        List<IntWritable> values = new ArrayList<IntWritable>();
        values.add(new IntWritable(1));
        values.add(new IntWritable(2));
        values.add(new IntWritable(-48));
        values.add(new IntWritable(-12));
        reduceDriver.withInput(new IntWritable(1940), values)
                .withOutput(new IntWritable(1940), new IntWritable(2))
                .runTest();
    }
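
    // A full-pipeline sketch using the mapReduceDriver wired up in setUp:
    // records pass through the mapper, the shuffle, and the reducer in one
    // run. The 1901 01 02 line is made up for illustration, in the same
    // layout as the sample record above.
    @Test
    public void testMapReduce() throws IOException {
        mapReduceDriver
                .withInput(new LongWritable(),
                        new Text("1901 01 01 06 -78 -9999 10200 270 159 8 -9999 -9999"))
                .withInput(new LongWritable(),
                        new Text("1901 01 02 06 -40 -9999 10200 270 159 8 -9999 -9999"))
                .withOutput(new IntWritable(1901), new IntWritable(-40))
                .runTest();
    }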
}

Once the tests pass, we write the job driver:
package com.snwz.mapreduce;

import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.snwz.mapreduce.MyMapper.myMapper;
import com.snwz.mapreduce.MyReducer.myReducer;

public class MyJob extends Configured implements Tool {

    private static final Log logger = LogFactory.getLog(MyJob.class);

    public static void main(String[] args) {
        try {
            int res = ToolRunner.run(new Configuration(), new MyJob(), args);
            System.exit(res);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int run(String[] args) throws Exception {
        if (args == null || args.length != 2) {
            System.out.println("need inputpath and outputpath");
            return 1;
        }
        // input path in HDFS
        Path inputPath = new Path(args[0]);
        // output path in HDFS for the reduce results
        Path outputPath = new Path(args[1]);
        // both paths live in HDFS, so check them through the FileSystem API
        // rather than java.io.File, which only sees the local disk
        FileSystem fs = FileSystem.get(getConf());
        if (!fs.exists(inputPath)) {
            System.out.println("inputpath does not exist!");
            return 1;
        }
        // the job fails if the output path already exists, so clear it first
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        Job job = Job.getInstance(getConf(), "MyJob");
        job.setJarByClass(MyJob.class);
        job.setOutputKeyClass(IntWritable.class);   // output key type, checked by the OutputFormat
        job.setOutputValueClass(IntWritable.class); // output value type, checked by the OutputFormat
        job.setMapperClass(myMapper.class);
        // taking a maximum is associative, so the reducer doubles as the combiner
        job.setCombinerClass(myReducer.class);
        job.setReducerClass(myReducer.class);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        Date startTime = new Date();
        logger.info("Job started: " + startTime);
        boolean success = job.waitForCompletion(true);
        Date endTime = new Date();
        logger.info("Job ended: " + endTime);
        logger.info("The job took "
                + (endTime.getTime() - startTime.getTime()) / 1000 + " seconds.");
        return success ? 0 : 1;
    }
}

With that, a simple MapReduce job is complete. Package the classes into a jar; the dependent jars do not need to be bundled, because the hadoop jar command puts Hadoop's own libraries on the classpath. When packaging, set the jar's main class to MyJob. Put the jar in the Hadoop root directory, then load the files to be analyzed into HDFS:
Clear the output path: ./bin/hdfs dfs -rm -r /output
Create the input path: ./bin/hdfs dfs -mkdir /input
Upload the data: ./bin/hdfs dfs -copyFromLocal <local path> <HDFS path>
Run the jar: ./bin/hadoop jar myJob.jar /input /output
When the run finishes, inspect the output path to see the results, e.g. ./bin/hdfs dfs -cat /output/part-r-00000.