hadoop join实现
Hadoop 的 join 实现:支持复合关键字(由多个字段组成的 key)和多对多连接
key:
public class MultiKey implements WritableComparable<MultiKey> { private Text departId = new Text(); private Text departNo = new Text(); public Text getDepartId() { return departId; } public void setDepartId(String departId) { this.departId = new Text(departId); } public Text getDepartNo() { return departNo; } public void setDepartNo(String departNo) { this.departNo = new Text(departNo); } @Override public void write(DataOutput out) throws IOException { departId.write(out); departNo.write(out); } @Override public void readFields(DataInput in) throws IOException { this.departId.readFields(in); this.departNo.readFields(in); } @Override public int compareTo(MultiKey o) { return (this.departId.compareTo(o.departId) !=0)? this.departId.compareTo(o.departId) : this.departNo.compareTo(o.departNo); } @Override public String toString(){ return this.departId.toString()+" : "+this.departNo.toString(); } @Override public int hashCode(){ return 0; } }
value:
/**
 * Map-output value carrying either an employee row or a department row.
 *
 * <p>{@code flag} distinguishes the two kinds of row; the string fields default
 * to the empty string. Setters normalise {@code null} to {@code ""} because
 * {@link DataOutput#writeUTF(String)} throws {@link NullPointerException} on a
 * {@code null} argument, which would otherwise surface only at serialization time.
 */
public class Employee implements WritableComparable<Employee> {

    private String empName = "";
    private String departId = "";
    private String departNo = "";
    private String departName = "";
    private int flag;

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    public String getEmpName() {
        return empName;
    }

    public void setEmpName(String empName) {
        this.empName = nullToEmpty(empName);
    }

    public String getDepartId() {
        return departId;
    }

    public void setDepartId(String departId) {
        this.departId = nullToEmpty(departId);
    }

    public String getDepartNo() {
        return departNo;
    }

    public void setDepartNo(String departNo) {
        this.departNo = nullToEmpty(departNo);
    }

    public String getDepartName() {
        return departName;
    }

    public void setDepartName(String departName) {
        this.departName = nullToEmpty(departName);
    }

    /** Writes the four string fields followed by {@code flag}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.empName);
        out.writeUTF(this.departId);
        out.writeUTF(this.departNo);
        out.writeUTF(this.departName);
        out.writeInt(this.flag);
    }

    /** Reads the fields back in the order used by {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.empName = in.readUTF();
        this.departId = in.readUTF();
        this.departNo = in.readUTF();
        this.departName = in.readUTF();
        this.flag = in.readInt();
    }

    /**
     * Debug helper: prints the value of every field declared on {@code type},
     * read from {@code obj}, to standard output.
     *
     * @param out  not used by this method
     * @param type class whose declared fields are enumerated
     * @param obj  instance the field values are read from
     * @throws IllegalArgumentException if {@code obj} is not an instance of the field's declaring class
     * @throws IllegalAccessException   if a field is not accessible from this class
     */
    public static void writeAllProperties(DataOutput out, Class<? extends WritableComparable<?>> type, Object obj)
            throws IllegalArgumentException, IllegalAccessException {
        Field[] fields = type.getDeclaredFields();
        for (Field field : fields) {
            System.out.println(field.get(obj));
        }
    }

    /**
     * Orders rows by {@code flag}, then {@code departId}, {@code departNo},
     * {@code empName} and {@code departName}. Returns 0 only when all of them match.
     */
    @Override
    public int compareTo(Employee o) {
        if (this.flag != o.flag) {
            return this.flag < o.flag ? -1 : 1;
        }
        int c = this.departId.compareTo(o.departId);
        if (c != 0) {
            return c;
        }
        c = this.departNo.compareTo(o.departNo);
        if (c != 0) {
            return c;
        }
        c = this.empName.compareTo(o.empName);
        if (c != 0) {
            return c;
        }
        return this.departName.compareTo(o.departName);
    }

    @Override
    public String toString() {
        return this.empName + " " + this.departName;
    }

    /** Maps {@code null} to the empty string so the value can always be serialized. */
    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }
}
mapper:
public class MyJoinMapper extends Mapper<LongWritable, Text, MultiKey, Employee>{ @Override public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String line = value.toString(); String[] array = line.split(","); visit(array,context); } private void visit(String[] array,Context context) throws IOException,InterruptedException{ int i = Integer.valueOf(array[0]); MultiKey key = new MultiKey(); Employee e = new Employee(); switch (i) { case 1://name e.setEmpName(array[1]); e.setFlag(1); break; default://depart e.setDepartName(array[1]); e.setFlag(2); break; } e.setDepartId(array[2]); e.setDepartNo(array[3]); key.setDepartId(e.getDepartId()); key.setDepartNo(e.getDepartNo()); context.write(key, e); } }
reducer:
public class MyJoinReducer extends Reducer<MultiKey, Employee, IntWritable, Text>{ List<emp> empList = new LinkedList<emp>(); List<depart> departList = new LinkedList<MyJoinReducer.depart>(); @Override public void reduce(MultiKey key,Iterable<Employee> values,Context context) throws IOException,InterruptedException{ for (Employee employee : values) { visite(employee); } System.out.println("----------"); System.out.println(key); for (emp em : empList) { for (depart de : departList) { Employee e = new Employee(); e.setDepartId(em.departId); e.setDepartName(de.departName); e.setDepartNo(em.departNo); e.setEmpName(em.empName); context.write(new IntWritable(1), new Text(e.toString())); } } empList = new LinkedList<emp>(); departList = new LinkedList<MyJoinReducer.depart>(); } private void visite(Employee e){ switch (e.getFlag()) { case 1: emp em = new emp(); em.departId = e.getDepartId(); em.departNo = e.getDepartName(); em.empName = e.getEmpName(); empList.add(em); break; default: depart de = new depart(); de.departName = e.getDepartName(); departList.add(de); break; } } private class emp{ public String empName; public String departId; public String departNo; } private class depart{ public String departName; } }
comparator:
/**
 * Comparator for {@link MultiKey}.
 *
 * <p>All comparison logic is inherited from {@link WritableComparator}; this
 * class only binds it to {@code MultiKey} by passing the key class and
 * {@code true} to the superclass constructor.
 */
public class MyJoinComparator extends WritableComparator {

