Hadoop 0.18.3 to 0.20.2
I had been using 0.18.3 and recently switched to 0.20.2, only to find that the MapReduce API has changed quite a bit, while the book "MapReduce: The Definitive Guide" still covers the 0.18.3 API. Here are my notes from this afternoon's exploration:
The biggest change is in job configuration: the new version no longer uses JobConf but uses Job instead. Job extends JobContext, which incorporates the old JobConf.
Job still has the familiar setters for the input path, output path, input format, output format, and so on. In my view the main differences are the following:
1. Initialization is different.
Old: JobConf conf = new JobConf(getConf(), WordCount.class);
New: Job job = new Job(conf, "word count");
2. Job submission is different.
Old: JobClient.runJob(conf);
New: job.waitForCompletion(true);
3. The most subtle change is in the class constraints.
Old: setMapperClass(Class<? extends MapReduceBase implements Mapper>) and setReducerClass(Class<? extends MapReduceBase implements Reducer>) (pseudo-notation: the class must both extend the base class and implement the interface)
New: setMapperClass(Class<? extends Mapper>) and setReducerClass(Class<? extends Reducer>)
In other words, the Map and Reduce classes themselves have changed, and the imports need care. In the old API, a mapper or reducer class must both extend the MapReduceBase parent class and implement the Mapper or Reducer interface, using
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
In the new API, a mapper or reducer simply extends the Mapper or Reducer class from org.apache.hadoop.mapreduce, and the map/reduce methods receive a single Context argument in place of OutputCollector and Reporter, as the sketch below shows.
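To make the contrast concrete before the full listings, here is a minimal side-by-side sketch; OldStyleMapper and NewStyleMapper are made-up names for illustration, and the method bodies are stubs:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Old API (org.apache.hadoop.mapred): Mapper is an interface, so the class
// must extend MapReduceBase and implement Mapper.
class OldStyleMapper extends org.apache.hadoop.mapred.MapReduceBase
    implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Text, IntWritable> {
  public void map(LongWritable key, Text value,
      org.apache.hadoop.mapred.OutputCollector<Text, IntWritable> output,
      org.apache.hadoop.mapred.Reporter reporter) throws IOException {
    // old API emits via output.collect(...)
  }
}

// New API (org.apache.hadoop.mapreduce): Mapper is a class, so you just
// extend it; OutputCollector and Reporter are folded into a single Context.
class NewStyleMapper
    extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // new API emits via context.write(...)
  }
}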
The complete programs for comparison are below. The old-API version comes from "MapReduce: The Definitive Guide":
Mapper class:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      output.collect(new Text(year), new IntWritable(airTemperature));
    }
  }
}
Reducer class:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int maxValue = Integer.MIN_VALUE;
    while (values.hasNext()) {
      maxValue = Math.max(maxValue, values.next().get());
    }
    output.collect(key, new IntWritable(maxValue));
  }
}
Main class:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MaxTemperature {
  public static void main(String[] args) throws IOException {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }
    JobConf conf = new JobConf(MaxTemperature.class);
    conf.setJobName("Max temperature");
    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    conf.setMapperClass(MaxTemperatureMapper.class);
    conf.setReducerClass(MaxTemperatureReducer.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    JobClient.runJob(conf);
  }
}
My version, rewritten for the new API:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  // In the new API the single Context argument replaces OutputCollector and
  // Reporter; keeping the old signature would not override Mapper.map at all.
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  // In the new API, reduce receives an Iterable (not an Iterator) and a Context.
  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
  public static void main(String[] argc)
      throws IOException, InterruptedException, ClassNotFoundException {
    if (argc.length != 2) {
      System.out.println("Usage: MaxTemperature <input> <output>");
      System.exit(-1);
    }
    Configuration conf = new Configuration();
    Job j = new Job(conf, "MaxTemperature");
    j.setJarByClass(MaxTemperature.class);
    j.setMapperClass(MaxTemperatureMapper.class);
    j.setReducerClass(MaxTemperatureReducer.class);
    j.setOutputKeyClass(Text.class);
    j.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(j, new Path(argc[0]));
    FileOutputFormat.setOutputPath(j, new Path(argc[1]));
    System.exit(j.waitForCompletion(true) ? 0 : 1);
  }
}
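As an aside, the old-API snippet in point 1 called getConf(), which comes from driving the job through Tool and ToolRunner; that pattern still works with the new API. Here is a minimal sketch under that assumption (MaxTemperatureDriver is a made-up name; the mapper and reducer are the classes above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver: running through ToolRunner makes getConf() available
// (via Configured) and lets users pass -D options on the command line.
public class MaxTemperatureDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperatureDriver <input> <output>");
      return -1;
    }
    Job job = new Job(getConf(), "MaxTemperature");
    job.setJarByClass(MaxTemperatureDriver.class);
    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new MaxTemperatureDriver(), args));
  }
}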
Reference: http://blog.csdn.net/amuseme_lu/archive/2010/05/13/5588545.aspx
Reposted from: http://blog.csdn.net/JiaoYanChen/archive/2010/08/16/5816573.aspx