Hadoop两列数据的排序

原数据形式入下

1 2
2 4
2 3
2 1
3 1
3 4
4 1
4 4
4 3
1 1


要求按照第一列的顺序排序,如果第一列相等,那么按照第二列排序

如果利用mapreduce过程的自动排序,只能实现根据第一列排序,现在需要自定义一个继承自WritableComparable接口的类,用该类作为key,就可以利用mapreduce过程的自动排序了。代码如下:

package mapReduce;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;


import mapReduce.SortApp1.MyMapper;
import mapReduce.SortApp1.MyReducer;
import mapReduce.SortApp1.NewK2;


import org.apache.Hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class SortApp {
private static String INPUT_PATH = "hdfs://hadoop1:9000/data";
private static String OUT_PATH = "hdfs://hadoop1:9000/dataOut";


public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
Path outputDir = new Path(OUT_PATH);
if (fileSystem.exists(outputDir)) {
fileSystem.delete(outputDir, true);
}


Job job = new Job(conf, "data");


FileInputFormat.setInputPaths(job, INPUT_PATH);


job.setInputFormatClass(TextInputFormat.class);

job.setMapOutputKeyClass(KeyValue.class);
job.setMapOutputValueClass(LongWritable.class);

job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
job.waitForCompletion(true);

 

 

}


static class MyMapper extends
Mapper<LongWritable, Text, KeyValue, LongWritable> {
@Override
protected void map(LongWritable k1, Text value, Context context)
throws IOException, InterruptedException {
final String[] splited = value.toString().split("\t");
final KeyValue k2 = new KeyValue(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
context.write(k2, v2);
}
}


static class MyReducer extends Reducer<KeyValue, LongWritable, LongWritable, LongWritable> {
protected void reduce(KeyValue k2,java.lang.Iterable<LongWritable> v2s,Context context) throws IOException, InterruptedException {
context.write(new LongWritable(k2.first), new LongWritable(k2.second));
}
}
static class KeyValue implements WritableComparable<KeyValue>{
Long first;
Long second;

public KeyValue(){}

public KeyValue(long first, long second){
this.first = first;
this.second = second;
}

 

@Override
public void readFields(DataInput in) throws IOException {
this.first = in.readLong();
this.second = in.readLong();
}


@Override
public void write(DataOutput out) throws IOException {
out.writeLong(first);
out.writeLong(second);
}


@Override
public int compareTo(KeyValue o) {
final long minus = this.first - o.first;
if(minus != 0){
return (int)minus;
}
return (int)(this.second - o.second);
}

public int hashCode() {
return this.first.hashCode()+this.second.hashCode();
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof KeyValue)){
return false;
}
KeyValue kv = (KeyValue)obj;
return (this.first == kv.first)&& (this.second == kv.second);
}

public boolean equals(Object obj) {
if(!(obj instanceof NewK2)){
return false;
}
NewK2 oK2 = (NewK2)obj;
return (this.first==oK2.first)&&(this.second==oK2.second);
}
}
}

KeyValue 中的first second属性必须写成Long类型,而不是long,否则 this.first.hashCode()不成立。对任何实现WritableComparable的类都能进行排序,这可以一些复杂的数据,只要把他们封装成实现了WritableComparable的类作为key就可以了

相关阅读

相关推荐