hadoop 自定义inputformat和outputformat

hadoop的inputformat和outputformat

下面再贴一个项目中实现Hadoop join时用到的InputFormat和OutputFormat的简单示例:

   自定义InputFormat(泛型参数是mapper输入的key和value类型)

/**
 * Input format for the join job: reads comma-separated text lines and hands
 * the mapper one {@code MultiKey}/{@code Employee} pair per line.
 *
 * <p>Each non-blank line must have at least four comma-separated fields:
 * {@code type,name,departId,departNo}. When {@code type} is 1 the name is stored
 * as the employee name and the flag is set to 1; for any other type the name is
 * stored as the department name and the flag is set to 2.
 */
public class MyInputFormat extends FileInputFormat<MultiKey,Employee> {
	
	public MyInputFormat(){}

	/**
	 * Creates the reader for one split. The framework calls
	 * {@link MyRecordReader#initialize} on it before reading.
	 */
	@Override
	public RecordReader<MultiKey, Employee> createRecordReader(
			InputSplit split, TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new MyRecordReader();
	}
	
	/**
	 * Line-oriented reader that parses each line of its split into a key/value pair.
	 *
	 * <p>The reader only emits lines that start inside its own split, so a file
	 * that is divided into several splits is not read more than once.
	 */
	public static class MyRecordReader extends RecordReader<MultiKey, Employee>{

		// Public fields are kept as-is for backward compatibility with existing users.
		public LineReader in;
		public MultiKey key;
		public Employee value;
		public StringTokenizer token = null; // unused by this reader; retained for compatibility
		
		public Text line;

		/** Byte offset at which this reader starts emitting records. */
		private long start;
		/** Byte offset of the end of the split (start of split + length of split). */
		private long end;
		/** Byte offset of the next unread line. */
		private long pos;
		
		/**
		 * Opens the split's file, positions the stream at the split start and
		 * allocates the reusable key and line buffers.
		 *
		 * @param split   expected to be a {@code FileSplit}
		 * @param context supplies the job configuration
		 * @throws IOException if the file cannot be opened or positioned
		 */
		@Override
		public void initialize(InputSplit split, TaskAttemptContext context)
				throws IOException, InterruptedException {
			FileSplit fileSplit = (FileSplit)split;
			Configuration job = context.getConfiguration();
			Path file = fileSplit.getPath();
			FileSystem fs = file.getFileSystem(job);

			start = fileSplit.getStart();
			end = start + fileSplit.getLength();

			FSDataInputStream filein = fs.open(file);
			boolean ready = false;
			try {
				filein.seek(start);
				in = new LineReader(filein, job);
				if (start != 0) {
					// Not the first split: the line we landed in (or on) is emitted by
					// the reader of the previous split, which reads one line past its end.
					start += in.readLine(new Text());
				}
				ready = true;
			} finally {
				if (!ready) {
					// Do not leak the stream when positioning fails.
					filein.close();
				}
			}
			pos = start;
			
			key = new MultiKey();
			value = new Employee();
			line = new Text();
		}

		/**
		 * Reads the next non-blank line of the split and parses it into the
		 * current key and value.
		 *
		 * @return {@code true} if a record was read, {@code false} at the end of the split
		 * @throws IOException on read failure or when a line is malformed
		 */
		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			// "<=" so that a line starting exactly at the split end is read here;
			// the reader of the following split skips its first line.
			while (pos <= end) {
				int consumed = in.readLine(line);
				if (consumed == 0) {
					return false; // end of file
				}
				pos += consumed;

				String text = line.toString();
				if (text.trim().isEmpty()) {
					continue; // ignore blank lines instead of failing on them
				}

				String[] pieces = text.split(",");
				if (pieces.length < 4) {
					throw new IOException(
							"Malformed record, expected at least 4 comma-separated fields: " + text);
				}
				int type;
				try {
					type = Integer.parseInt(pieces[0].trim());
				} catch (NumberFormatException e) {
					throw new IOException("Malformed record, non-numeric type field: " + text, e);
				}

				// Fresh value per record so that a name set by a previous record of the
				// other type does not leak into this one.
				Employee record = new Employee();
				if (type == 1) {
					record.setEmpName(pieces[1]);
					record.setFlag(1);
				} else {
					record.setDepartName(pieces[1]);
					record.setFlag(2);
				}
				record.setDepartId(pieces[2]);
				record.setDepartNo(pieces[3]);
				value = record;

				key.setDepartId(record.getDepartId());
				key.setDepartNo(record.getDepartNo());
				return true;
			}
			return false;
		}

