hadoop join实现

hadoop的join实现,实现符合关键字,多对多连接

key:

public class MultiKey implements WritableComparable<MultiKey> {

	private Text departId = new Text();
	private Text departNo = new Text();

	public Text getDepartId() {
		return departId;
	}

	public void setDepartId(String departId) {
		this.departId = new Text(departId);
	}

	public Text getDepartNo() {
		return departNo;
	}

	public void setDepartNo(String departNo) {
		this.departNo = new Text(departNo);
	}

	@Override
	public void write(DataOutput out) throws IOException {
		departId.write(out);
		departNo.write(out);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.departId.readFields(in);
		this.departNo.readFields(in);
	}

	@Override
	public int compareTo(MultiKey o) {
		return (this.departId.compareTo(o.departId) !=0)? this.departId.compareTo(o.departId) : this.departNo.compareTo(o.departNo);
		
	}
	
	@Override
	public String toString(){
		return this.departId.toString()+" : "+this.departNo.toString();
	}
	
	@Override
	public int hashCode(){
		return 0;
	}
}
 

value:

public class Employee implements WritableComparable<Employee> {

	private String empName="";
	private String departId="";
	private String departNo="";
	private String departName="";
	private int flag;
	
	public int getFlag() {
		return flag;
	}

	public void setFlag(int flag) {
		this.flag = flag;
	}

	public String getEmpName() {
		return empName;
	}

	public void setEmpName(String empName) {
		this.empName = empName;
	}

	public String getDepartId() {
		return departId;
	}

	public void setDepartId(String departId) {
		this.departId = departId;
	}

	public String getDepartNo() {
		return departNo;
	}

	public void setDepartNo(String departNo) {
		this.departNo = departNo;
	}

	public String getDepartName() {
		return departName;
	}

	public void setDepartName(String departName) {
		this.departName = departName;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.empName);
		out.writeUTF(this.departId);
		out.writeUTF(this.departNo);
		out.writeUTF(this.departName);
		out.writeInt(this.flag);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.empName = in.readUTF();
		this.departId = in.readUTF();
		this.departNo = in.readUTF();
		this.departName = in.readUTF();
		this.flag = in.readInt();
	}
	
	public static void writeAllProperties(DataOutput out,Class<? extends WritableComparable<?>> type,Object obj) throws IllegalArgumentException, IllegalAccessException{
		Field[] fields =  type.getDeclaredFields();
		for (Field field : fields) {
			System.out.println(field.get(obj));
		}
	}

	@Override
	public int compareTo(Employee o) {
		return 0;
	}
	
	
	@Override
	public String toString(){
		return this.empName+"  "+this.departName;
	}

}
 

maper:

public class MyJoinMapper extends Mapper<LongWritable, Text, MultiKey, Employee>{
	@Override
	public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
		String line = value.toString();
		String[] array = line.split(",");
		visit(array,context);
	}
	
	private void visit(String[] array,Context context) throws IOException,InterruptedException{
		int i = Integer.valueOf(array[0]);
		MultiKey key = new MultiKey();
		Employee e = new Employee();
		switch (i) {
		case 1://name
			e.setEmpName(array[1]);
			e.setFlag(1);
			break;
		default://depart
			e.setDepartName(array[1]);
			e.setFlag(2);
			break;
		}
		e.setDepartId(array[2]);
		e.setDepartNo(array[3]);
		key.setDepartId(e.getDepartId());
		key.setDepartNo(e.getDepartNo());
		context.write(key, e);
	}
}
 

reducer:

public class MyJoinReducer extends Reducer<MultiKey, Employee, IntWritable, Text>{

	List<emp> empList = new LinkedList<emp>();
	List<depart> departList = new LinkedList<MyJoinReducer.depart>();
	
	@Override
	public void reduce(MultiKey key,Iterable<Employee> values,Context context) throws IOException,InterruptedException{
		for (Employee employee : values) {
			visite(employee);
		}
		System.out.println("----------");
		System.out.println(key);
		for (emp em : empList) {
			for (depart de : departList) {
				Employee e = new Employee();
				e.setDepartId(em.departId);
				e.setDepartName(de.departName);
				e.setDepartNo(em.departNo);
				e.setEmpName(em.empName);
				context.write(new IntWritable(1), new Text(e.toString()));
			}
		}

		empList = new LinkedList<emp>();
		departList = new LinkedList<MyJoinReducer.depart>();
	}
	
	private void visite(Employee e){
		switch (e.getFlag()) {
		case 1:
			emp em = new emp();
			em.departId = e.getDepartId();
			em.departNo = e.getDepartName();
			em.empName = e.getEmpName();
			empList.add(em);
			break;

		default:
			depart de = new depart();
			de.departName = e.getDepartName();
			departList.add(de);
			break;
		}
	}
	
	
	private class emp{
		public String empName;
		public String departId;
		public String departNo;
	}
	
	private class depart{
		public String departName;
	}
	
}

comparator

public class MyJoinComparator extends WritableComparator{

	protected MyJoinComparator() {
		super(MultiKey.class,true);
	}
	
}
 

groupcomparator:

public class MyJoinGroupComparator implements RawComparator<MultiKey> {

	private DataInputBuffer buffer = new DataInputBuffer();

	@Override
	public int compare(MultiKey key1, MultiKey key2) {
		return key1.compareTo(key2);
	}

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		return new MyJoinComparator().compare(b1, s1, l1, b2, s2, l2);
	}

}
 

补个测试类

public class MyJoinTest {
	
	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		upload("dirk1.txt", "dirk.txt");
		upload("dirk2.txt","dirk2.txt");
		delete();
		Configuration conf = new Configuration();
		Job job = new Job(conf, "joinJob");
		job.setMapperClass(MyJoinMapper.class);
		job.setReducerClass(MyJoinReducer.class);
		job.setMapOutputKeyClass(MultiKey.class);
		job.setMapOutputValueClass(Employee.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);
		job.setGroupingComparatorClass(MyJoinGroupComparator.class);
		FileInputFormat.addInputPath(job, new Path("/user/dirk3/input"));
		FileOutputFormat.setOutputPath(job, new Path("/user/dirk3/output"));
		job.waitForCompletion(true);
	}
	
	public static void upload(String local,String remote) throws IOException{
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		String l = MyJoinTest.class.getResource("").getPath()+"/"+local;
		fs.copyFromLocalFile(false, true, new Path(l), new Path("/user/dirk3/input/"+remote));
	}
	
	public static void delete() throws IOException {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path("/user/dirk3/output"), true);
	}
	
	public static void run(){
		JobConf jobConf = new JobConf();
		jobConf.setOutputKeyComparatorClass(MyJoinComparator.class);
		jobConf.setOutputValueGroupingComparator(MyJoinComparator.class);
	}

}
 

join的主要实现在reducer中

关于comparator,在通过maper向context中添加key value后,通过combine,partition之后,进入reducer阶段,进行groupComparator,决定哪些key同时进入一个reducer

相关推荐