mapreduce 开发以及部署

前面几篇文章的梳理让我对hadoop新yarn 框架有了一个大概的认识,今天开始回归老本行---开始coding。

因为涉及到linux系统部署,需要在本地和服务器之间传输文件,所以今天安装了一个linux 的 lrzsz 工具

下载并解压缩 lrzsz-0.12.20.tar.gz

安装之前,需要检查系统是否有gcc  若没有请安装 yum install gcc

安装lrzsz  ./configure && make && make install
上面安装过程默认把lsz和lrz安装到了/usr/local/bin/目录下, 下面创建软链接, 并命名为rz/sz:
# cd /usr/bin
# ln -s /usr/local/bin/lrz rz
# ln -s /usr/local/bin/lsz sz

开始写代码 首先导入相应的包
  commons-beanutils-1.7.0.jar
 commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.4.jar
 commons-collections-3.2.1.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-digester-1.8.jar
 commons-el-1.0.jar
 commons-httpclient-3.1.jar
 commons-io-2.4.jar
 commons-lang-2.6.jar
 commons-logging-1.0.4.jar
 commons-logging.jar
 guava-11.0.2.jar
 hadoop-common-2.5.2.jar
 hadoop-mapreduce-client-core-2.5.2.jar
 log4j-1.2.14.jar
 mockito-all-1.8.5.jar
 mrunit-1.1.0-hadoop2.jar
 powermock-mockito-1.4.9-full.jar

在此我们写一个分析每年最高气温的任务,气温数据格式如下

1901 01 01 06   -38 -9999 10200   270   159     8 -9999 -9999

其中 1901 为年份,01 01 为月份和日期,06 为小时,-38 为气温;各字段之间用数量不固定的空格分隔,-9999 表示该项数据无效

开始编写mapper 代码如下

package com.snwz.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class MyMapper {

	private static final Log logger = LogFactory.getLog(MyMapper.class);

	/**
	 * Mapper that turns one weather record into a (year, temperature) pair.
	 *
	 * <p>A record is a line of whitespace-separated columns such as
	 * {@code 1901 01 01 06   -38 -9999 10200   270   159     8 -9999 -9999}.
	 * Columns are padded with a varying number of spaces, so the line is split on
	 * whitespace runs rather than on single spaces; otherwise the padding produces
	 * empty tokens and the column positions shift with the width of each value.
	 */
	public static class myMapper extends Mapper<Object, Text, IntWritable, IntWritable> {

		/** Value this mapper treats as an unusable temperature reading. */
		private static final String MISSING_VALUE = "-9999";
		/** Column holding the year. */
		private static final int YEAR_COLUMN = 0;
		/** Column holding the temperature (year, month, day, hour come first). */
		private static final int TEMPERATURE_COLUMN = 4;
		/** Number of columns in the sample record; shorter lines are rejected. */
		private static final int COLUMN_COUNT = 12;

		// Reused across calls to avoid allocating a new Writable per record.
		private final IntWritable record = new IntWritable();
		private final IntWritable year = new IntWritable();

		/**
		 * Emits the year as key and the temperature as value, both as integers,
		 * so that the reducer can compute the maximum temperature per year.
		 *
		 * @param key     input key supplied by the framework (only used in log messages)
		 * @param value   one line of input data
		 * @param context context object used to emit the output pair
		 * @throws IOException          if writing the output pair fails
		 * @throws InterruptedException if writing the output pair is interrupted
		 */
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString().trim();
			if (line.isEmpty()) {
				return;
			}
			String[] columns = line.split("\\s+");
			if (columns.length < COLUMN_COUNT) {
				logger.info("line : " + key + " array length error " + line);
				return;
			}
			if (MISSING_VALUE.equals(columns[TEMPERATURE_COLUMN])) {
				logger.info("line : " + key + " temperature error -9999");
				return;
			}
			try {
				year.set(Integer.parseInt(columns[YEAR_COLUMN]));
				record.set(Integer.parseInt(columns[TEMPERATURE_COLUMN]));
			} catch (NumberFormatException e) {
				// A single corrupt record should be skipped, not fail the whole task.
				logger.info("line : " + key + " number format error " + line);
				return;
			}
			context.write(year, record);
		}
	}


	/** Intentionally empty entry point, kept for compatibility. */
	public static void main(String[] args) {

	}

}

reducer 代码如下:

package com.snwz.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer {

	/**
	 * Reducer that emits the highest temperature seen for each year.
	 *
	 * <p>Taking a maximum is associative and commutative, so this class can also
	 * be used as a combiner.
	 */
	public static class myReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{

		/**
		 * Writes (year, maximum temperature) for one year.
		 *
		 * @param key     the year
		 * @param values  all temperatures collected for that year
		 * @param context context object used to emit the result
		 * @throws IOException          if writing the result fails
		 * @throws InterruptedException if writing the result is interrupted
		 */
		@Override
		protected void reduce(IntWritable key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			// Start from the smallest int rather than 0: temperatures can be
			// negative, and a year with only negative readings must not report 0.
			int maxTem = Integer.MIN_VALUE;
			boolean sawValue = false;
			for (IntWritable temperature : values) {
				maxTem = Math.max(maxTem, temperature.get());
				sawValue = true;
			}
			// Without any reading there is no meaningful maximum to emit.
			if (sawValue) {
				context.write(key, new IntWritable(maxTem));
			}
		}
	}

}

 完成之后我们通过一个方便的测试工具mrunit 来进行测试

