Hadoop: custom InputFormat and OutputFormat
Here is another simple example, taken from a project, of the InputFormat and OutputFormat used when implementing a Hadoop join:
Custom InputFormat (its type parameters are the mapper's input key/value types):
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class MyInputFormat extends FileInputFormat<MultiKey, Employee> {

    public MyInputFormat() {
    }

    @Override
    public RecordReader<MultiKey, Employee> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new MyRecordReader();
    }

    // The record reader always opens the file from the beginning and ignores
    // split boundaries, so keep each file in a single split.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    public static class MyRecordReader extends RecordReader<MultiKey, Employee> {

        private LineReader in;
        private MultiKey key;
        private Employee value;
        private Text line;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) split;
            Configuration job = context.getConfiguration();
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream filein = fs.open(file);
            in = new LineReader(filein, job);
            key = new MultiKey();
            value = new Employee();
            line = new Text();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            int linesize = in.readLine(line);
            if (linesize == 0) {
                return false;
            }
            // Each input line looks like: <type>,<name>,<departId>,<departNo>,
            // where type 1 marks an employee record and anything else a
            // department record.
            String[] pieces = line.toString().split(",");
            int i = Integer.valueOf(pieces[0]);
            switch (i) {
            case 1:
                value.setEmpName(pieces[1]);
                value.setFlag(1);
                break;
            default:
                value.setDepartName(pieces[1]);
                value.setFlag(2);
                break;
            }
            value.setDepartId(pieces[2]);
            value.setDepartNo(pieces[3]);
            // The join key is the department id/number pair.
            key.setDepartId(value.getDepartId());
            key.setDepartNo(value.getDepartNo());
            return true;
        }

        @Override
        public MultiKey getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Employee getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            // Progress reporting is not implemented in this simple example.
            return 0;
        }

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }
    }
}
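The MultiKey and Employee types referenced above are the author's own Writable classes and are not included in the post. As a rough sketch of what the key type would need to look like (field names are inferred from the setters called in MyRecordReader, so treat the class below as an assumption rather than the original), a MultiKey join key could be written like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical sketch of the MultiKey join key; not shown in the original post.
public class MultiKey implements WritableComparable<MultiKey> {

    private String departId = "";
    private String departNo = "";

    public void setDepartId(String departId) { this.departId = departId; }
    public void setDepartNo(String departNo) { this.departNo = departNo; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(departId);
        out.writeUTF(departNo);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        departId = in.readUTF();
        departNo = in.readUTF();
    }

    // Sort by department id first, then department number, so records that
    // share a department end up in the same reduce group.
    @Override
    public int compareTo(MultiKey other) {
        int cmp = departId.compareTo(other.departId);
        return cmp != 0 ? cmp : departNo.compareTo(other.departNo);
    }

    @Override
    public int hashCode() {
        return departId.hashCode() * 31 + departNo.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MultiKey)) return false;
        MultiKey other = (MultiKey) o;
        return departId.equals(other.departId) && departNo.equals(other.departNo);
    }

    @Override
    public String toString() {
        return departId + "," + departNo;
    }
}

The Employee value class would similarly implement Writable, carrying the empName, departName, departId, departNo and flag fields used by MyRecordReader and a toString() that produces whatever text MyRecordWriter should emit.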
Custom OutputFormat (its type parameters are the reducer's output key/value types):
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyOutputFormat extends FileOutputFormat<Text, Employee> {

    @Override
    public RecordWriter<Text, Employee> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        Path file = getDefaultWorkFile(job, "");
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        return new MyRecordWriter(fileOut);
    }

    public static class MyRecordWriter extends RecordWriter<Text, Employee> {

        public static final String NEW_LINE = System.getProperty("line.separator");

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public MyRecordWriter(DataOutputStream out) {
            this(out, ":");
        }

        public MyRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            this.keyValueSeparator = keyValueSeparator.getBytes();
        }

        // Writes each record as "key:value" followed by a line separator;
        // a null key writes the value alone.
        @Override
        public void write(Text key, Employee value)
                throws IOException, InterruptedException {
            if (key != null) {
                out.write(key.toString().getBytes());
                out.write(keyValueSeparator);
            }
            out.write(value.toString().getBytes());
            out.write(NEW_LINE.getBytes());
        }

        @Override
        public void close(TaskAttemptContext context)
                throws IOException, InterruptedException {
            out.close();
        }
    }
}
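To wire the two formats into a job, the driver registers them with setInputFormatClass and setOutputFormatClass. The post does not show the driver or the join mapper/reducer, so JoinDriver, JoinMapper and JoinReducer below are placeholder sketches: the mapper passes records through and the reducer simply re-emits each value keyed by the join key, while the real join logic would separate employee and department records by their flag.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver sketch; the original post only shows the two formats.
public class JoinDriver {

    // Placeholder mapper: forwards each record keyed by the join key.
    public static class JoinMapper
            extends Mapper<MultiKey, Employee, MultiKey, Employee> {
        @Override
        protected void map(MultiKey key, Employee value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    // Placeholder reducer: emits each value tagged with the join key.
    // The actual join (matching employee records to their department by flag)
    // is not shown in the original post.
    public static class JoinReducer
            extends Reducer<MultiKey, Employee, Text, Employee> {
        @Override
        protected void reduce(MultiKey key, Iterable<Employee> values, Context context)
                throws IOException, InterruptedException {
            for (Employee e : values) {
                context.write(new Text(key.toString()), e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "employee-department join");
        job.setJarByClass(JoinDriver.class);

        // Plug in the custom formats defined above.
        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(MyOutputFormat.class);

        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        job.setMapOutputKeyClass(MultiKey.class);
        job.setMapOutputValueClass(Employee.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Employee.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}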