Hadoop Basics --- Sorting Data with MapReduce
Continuing from the previous article: Hadoop Basics --- Flow-Sum MapReduce Program and Custom Data Types
1. Experimental Data
We sort the output produced by the flow-sum job from the previous article:
(Input: the tab-separated result of the flow-sum job, one line per phone number with the columns phoneNB, up_flow, down_flow, and sum_flow.)
2. Writing the MapReduce Program
(1) The custom data type FlowBean
package cn.hadoop.mr.wc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNB;
    private long up_flow;
    private long down_flow;
    private long sum_flow;

    // No-arg constructor, required by the framework for deserialization
    public FlowBean() {}

    public FlowBean(String phoneNB, long up_flow, long down_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.down_flow = down_flow;
        this.sum_flow = up_flow + down_flow;
    }

    public String getPhoneNB() { return phoneNB; }
    public void setPhoneNB(String phoneNB) { this.phoneNB = phoneNB; }
    public long getUp_flow() { return up_flow; }
    public void setUp_flow(long up_flow) { this.up_flow = up_flow; }
    public long getDown_flow() { return down_flow; }
    public void setDown_flow(long down_flow) { this.down_flow = down_flow; }
    public long getSum_flow() { return up_flow + down_flow; }

    // Serialization: field order must match readFields() exactly
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(down_flow);
        out.writeLong(up_flow + down_flow);
    }

    // Deserialization: read fields in the order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        down_flow = in.readLong();
        sum_flow = in.readLong();
    }

    // Sort by total flow, descending: a negative return value places
    // this bean first. Ties are broken by phone number so that equal
    // sums stay distinct keys (returning 0 would merge different phone
    // numbers into one reduce() call and drop records).
    @Override
    public int compareTo(FlowBean o) {
        if (sum_flow != o.sum_flow) {
            return sum_flow > o.sum_flow ? -1 : 1;
        }
        return phoneNB.compareTo(o.phoneNB);
    }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + down_flow + "\t" + sum_flow;
    }
}
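Since write() and readFields() must handle the fields in exactly the same order, it can be worth checking the round trip locally before submitting a job. A minimal sketch, assuming the FlowBean above; the test class and sample values are illustrative, not part of the original program:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import cn.hadoop.mr.wc.FlowBean;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        // Example values only
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        // Serialize the bean the same way Hadoop would
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance via the no-arg constructor
        FlowBean restored = new FlowBean();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Both lines should print identical field values
        System.out.println(original.getPhoneNB() + "\t" + original);
        System.out.println(restored.getPhoneNB() + "\t" + restored);
    }
}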
(2) The Mapper
package cn.hadoop.rs;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import cn.hadoop.mr.wc.FlowBean;

public class ResSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line of the flow-sum output
        String line = value.toString();

        // Split on tabs: phoneNB, up_flow, down_flow, sum_flow
        String[] fields = StringUtils.split(line, '\t');

        String phoneNB = fields[0];
        long up_flow = Long.parseLong(fields[1]);
        long down_flow = Long.parseLong(fields[2]);

        // Emit the whole bean as the key so the framework sorts by it
        context.write(new FlowBean(phoneNB, up_flow, down_flow), NullWritable.get());
    }
}
(3) The Reducer
package cn.hadoop.rs;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import cn.hadoop.mr.wc.FlowBean;

// Keys arrive at the reducer already sorted by FlowBean.compareTo()
public class ResSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Unpack the phone number and write one output line per key
        String phoneNB = key.getPhoneNB();
        context.write(new Text(phoneNB), key);
    }
}
Note: the framework sorts map output keys during the shuffle, before the reducer receives them, so the field we want to sort on must be part of the key. That is why the mapper emits the whole FlowBean as the key: its compareTo() method defines the sort order.
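To see the ordering compareTo() produces without running a cluster, you can sort a few beans in memory with the standard library, since Arrays.sort() applies the same comparison. The phone numbers and flow values below are illustrative only:

import java.util.Arrays;

import cn.hadoop.mr.wc.FlowBean;

public class SortDemo {
    public static void main(String[] args) {
        FlowBean[] beans = {
            new FlowBean("13926435656", 132, 1512),
            new FlowBean("13560439658", 2034, 5892),
            new FlowBean("13480253104", 180, 180),
        };
        // Same ordering the framework applies to map output keys
        Arrays.sort(beans);
        for (FlowBean b : beans) {
            // Prints largest sum_flow first
            System.out.println(b.getPhoneNB() + "\t" + b);
        }
    }
}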
(4) The Driver (main method)
package cn.hadoop.rs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.hadoop.mr.wc.FlowBean;

public class ResSortRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ResSortRunner.class);
        job.setMapperClass(ResSortMapper.class);
        job.setReducerClass(ResSortReducer.class);

        // Reducer output: phone number -> FlowBean
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Map output: FlowBean as the key (sorted during shuffle), no value payload
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
(5) Testing the Result
hadoop jar rs.jar cn.hadoop.rs.ResSortRunner /fs/output1 /fs/output6
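Here /fs/output1 is the flow-sum output directory from the previous article. Given the FlowBean above, each line of the resulting part-r-00000 file holds phoneNB followed by up_flow, down_flow, and sum_flow (FlowBean.toString()), separated by tabs and ordered by sum_flow from largest to smallest.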
3. Running Both Jobs from a Single main Method
(1) Modify the main method to run the two jobs in sequence
package cn.hadoop.rs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.hadoop.fs.FlowSumMapper;
import cn.hadoop.fs.FlowSumReducer;
import cn.hadoop.fs.FlowSumRunner;
import cn.hadoop.mr.wc.FlowBean;

public class ResSortRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Job 1: sum up/down flow per phone number (from the previous article)
        Configuration conf1 = new Configuration();
        Job job1 = Job.getInstance(conf1);
        job1.setJarByClass(FlowSumRunner.class);
        job1.setMapperClass(FlowSumMapper.class);
        job1.setReducerClass(FlowSumReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(FlowBean.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        // Abort if the first job fails; the second depends on its output
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: sort the summed results by total flow, descending
        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2);
        job2.setJarByClass(ResSortRunner.class);
        job2.setMapperClass(ResSortMapper.class);
        job2.setReducerClass(ResSortReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(FlowBean.class);
        job2.setMapOutputKeyClass(FlowBean.class);
        job2.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job2, new Path(args[1])); // job1's output is job2's input
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}
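As an aside, Hadoop also ships JobControl/ControlledJob helpers for expressing this dependency declaratively instead of chaining waitForCompletion() calls. A minimal sketch, assuming job1 and job2 have been configured as above (this fragment replaces the two waitForCompletion() calls; error handling omitted):

import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

// Wrap the two configured Job instances
ControlledJob cj1 = new ControlledJob(job1.getConfiguration());
cj1.setJob(job1);
ControlledJob cj2 = new ControlledJob(job2.getConfiguration());
cj2.setJob(job2);
cj2.addDependingJob(cj1); // job2 starts only after job1 succeeds

JobControl control = new JobControl("flowsum-then-sort");
control.addJob(cj1);
control.addJob(cj2);

// JobControl is a Runnable; run it on a thread and poll until done
Thread t = new Thread(control);
t.start();
while (!control.allFinished()) {
    Thread.sleep(1000);
}
control.stop();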
(2) Test and Inspect the Results
hadoop jar rs.jar cn.hadoop.rs.ResSortRunner /fs/input /fs/outdata1 /fs/outdata2
(3) Note: in practice this approach is not recommended. It is better to write each job's intermediate results out separately and chain the programs with a shell script, which is more flexible and easier to debug; a sketch follows.
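For instance, a small wrapper script can run the two jars in sequence and stop on failure. A sketch following the commands above, assuming the flow-sum job from the previous article was packaged as fs.jar with driver class cn.hadoop.fs.FlowSumRunner (the jar name is an assumption):

#!/bin/bash
# Run the flow-sum job, then feed its output to the sort job.
set -e  # abort the chain if either job fails

hadoop jar fs.jar cn.hadoop.fs.FlowSumRunner /fs/input /fs/outdata1
hadoop jar rs.jar cn.hadoop.rs.ResSortRunner /fs/outdata1 /fs/outdata2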