		/** Returns the key parsed by the last successful {@link #nextKeyValue()}. */
		@Override
		public MultiKey getCurrentKey() throws IOException,
				InterruptedException {
			return key;
		}

		/** Returns the value parsed by the last successful {@link #nextKeyValue()}. */
		@Override
		public Employee getCurrentValue() throws IOException,
				InterruptedException {
			return value;
		}

		/**
		 * Returns the fraction of the split consumed so far, between 0 and 1,
		 * based on the number of bytes read.
		 */
		@Override
		public float getProgress() throws IOException, InterruptedException {
			if (end <= start) {
				return 0.0f;
			}
			return Math.min(1.0f, (pos - start) / (float)(end - start));
		}

		/** Closes the line reader; safe to call more than once. */
		@Override
		public void close() throws IOException {
			if (in != null) {
				in.close();
				in = null;
			}
		}
		
	}

}

   自定义OutputFormat(泛型参数是reducer输出的key和value类型)

/**
 * Output format for the join job: writes each reducer output pair as one text
 * line of the form {@code key<separator>value}, using ":" as the default separator.
 */
public class MyOutputFormat extends FileOutputFormat<Text, Employee> {

	/**
	 * Creates the task's default work file and returns a writer bound to it.
	 *
	 * @param job the task attempt whose configuration and work path are used
	 * @return a writer that emits one line per record
	 * @throws IOException if the output file cannot be created; the file is
	 *         created with overwrite disabled
	 */
	@Override
	public RecordWriter<Text, Employee> getRecordWriter(
			TaskAttemptContext job) throws IOException, InterruptedException {
		Configuration conf = job.getConfiguration();
		Path file = getDefaultWorkFile(job, "");
		FileSystem fs = file.getFileSystem(conf);
		FSDataOutputStream fileOut = fs.create(file, false);
		return new MyRecordWriter(fileOut);
	}
	
	/**
	 * Writes records as UTF-8 text lines to the supplied stream.
	 */
	public static class MyRecordWriter extends RecordWriter<Text, Employee>{

		protected DataOutputStream out;
		private final byte[] keyValueSeparator;
		 public static final String NEW_LINE = System.getProperty("line.separator");

		// Text stores UTF-8, so encode everything else the same way instead of
		// relying on the JVM's platform default charset.
		private static final java.nio.charset.Charset UTF8 =
				java.nio.charset.Charset.forName("UTF-8");
		// Encoded once rather than on every record.
		private static final byte[] NEW_LINE_BYTES = NEW_LINE.getBytes(UTF8);
		
		/**
		 * Creates a writer that separates key and value with ":".
		 *
		 * @param out destination stream; closed by {@link #close}
		 */
		public MyRecordWriter(DataOutputStream out){
			this(out,":");
		}
		
		/**
		 * Creates a writer with a custom key/value separator.
		 *
		 * @param out               destination stream; closed by {@link #close}
		 * @param keyValueSeparator text written between key and value
		 */
		public MyRecordWriter(DataOutputStream out,String keyValueSeparator){
			this.out = out;
			this.keyValueSeparator = keyValueSeparator.getBytes(UTF8);
		}
		
		/**
		 * Writes one record followed by a line separator.
		 *
		 * <p>A null key is omitted together with the separator. A null value is
		 * omitted as well (the key is then written alone); if both are null
		 * nothing is written.
		 *
		 * @throws IOException if the underlying stream fails
		 */
		@Override
		public void write(Text key, Employee value) throws IOException,
				InterruptedException {
			if (key == null && value == null) {
				return;
			}
			if (key != null) {
				// Only the first getLength() bytes of Text's backing array are valid.
				out.write(key.getBytes(), 0, key.getLength());
				if (value != null) {
					out.write(keyValueSeparator);
				}
			}
			if (value != null) {
				out.write(value.toString().getBytes(UTF8));
			}
			out.write(NEW_LINE_BYTES);
		}

		/** Closes the underlying output stream. */
		@Override
		public void close(TaskAttemptContext context) throws IOException,
				InterruptedException {
			out.close();
		}
		
	}

}

相关推荐