Hadoop Series (4): Custom Data Types
1. Custom Data Types
Why custom data types are needed
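In the previous article we wrote our own Top N MapReduce job (https://www.cnblogs.com/wuxiaolong4/p/12733518.html). As its code shows, it packed several fields into one string with | as the separator. That approach is fragile: a small slip on your part can cause a field to be read from the wrong position, or simply leave you confused about which field is which. The fix is to define our own data class, an entity class of our own, much like an entity class when working with a database.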
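To make the problem concrete, here is a small plain-Java sketch (no Hadoop needed; the class names and the `20200101|35` record are hypothetical) contrasting a `|`-packed string with a typed class. Note that `|` is also a regex metacharacter for `String.split`, which is exactly the kind of slip the paragraph above warns about.

```java
// Contrasts a "|"-packed record with a small typed class.
// Class names and the sample record are hypothetical.
class DelimiterVsTyped {

    // A typed record: each field has a name, so nobody has to remember indexes.
    static final class Reading {
        private final String date;
        private final int temperature;

        Reading(String date, int temperature) {
            this.date = date;
            this.temperature = temperature;
        }

        String getDate() { return date; }
        int getTemperature() { return temperature; }
    }

    // "|" is a regex metacharacter, so String.split needs it escaped.
    static String[] splitPacked(String packed) {
        return packed.split("\\|");
    }

    public static void main(String[] args) {
        String packed = "20200101|35";

        // Unescaped, "|" matches the empty string, so every character becomes a "field".
        System.out.println(packed.split("|").length);

        // Escaped correctly, but callers must still know that index 1 is the temperature.
        String[] fields = splitPacked(packed);
        Reading reading = new Reading(fields[0], Integer.parseInt(fields[1]));

        // With the typed class, the meaning travels with the value.
        System.out.println(reading.getDate() + " -> " + reading.getTemperature());
    }
}
```

The typed class costs a few lines up front, but every later reader of the data uses `getTemperature()` instead of a bare index.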
How to define your own data type
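import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// A custom Hadoop type: a date plus the min and max temperature for it.
// WritableComparable = Writable (serialization) + Comparable (sort order when used as a key).
class MinMaxtemperature implements WritableComparable<MinMaxtemperature> {
    private String date = "";
    private int min = 0;
    private int max = 0;

    // Hadoop creates instances by reflection, so a no-arg constructor is required.
    public MinMaxtemperature() {
    }

    // Order by date. Must return a negative number, zero, or a positive number;
    // returning 1 for "equal" and 0 otherwise breaks sorting and grouping.
    @Override
    public int compareTo(MinMaxtemperature otherMinMaxtemperature) {
        return date.compareTo(otherMinMaxtemperature.getDate());
    }

    // Serialize every field, date included, in a fixed order.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeInt(max);
        out.writeInt(min);
    }

    // Deserialize in exactly the same order as write().
    @Override
    public void readFields(DataInput in) throws IOException {
        date = in.readUTF();
        max = in.readInt();
        min = in.readInt();
    }

    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public int getMin() { return min; }
    public void setMin(int min) { this.min = min; }
    public int getMax() { return max; }
    public void setMax(int max) { this.max = max; }

    @Override
    public String toString() {
        return "MinMaxtemperature{" + "date='" + date + '\'' + ", min=" + min + ", max=" + max + '}';
    }
}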
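Because `write()` and `readFields()` only call `java.io.DataOutput` and `java.io.DataInput`, their contract can be checked without a cluster. The sketch below is a JDK-only stand-in (the name `MinMaxRecord` is hypothetical, and it does not implement Hadoop's interface). It round-trips a record through bytes, roughly what happens when a value travels from a map task to a reducer, and sorts records with `compareTo`, which is what the shuffle relies on when the type is used as a key.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// JDK-only stand-in for MinMaxtemperature: same write/readFields signatures
// as Hadoop's Writable, but it does not implement the Hadoop interface.
class MinMaxRecord implements Comparable<MinMaxRecord> {
    String date = "";
    int min;
    int max;

    MinMaxRecord() { }

    MinMaxRecord(String date, int max, int min) {
        this.date = date;
        this.max = max;
        this.min = min;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeInt(max);
        out.writeInt(min);
    }

    // Reads fields in exactly the order write() produced them.
    void readFields(DataInput in) throws IOException {
        date = in.readUTF();
        max = in.readInt();
        min = in.readInt();
    }

    // Negative, zero or positive, ordered by date.
    @Override
    public int compareTo(MinMaxRecord other) {
        return date.compareTo(other.date);
    }

    // Serialize to bytes, then read them back into a fresh object.
    static MinMaxRecord roundTrip(MinMaxRecord original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            original.write(out);
            out.flush();
            MinMaxRecord copy = new MinMaxRecord();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MinMaxRecord copy = roundTrip(new MinMaxRecord("20200101", 35, -3));
        System.out.println(copy.date + " " + copy.max + " " + copy.min);

        List<MinMaxRecord> records = new ArrayList<>();
        records.add(new MinMaxRecord("20200103", 1, 0));
        records.add(new MinMaxRecord("20200101", 1, 0));
        records.add(new MinMaxRecord("20200102", 1, 0));
        Collections.sort(records); // uses compareTo: ascending by date
        for (MinMaxRecord r : records) {
            System.out.println(r.date);
        }
    }
}
```

If `write()` skipped `date` (or wrote fields in a different order than `readFields()` reads them), the round trip would return wrong values rather than fail loudly, which is why the field order is worth testing this way.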
How to use your own class
Data:
Code 1: output the highest and lowest temperature:
package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

class MinMaxtemperature implements WritableComparable<MinMaxtemperature> {
    private String date = "";
    private int min = 0;
    private int max = 0;

    // No-arg constructor: Hadoop instantiates Writables by reflection.
    public MinMaxtemperature() {
    }

    public MinMaxtemperature(String date, int max, int min) {
        this.date = date;
        this.min = min;
        this.max = max;
    }

    // Order by date: negative, zero or positive, as Comparable requires.
    @Override
    public int compareTo(MinMaxtemperature otherMinMaxtemperature) {
        return date.compareTo(otherMinMaxtemperature.getDate());
    }

    // Write every field, date included, in a fixed order...
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeInt(max);
        out.writeInt(min);
    }

    // ...and read them back in exactly the same order.
    @Override
    public void readFields(DataInput in) throws IOException {
        date = in.readUTF();
        max = in.readInt();
        min = in.readInt();
    }

    public String getDate() { return date; }
    public void setDate(String date) { this.date = date; }
    public int getMin() { return min; }
    public void setMin(int min) { this.min = min; }
    public int getMax() { return max; }
    public void setMax(int max) { this.max = max; }

    @Override
    public String toString() {
        return "MinMaxtemperature{" + "date='" + date + '\'' + ", min=" + min + ", max=" + max + '}';
    }
}

// Map side: track the min and max of every temperature this map task sees,
// then emit one partial result when the task finishes.
class WordcountMapper extends Mapper<LongWritable, Text, Text, MinMaxtemperature> {
    // Start from the extreme values so the first reading always replaces them
    // (starting from 0 would leave min at 0 whenever all readings are positive).
    private final MinMaxtemperature dataMap =
            new MinMaxtemperature("", Integer.MIN_VALUE, Integer.MAX_VALUE);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each line: "<timestamp> <temperature>", separated by whitespace.
        String[] line = value.toString().trim().split("\\s+");
        if (line.length < 2) {
            return; // skip blank or malformed lines
        }
        int temperature = Integer.parseInt(line[1]);
        if (temperature > dataMap.getMax()) {
            dataMap.setMax(temperature);
        }
        if (temperature < dataMap.getMin()) {
            dataMap.setMin(temperature);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // The date is never set, so every map task emits the same (empty) key
        // and all partial results end up in a single reduce group.
        context.write(new Text(dataMap.getDate()), dataMap);
    }
}

// Reduce side: merge the partial min/max results from all map tasks.
class WordcountReducer extends Reducer<Text, MinMaxtemperature, Text, Text> {
    private final MinMaxtemperature dataMap =
            new MinMaxtemperature("", Integer.MIN_VALUE, Integer.MAX_VALUE);

    @Override
    protected void reduce(Text key, Iterable<MinMaxtemperature> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("reduce:" + key);
        // Hadoop may reuse one value instance across iterations; only the ints are read here.
        for (MinMaxtemperature value : values) {
            if (value.getMax() > dataMap.getMax()) {
                dataMap.setMax(value.getMax());
            }
            if (value.getMin() < dataMap.getMin()) {
                dataMap.setMin(value.getMin());
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(new Text("max:min"), new Text(dataMap.getMax() + ":" + dataMap.getMin()));
    }
}

public class WordcountDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///"); // run against the local file system
        FileSystem fs = FileSystem.get(conf);

        // Delete old output first; the job fails if the output directory already exists.
        String outputPath = "/software/java/data/output/";
        if (fs.exists(new Path(outputPath))) {
            fs.delete(new Path(outputPath), true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordcountDriver.class);
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        // The global result is written in the reducer's cleanup(), so use exactly one reducer.
        job.setNumReduceTasks(1);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MinMaxtemperature.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("/software/java/data/input/"));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Submit the job's configuration and the jar containing its Java classes to YARN to run.
        // job.submit();
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
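The data flow of Code 1 can be simulated in plain Java. In this sketch (the class name and sample lines are hypothetical, and it assumes input lines of the form `<timestamp> <temperature>`, which is what the mapper's split implies), each list stands for one input split handled by one map task, `mapSplit` plays the role of `map()` plus `cleanup()`, and `reduce` merges the partial results. It also shows why the running max and min should start from `Integer.MIN_VALUE` and `Integer.MAX_VALUE`: starting from 0, min would stay 0 whenever every reading is positive.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of Code 1's data flow (no Hadoop involved).
class GlobalMinMaxSimulation {

    // One "map task": scan its lines and return a partial {max, min}.
    static int[] mapSplit(List<String> lines) {
        int max = Integer.MIN_VALUE; // extreme starting values, not 0
        int min = Integer.MAX_VALUE;
        for (String raw : lines) {
            String[] fields = raw.trim().split("\\s+");
            if (fields.length < 2) {
                continue; // skip blank or malformed lines
            }
            int t = Integer.parseInt(fields[1]);
            max = Math.max(max, t);
            min = Math.min(min, t);
        }
        return new int[] {max, min};
    }

    // The "reduce" step: merge the partial results of every map task.
    static int[] reduce(List<int[]> partials) {
        int max = Integer.MIN_VALUE;
        int min = Integer.MAX_VALUE;
        for (int[] p : partials) {
            max = Math.max(max, p[0]);
            min = Math.min(min, p[1]);
        }
        return new int[] {max, min};
    }

    public static void main(String[] args) {
        // Hypothetical input spread over two splits.
        List<String> split1 = Arrays.asList("2020010101 3", "2020010102 8");
        List<String> split2 = Arrays.asList("2020010201 5", "", "2020010202 4");
        int[] result = reduce(Arrays.asList(mapSplit(split1), mapSplit(split2)));
        System.out.println("max:min\t" + result[0] + ":" + result[1]);
    }
}
```

Because max and min are associative and commutative, merging per-task partial results gives the same answer as scanning all lines at once; that is what makes emitting one record per map task from `cleanup()` correct.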
Code 2: output the highest and lowest temperature within each group:
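Grouping can be sketched in plain Java under a few assumptions. In a Hadoop version, one way to do it would be for the mapper to emit the date (for example as `Text`) as the key instead of a constant key; the shuffle then groups all readings of one date into a single `reduce()` call, and the reducer writes each date's result inside `reduce()` rather than in `cleanup()`. Below, a `TreeMap` stands in for the shuffle's sort-and-group step. The class name, the sample lines, and the rule that the date is the first field with its last two characters dropped are all assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of "min/max per group" (hypothetical; not a Hadoop job).
class PerDateMinMaxSketch {

    // Assumed line format: "<timestamp> <temperature>", where the date is the
    // timestamp minus its last two characters.
    static Map<String, int[]> minMaxPerDate(List<String> lines) {
        Map<String, int[]> byDate = new TreeMap<>(); // sorted by key, like the shuffle
        for (String raw : lines) {
            String[] fields = raw.trim().split("\\s+");
            if (fields.length < 2 || fields[0].length() < 2) {
                continue; // skip blank or malformed lines
            }
            String date = fields[0].substring(0, fields[0].length() - 2);
            int t = Integer.parseInt(fields[1]);
            // One {max, min} pair per date, starting from the extreme values.
            int[] mm = byDate.computeIfAbsent(date,
                    d -> new int[] {Integer.MIN_VALUE, Integer.MAX_VALUE});
            mm[0] = Math.max(mm[0], t);
            mm[1] = Math.min(mm[1], t);
        }
        return byDate;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2020010101 3", "2020010102 8", "2020010201 5", "2020010202 4");
        for (Map.Entry<String, int[]> e : minMaxPerDate(lines).entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue()[0] + ":" + e.getValue()[1]);
        }
    }
}
```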
Code 3 and Code 4 are not written out here because they are much the same.