package com.snwz.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

import com.snwz.mapreduce.MyMapper.myMapper;
import com.snwz.mapreduce.MyReducer.myReducer;

/**
 * MRUnit tests for {@code myMapper} and {@code myReducer}.
 */
public class MpTest {

	MapDriver<Object, Text, IntWritable, IntWritable> mapDriver;
	ReduceDriver<IntWritable, IntWritable, IntWritable, IntWritable> reduceDriver;
	// The input key type must match the mapper's input key type (Object).
	MapReduceDriver<Object, Text, IntWritable, IntWritable, IntWritable, IntWritable> mapReduceDriver;


	/** Creates fresh drivers around new mapper and reducer instances before each test. */
	@Before
	public void setUp(){
		// NOTE(review): machine-specific Hadoop install location; adjust for your environment.
		System.setProperty("hadoop.home.dir", "E:\\hadoop\\hadoop-2.5.2");
		myMapper  mapper = new myMapper();
		myReducer reducer = new myReducer();
		mapDriver = MapDriver.newMapDriver(mapper);
		reduceDriver = ReduceDriver.newReduceDriver(reducer);
		mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
	}

	/** A valid record must be mapped to (year, temperature). */
	@Test
	public void testMapper() throws IOException {
		mapDriver.withInput(new LongWritable(),
				new Text("1901 01 01 06   -78 -9999 10200   270   159     8 -9999 -9999"));
		mapDriver.withOutput(new IntWritable(1901), new IntWritable(-78));
		mapDriver.runTest();
	}

	/** The reducer must emit the largest of the values supplied for a year. */
	@Test
	public void testReducer() throws IOException {
		List<IntWritable> values = new ArrayList<IntWritable>();
		values.add(new IntWritable(1));
		values.add(new IntWritable(2));
		values.add(new IntWritable(-48));
		values.add(new IntWritable(-12));
		reduceDriver.withInput(new IntWritable(1940), values)
				.withOutput(new IntWritable(1940), new IntWritable(2))
				.runTest();
	}
}

 测试通过后 开始编写job 任务

package com.snwz.mapreduce;

import java.io.File;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.snwz.mapreduce.MyMapper.myMapper;
import com.snwz.mapreduce.MyReducer.myReducer;

public class MyJob extends Configured implements Tool {
	
	private static final Log logger = LogFactory.getLog(MyJob.class);

	public static void main(String[] args) {
		try {
			int res;
			res = ToolRunner.run(new Configuration(), new MyJob(), args);
			System.exit(res);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public int run(String[] args) throws Exception {
		if (args == null || args.length != 2) {
			System.out.println("need inputpath and outputpath");
			return 1;
		}
		// hdfs 输入路径
		String inputpath = args[0];
		// reduce 结果集输出路径
		String outputpath = args[1];
		
		String shortin = args[0]; 
		String shortout = args[1]; 
		if (shortin.indexOf(File.separator) >= 0) 
		shortin = shortin.substring(shortin.lastIndexOf(File.separator)); 
		if (shortout.indexOf(File.separator) >= 0) 
		shortout = shortout.substring(shortout.lastIndexOf(File.separator)); 

		File inputdir = new File(inputpath);
		File outputdir = new File(outputpath);
		if (!inputdir.exists() || !inputdir.isDirectory()) {
			System.out.println("inputpath not exist or isn't dir!");
			return 0;
		}
		if (!outputdir.exists()) {
			new File(outputpath).mkdirs();
		}
		
		Job job = new Job(new JobConf());
		job.setJarByClass(MyJob.class); 
		job.setJobName("MyJob"); 
		job.setOutputKeyClass(IntWritable.class);// 输出的 key 类型,在 OutputFormat 会检查
	    job.setOutputValueClass(IntWritable.class); // 输出的 value 类型,在 OutputFormat 会检查
	    job.setMapperClass(myMapper.class); 
	    job.setCombinerClass(myReducer.class); 
	    job.setReducerClass(myReducer.class); 
	    FileInputFormat.setInputPaths(job, new Path(shortin));//hdfs 中的输入路径
	    FileOutputFormat.setOutputPath(job,new Path(shortout));//hdfs 中输出路径

	    Date startTime = new Date(); 
	    logger.info("Job started: " + startTime); 
	    job.waitForCompletion(true);    
	    Date end_time = new Date(); 
	    logger.info("Job ended: " + end_time); 
	    logger.info("The job took " + 
	    (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); 
	    return 0; 

	}
}

 到这里,一个简单的mapreduce程序就编写完成了。接下来通过打包工具将编写的类打成jar包;关联的jar不需要一起打进去,因为hadoop的 jar 命令会自己去关联相应的jar文件。打包时将 main 方法所在的类指定为 MyJob 即可。然后将jar包存放在hadoop根目录,并将需要分析的文件存放到hdfs系统中。

清空输出路径 ./bin/hadoop dfs -rmr /output

建立输入路径 ./bin/hadoop dfs -mkdir /input

上传文件 ./bin/hadoop dfs -copyFromLocal 本地路径 hdfs路径

运行jar文件 ./bin/hadoop jar myJob.jar /input /output

运行完成后 进入输出路径 查看输出结果即可。

相关推荐