    /** Registers {@link MultiKey} as the compared key type. */
    protected MyJoinComparator() {
        super(MultiKey.class, true);
    }
}
groupcomparator:
/**
 * Grouping comparator for {@link MultiKey}.
 *
 * <p>Object comparison uses {@link MultiKey#compareTo(MultiKey)}; raw-byte
 * comparison is delegated to a {@link MyJoinComparator}. The delegate is created
 * once per instance instead of on every call, since the raw comparison runs for
 * each pair of keys being compared.
 */
public class MyJoinGroupComparator implements RawComparator<MultiKey> {

    /** Reused delegate that compares serialized keys. */
    private final MyJoinComparator delegate = new MyJoinComparator();

    @Override
    public int compare(MultiKey key1, MultiKey key2) {
        return key1.compareTo(key2);
    }

    /**
     * Compares two serialized keys.
     *
     * @param b1 buffer holding the first key
     * @param s1 start offset of the first key in {@code b1}
     * @param l1 length of the first key
     * @param b2 buffer holding the second key
     * @param s2 start offset of the second key in {@code b2}
     * @param l2 length of the second key
     * @return the result of the delegate comparator
     */
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return delegate.compare(b1, s1, l1, b2, s2, l2);
    }
}
补个测试类
/**
 * Driver that uploads the two sample input files, clears the output directory
 * and runs the join job.
 */
public class MyJoinTest {

    /**
     * Uploads the inputs, removes any previous output, then configures and runs
     * the job. Terminates the JVM with exit status 1 when the job does not succeed.
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        upload("dirk1.txt", "dirk.txt");
        upload("dirk2.txt", "dirk2.txt");
        delete();

        Configuration conf = new Configuration();
        Job job = new Job(conf, "joinJob");
        // Lets the framework locate the jar containing the job classes when run on a cluster.
        job.setJarByClass(MyJoinTest.class);
        job.setMapperClass(MyJoinMapper.class);
        job.setReducerClass(MyJoinReducer.class);
        job.setMapOutputKeyClass(MultiKey.class);
        job.setMapOutputValueClass(Employee.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setGroupingComparatorClass(MyJoinGroupComparator.class);
        FileInputFormat.addInputPath(job, new Path("/user/dirk3/input"));
        FileOutputFormat.setOutputPath(job, new Path("/user/dirk3/output"));

        // Propagate failure to the caller instead of silently returning.
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            System.exit(1);
        }
    }

    /**
     * Copies a file located next to this class into the job's input directory,
     * overwriting any existing file of the same name.
     *
     * @param local  file name relative to this class's resource location
     * @param remote file name to create under {@code /user/dirk3/input/}
     */
    public static void upload(String local, String remote) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        String l = MyJoinTest.class.getResource("").getPath() + "/" + local;
        fs.copyFromLocalFile(false, true, new Path(l), new Path("/user/dirk3/input/" + remote));
    }

    /** Recursively deletes the job's output directory. */
    public static void delete() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path("/user/dirk3/output"), true);
    }

    /**
     * Sets the sort and grouping comparators on a fresh {@code JobConf}.
     * The configuration object is local to this method and is not submitted or returned.
     */
    public static void run() {
        JobConf jobConf = new JobConf();
        jobConf.setOutputKeyComparatorClass(MyJoinComparator.class);
        jobConf.setOutputValueGroupingComparator(MyJoinComparator.class);
    }
}
join的主要实现在reducer中
关于 comparator:mapper 通过 context 写出 key/value 之后,数据经过 partition(以及可选的 combine)和排序,进入 reduce 阶段;此时由 group comparator 决定哪些 key 被归为同一组,即在同一次 reduce() 调用中一起处理。(哪些 key 进入同一个 reducer 任务则由 partitioner 决定。)
相关推荐
changjiang 2020-11-16
minerd 2020-10-28
WeiHHH 2020-09-23
Aleks 2020-08-19
WeiHHH 2020-08-17
飞鸿踏雪0 2020-07-26
tomli 2020-07-26
deyu 2020-07-21
strongyoung 2020-07-19
eternityzzy 2020-07-19
Elmo 2020-07-19
飞鸿踏雪0 2020-07-09
飞鸿踏雪0 2020-07-04
xieting 2020-07-04
WeiHHH 2020-06-28
genshengxiao 2020-06-26
Hhanwen 2020-06-25