Hadoop编写调试MapReduce程序详解

编程学习,最好的方法还是自己动手,所以这里简单介绍在Hadoop上编写调试一个MapReduce程序。

先说一下我的开发环境,我的操作系统是CentOS 6.0,Hadoop版本是0.20.2,开发环境是eclipse。在Hadoop的0.20.0版本以后,都包含一个新的Java MapReduce API,这个API有时也称为上下文对象,新的API在类型上不兼容以前的API。关于新旧API的区别,这里先不做介绍了,只是在编程的时候一定要注意下,网上好多MapReduce的程序是基于旧的API编写的,如果自己安装的是新版的Hadoop,就会在调试时出现错误,初学者不明白这一点或许会走弯路。

1问题描述:查找最高气温。就是从气候日志数据中查找当年的最高气温,假设每一条记录的格式如下:“China2013inbeijing023isok0",其中2013是年份,023是温度记录,ok表示数据是完好的(为了简单易懂,省略其他的信息),我们的任务是从大量的数据记录中找出北京2013年的最高气温。这样的数据很适合用MapReduce来处理。

2问题分析,这个问题很简单,这里用这么简单的数据只是为了说明在Hadoop上编写调试MapReduce程序的过程。对于一条数据这需要提取出来年份和温度,然后找出最大温度就行了。这了类似于分治法,每一个Map过程就是分得过程,Reduce就是合的过程。

3 编码:

3.1Map函数:

//载入一些必要的包
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper;
//新版APIMap过程需要继承org.apache.hadoop.mapreduce包Mapper<SPAN style="COLOR: red"></SPAN>类,并重写其map方法
public class MaxtemMapper extends
Mapper<LongWritable,Text,Text,IntWritable>{
 private static final int MISSING=9999;
 public void map(LongWritable key,Text value,Context context)
 throws IOException,InterruptedException{
  String line=value.toString();//把Text类的对象转化为String类来处理
  String year=line.substring(5,9);//该String的第5-9就是年份
  int airtemperature;
              //有的数据中温度前面带”+“,需要处理下,带”+“的话第19到21为温度,不带的的话第18到21
              //为温度
            &nbsp;if(line.charAt(18)=='+'){
   airtemperature=Integer.parseInt(line.substring(19, 21));
  }
  else {
   airtemperature=Integer.parseInt(line.substring(18,21));
  }
  System.out.println("year:"+year+"tem:"+airtemperature);
              判断数据是否是正确的
            &nbsp;String quality=line.substring(23, 25);
  if(airtemperature!=MISSING && quality.matches("ok")){
      context.write(new Text(year), new IntWritable(airtemperature));
  }
 }
}

3.2Reduce函数:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxtemReducer extends
Reducer<Text,IntWritable,Text,IntWritable>{
 public void reduce(Text key,Iterator<IntWritable> values,
   Context context)
 throws IOException,InterruptedException{
  int maxValue=Integer.MIN_VALUE;
  while(values.hasNext()){
   maxValue=Math.max(maxValue,values.next().get());
  }
  context.write( key, new IntWritable(maxValue));
 }
 
}

3.3执行MapReduce过程

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MaxTemperature{
 
 //the main function
 public static void main(String [] args)throws Exception{
  Configuration conf=new Configuration();
 
  if(args.length!=2){
   System.out.println("Usage: Maxtemperature 
                <input path> <output path>");
   System.exit(-1);
  }
  Job job=new Job(conf,"MaxTemperature");
  job.setJarByClass(MaxTemperature.class);
 
  FileInputFormat.addInputPath(job, new
                Path(args[0]));
  FileOutputFormat.setOutputPath(job, new
                Path(args[1]));
 
  job.setMapperClass(MaxtemMapper.class);
  job.setReducerClass(MaxtemReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.waitForCompletion(true);
  //output the details of the work
  System.out.println("name:"+job.getJobName());
  System.out.println("isSuccessful?:"+ 
                (job.isSuccessful()?"yes":"no"));
   
 }
}

相关推荐