Hadoop 系列(四)自定义数据类

一:自定义数据类

为什么需要自定义数据类

        上一篇文章(https://www.cnblogs.com/wuxiaolong4/p/12733518.html)里,我们自己写了一个 MapReduce 版的 Top N。从代码里可以看出,里面用 | 作分隔符,这种方法不好:很容易因为自己的失误导致读取字段出错,或者把自己搞晕。这时候我们就需要自定义数据类,定义属于自己的实体类,有点像操作数据库时用的 entity。

怎么定义自己的数据类

class MinMaxtemperature implements WritableComparable<MinMaxtemperature> {    public int compareTo(MinMaxtemperature otherMinMaxtemperature) {        if(date.equals(otherMinMaxtemperature.getDate())) return 1;        else return 0;    }    public void write(DataOutput out) throws IOException {        out.writeInt(max);        out.writeInt(min);    }    public void readFields(DataInput in) throws IOException {        max = in.readInt();        min = in.readInt();    }    private  String date = "";    private int min =0 ;    private int max =0 ;    public String getDate() {        return date;    }    public void setDate(String date) {        this.date = date;    }    public int getMin() {        return min;    }    public void setMin(int min) {        this.min = min;    }    public int getMax() {        return max;    }    public void setMax(int max) {        this.max = max;    }    @Override    public String toString() {        return "MinMaxtemperature{" +                "date=‘" + date + ‘\‘‘ +                ", min=" + min +                ", max=" + max +                ‘}‘;    }}

怎么使用自己的类

数据:

1
3
4
5
6
7
8
9
1
3
4
5
6
7
8
9
1
3
4
5
6
7
8
9

代码1 :输出最高温度和最低温度:

package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Custom Hadoop data type holding a date together with a minimum and a
 * maximum temperature.
 *
 * <p>All three fields are serialized by {@link #write(DataOutput)} and
 * restored in the same order by {@link #readFields(DataInput)}.
 */
class MinMaxtemperature implements WritableComparable<MinMaxtemperature> {

    private String date = "";
    private int min = 0;
    private int max = 0;

    /** No-argument constructor; the instance is filled in later via setters or {@link #readFields}. */
    public MinMaxtemperature() {
    }

    /**
     * Creates a fully initialized instance.
     *
     * @param date the date text; {@code null} is stored as the empty string
     * @param max  the maximum temperature
     * @param min  the minimum temperature
     */
    public MinMaxtemperature(String date, int max, int min) {
        this.date = (date == null) ? "" : date;
        this.min = min;
        this.max = max;
    }

    /**
     * Orders instances by date, then by maximum, then by minimum temperature.
     *
     * @param otherMinMaxtemperature the instance to compare against
     * @return a negative value, zero, or a positive value when this instance
     *         sorts before, equal to, or after the other one
     */
    @Override
    public int compareTo(MinMaxtemperature otherMinMaxtemperature) {
        int byDate = date.compareTo(otherMinMaxtemperature.getDate());
        if (byDate != 0) {
            return byDate;
        }
        int byMax = Integer.compare(max, otherMinMaxtemperature.getMax());
        if (byMax != 0) {
            return byMax;
        }
        return Integer.compare(min, otherMinMaxtemperature.getMin());
    }

    /**
     * Serializes the fields in the order date, max, min.
     *
     * @param out the sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeInt(max);
        out.writeInt(min);
    }

    /**
     * Restores the fields in exactly the order used by {@link #write(DataOutput)}.
     *
     * @param in the source to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        date = in.readUTF();
        max = in.readInt();
        min = in.readInt();
    }

    public String getDate() {
        return date;
    }

    /**
     * Sets the date; {@code null} is stored as the empty string so that
     * serialization and comparison never hit a null date.
     *
     * @param date the date text, may be {@code null}
     */
    public void setDate(String date) {
        this.date = (date == null) ? "" : date;
    }

    public int getMin() {
        return min;
    }

    public void setMin(int min) {
        this.min = min;
    }

    public int getMax() {
        return max;
    }

    public void setMax(int max) {
        this.max = max;
    }

    /** Two instances are equal when date, min and max all match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MinMaxtemperature)) {
            return false;
        }
        MinMaxtemperature other = (MinMaxtemperature) obj;
        return min == other.min && max == other.max && date.equals(other.date);
    }

    /** Hash over the same three fields that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        int result = date.hashCode();
        result = 31 * result + min;
        result = 31 * result + max;
        return result;
    }

    @Override
    public String toString() {
        return "MinMaxtemperature{"
                + "date='" + date + '\''
                + ", min=" + min
                + ", max=" + max
                + '}';
    }
}

/**
 * Mapper that tracks the highest and lowest temperature over every record it
 * receives and emits a single summary record from {@link #cleanup}.
 *
 * <p>Each input line is split on a single space and the second field is
 * parsed as an integer temperature. The first field is not needed for the
 * overall maximum/minimum and is ignored.
 */
class WordcountMapper extends Mapper<LongWritable, Text, Text, MinMaxtemperature> {

    // Start from the widest possible bounds so the first real reading always
    // replaces them; starting from 0 would hide an all-positive minimum or an
    // all-negative maximum.
    private final MinMaxtemperature extremes =
            new MinMaxtemperature("", Integer.MIN_VALUE, Integer.MAX_VALUE);

    // True once at least one valid reading has been folded into extremes.
    private boolean sawReading = false;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(" ");
        if (fields.length < 2) {
            // Blank or truncated line: count it instead of failing the task.
            context.getCounter("MinMaxtemperature", "MALFORMED_LINES").increment(1);
            return;
        }

        int temperature;
        try {
            temperature = Integer.parseInt(fields[1]);
        } catch (NumberFormatException e) {
            context.getCounter("MinMaxtemperature", "MALFORMED_LINES").increment(1);
            return;
        }

        if (temperature > extremes.getMax()) {
            extremes.setMax(temperature);
        }
        if (temperature < extremes.getMin()) {
            extremes.setMin(temperature);
        }
        sawReading = true;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Nothing to report when this mapper saw no valid reading; emitting the
        // initial bounds would corrupt the reducer's result.
        if (sawReading) {
            context.write(new Text(extremes.getDate()), extremes);
        }
    }
}

/**
 * Reducer that merges the per-mapper summaries into one overall maximum and
 * minimum, written once from {@link #cleanup} as {@code max:min}.
 */
class WordcountReducer extends Reducer<Text, MinMaxtemperature, Text, Text> {

    // Same widest-bounds initialisation as in the mapper.
    private final MinMaxtemperature extremes =
            new MinMaxtemperature("", Integer.MIN_VALUE, Integer.MAX_VALUE);

    // True once at least one value has been merged.
    private boolean sawValue = false;

    @Override
    protected void reduce(Text key, Iterable<MinMaxtemperature> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("reduce:" + key.toString());
        for (MinMaxtemperature value : values) {
            if (value.getMax() > extremes.getMax()) {
                extremes.setMax(value.getMax());
            }
            if (value.getMin() < extremes.getMin()) {
                extremes.setMin(value.getMin());
            }
            sawValue = true;
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (sawValue) {
            context.write(new Text("max:min"),
                    new Text(extremes.getMax() + ":" + extremes.getMin()));
        }
    }
}

/**
 * Job driver: configures the max/min temperature job against the local file
 * system, clears any previous output directory, runs the job and exits with
 * a status reflecting whether it succeeded.
 */
public class WordcountDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);

        // The job refuses to start if the output directory already exists,
        // so remove the result of any previous run first.
        String outputPath = "/software/java/data/output/";
        Path output = new Path(outputPath);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordcountDriver.class);
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MinMaxtemperature.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("/software/java/data/input/"));
        FileOutputFormat.setOutputPath(job, output);

        // Submit the job (its configuration plus the jar containing its
        // classes) and block until it finishes.
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
代码2 分组内输出最高温度和最低温度:
 

       代码3和代码4就不写了,因为差不多。

相关推荐