Hadoop【MR的分区、排序、分组】
目录
一.分区
问题:按照条件将结果输出到不同文件中
自定义分区步骤
1.自定义继承Partitioner类,重写getPartition()方法
2.在job驱动Driver中设置自定义的Partitioner
3.在Driver中根据分区数设置reducetask数
分区数和reducetask关系
案例实操
将统计结果按照手机归属地不同省份输出到不同文件中(分区),手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中
(1)自定义分区类
MyPartitioner.class
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class MyPartioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text text, FlowBean flowBean, int numPartitions) { String phone = text.toString(); if (phone.startsWith("136")) { return 0; } else if (phone.startsWith("137")) { return 1; } else if (phone.startsWith("138")) { return 2; }else if (phone.startsWith("139")){ return 3; }else { return 4; } } }
(2)在Driver类设置分区和reducetask数
//设置自定义partitioner job.setPartitionerClass(MyPartioner.class); //设置reducetask数量 job.setNumReduceTasks(5);
二.全排序、分区排序、分组
当自定义的对象作为key,按照指定条件进行排序
实现排序的2种方式
1.对象实现WritableComparable接口
实现WritableComparable接口,重写compareTo方法,就可以实现排序(二次排序)
public class OrderBean implements WritableComparable<OrderBean> { //自定义排序,先按pid升序,再按pname降序 @Override public int compareTo(OrderBean o) { int compare = this.pid.compareTo(o.pid); if (compare == 0) { return -this.pname.compareTo(o.pname); } return compare; } }
2.继承WritableComparator类
自定义比较器继承WritableComparator类,父类构造方法增加需要比较的Bean对象,
//继承WritableComparator类 public class MyGroupCompartor extends WritableComparator { public MyGroupCompartor(){ //增加Bean对象 super(OrderBean.class,true); } // 对Bean的排序方法 @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean oa = (OrderBean) a; OrderBean ob = (OrderBean) b; return oa.getPid().compareTo(ob.getPid()); } }
全排序
不分区,只有一个reducetask,针对Key进行排序
分区排序
针对key全排序,然后针对key进行分区
辅助排序【自定义分组】
分析:已经对key进行排序,比如key对象为OrderBean的排序是id,pname的二次排序
,在进入reduce()的分组希望是id相同的进入一组,那么就需要自定义分组针对id进行分组
OrderBean id pname amount 1 小米 1 2400 1 1500 2 华为 2 2400 2 3400
自定义分组比较器
MyGroupCompartor.class
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class MyGroupCompartor extends WritableComparator { public MyGroupCompartor(){ super(OrderBean.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean oa = (OrderBean) a; OrderBean ob = (OrderBean) b; return oa.getPid().compareTo(ob.getPid()); } }
在Driver类中声明自定义分组
job.setGroupingComparatorClass(MyGroupCompartor.class);
相关推荐
changjiang 2020-11-16
minerd 2020-10-28
WeiHHH 2020-09-23
Aleks 2020-08-19
WeiHHH 2020-08-17
飞鸿踏雪0 2020-07-26
tomli 2020-07-26
deyu 2020-07-21
strongyoung 2020-07-19
eternityzzy 2020-07-19
Elmo 2020-07-19
飞鸿踏雪0 2020-07-09
飞鸿踏雪0 2020-07-04
xieting 2020-07-04
WeiHHH 2020-06-28
genshengxiao 2020-06-26
Hhanwen 2020-06-25