Hadoop学习之路_1、自定义序列化类

一、Hadoop序列化

1、序列化(Serialization)是指把结构化对象转化为字节流。

2、反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。

3、Java自带的序列化机制:通过实现java.io.Serializable接口完成。

二、序列化格式特点:

1、紧凑:高效使用存储空间。

2、快速:读写数据的额外开销小。

3、可扩展:可透明地读取老格式的数据

4、互操作:支持多语言的交互

三、Hadoop的序列化格式:Writable

四、Hadoop序列化的作用

1、序列化在分布式环境的两大作用:进程间通信,永久存储。

2、Hadoop节点间通信。

五、使用Hadoop内置的序列化类(不使用自定义序列化类),实现流量统计的功能。

/**
 * MapReduce job that sums four numeric columns per key, using only Hadoop's built-in
 * {@link Text} type for both the intermediate and the final values.
 *
 * <p>Every input line is split on tab characters. The field at index 1 becomes the grouping
 * key and the fields at indices 6 to 9 are the four values that get summed. The result is
 * written as {@code key <TAB> sum1 <TAB> sum2 <TAB> sum3 <TAB> sum4}.
 *
 * <p>Usage: {@code TrafficApp1 <input path> <output path>}
 */
public class TrafficApp1 {

	/**
	 * Configures and runs the job.
	 *
	 * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
	 * @throws IOException if the job cannot be set up or submitted
	 * @throws ClassNotFoundException if a job class cannot be resolved
	 * @throws InterruptedException if the wait for job completion is interrupted
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		if (args.length < 2) {
			// Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
			System.err.println("Usage: " + TrafficApp1.class.getSimpleName() + " <input path> <output path>");
			System.exit(2);
		}

		Job job = Job.getInstance(new Configuration(), TrafficApp1.class.getSimpleName());
		job.setJarByClass(TrafficApp1.class);
		FileInputFormat.setInputPaths(job, args[0]);

		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(MyReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Propagate a job failure to the caller through the process exit status.
		if (!job.waitForCompletion(true)) {
			System.exit(1);
		}
	}

	/**
	 * Emits (field 1, "field6 TAB field7 TAB field8 TAB field9") for every well-formed line.
	 * Lines with fewer than ten tab-separated fields are skipped and counted.
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{

		/** Index of the field used as the output key. */
		private static final int KEY_INDEX = 1;
		/** Index of the first of the four consecutive value fields. */
		private static final int FIRST_VALUE_INDEX = 6;
		/** Minimum number of fields a line needs so that index 9 exists. */
		private static final int MIN_FIELDS = FIRST_VALUE_INDEX + 4;

		// Reused across map() calls to avoid allocating new objects per record.
		Text k2 = new Text();
		Text v2 = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] splited = value.toString().split("\t");
			if (splited.length < MIN_FIELDS) {
				// A blank or truncated line must not fail the whole task; record it and move on.
				context.getCounter(TrafficApp1.class.getSimpleName(), "MALFORMED_LINES").increment(1);
				return;
			}
			k2.set(splited[KEY_INDEX]);
			// Join the four value fields into one tab-separated string for the reducer to parse.
			v2.set(splited[FIRST_VALUE_INDEX] + "\t" + splited[FIRST_VALUE_INDEX + 1] + "\t"
					+ splited[FIRST_VALUE_INDEX + 2] + "\t" + splited[FIRST_VALUE_INDEX + 3]);
			context.write(k2, v2);
		}
	}

	/**
	 * Parses the tab-separated values emitted by {@link MyMapper} and writes the four
	 * per-key sums as a tab-separated string.
	 */
	public static class MyReduce extends Reducer<Text, Text, Text, Text>{

		// Reused across reduce() calls.
		Text v3 = new Text();

		/**
		 * @throws NumberFormatException if any of the four values is not a valid {@code long}
		 */
		@Override
		protected void reduce(Text k2, Iterable<Text> v2s, Context context)
				throws IOException, InterruptedException {
			long t1 = 0L;
			long t2 = 0L;
			long t3 = 0L;
			long t4 = 0L;
			for (Text v2 : v2s) {
				// Split the string produced by the mapper and accumulate each column.
				String[] splited = v2.toString().split("\t");
				t1 += Long.parseLong(splited[0]);
				t2 += Long.parseLong(splited[1]);
				t3 += Long.parseLong(splited[2]);
				t4 += Long.parseLong(splited[3]);
			}
			// Emit the totals as a formatted, tab-separated string.
			v3.set(t1 + "\t" + t2 + "\t" + t3 + "\t" + t4);
			context.write(k2, v3);
		}

	}

}

六、使用自定义序列化类(实现Writable接口),实现同样的流量统计功能。

/**
 * MapReduce job that sums four numeric columns per key, carrying the values in the custom
 * {@link TrafficWritable} type instead of a tab-joined string.
 *
 * <p>Every input line is split on tab characters. The field at index 1 becomes the grouping
 * key and the fields at indices 6 to 9 are parsed as {@code long} and summed per key.
 *
 * <p>Usage: {@code TrafficApp <input path> <output path>}
 */
public class TrafficApp {

	/**
	 * Configures and runs the job.
	 *
	 * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
	 * @throws IOException if the job cannot be set up or submitted
	 * @throws ClassNotFoundException if a job class cannot be resolved
	 * @throws InterruptedException if the wait for job completion is interrupted
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		if (args.length < 2) {
			// Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
			System.err.println("Usage: " + TrafficApp.class.getSimpleName() + " <input path> <output path>");
			System.exit(2);
		}

		Job job = Job.getInstance(new Configuration(), TrafficApp.class.getSimpleName());
		job.setJarByClass(TrafficApp.class);

		FileInputFormat.setInputPaths(job, args[0]);

		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TrafficWritable.class);

		job.setReducerClass(MyReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TrafficWritable.class);

		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Propagate a job failure to the caller through the process exit status.
		if (!job.waitForCompletion(true)) {
			System.exit(1);
		}
	}

	/**
	 * Emits (field 1, TrafficWritable(field6, field7, field8, field9)) for every well-formed
	 * line. Lines with fewer than ten tab-separated fields are skipped and counted.
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, TrafficWritable>{

		/** Index of the field used as the output key. */
		private static final int KEY_INDEX = 1;
		/** Index of the first of the four consecutive value fields. */
		private static final int FIRST_VALUE_INDEX = 6;
		/** Minimum number of fields a line needs so that index 9 exists. */
		private static final int MIN_FIELDS = FIRST_VALUE_INDEX + 4;

		// Reused across map() calls to avoid allocating new objects per record.
		Text k2 = new Text();
		TrafficWritable v2 = new TrafficWritable();

		/**
		 * @throws NumberFormatException if any of the four value fields is not a valid {@code long}
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] splited = value.toString().split("\t");
			if (splited.length < MIN_FIELDS) {
				// A blank or truncated line must not fail the whole task; record it and move on.
				context.getCounter(TrafficApp.class.getSimpleName(), "MALFORMED_LINES").increment(1);
				return;
			}
			k2.set(splited[KEY_INDEX]);
			v2.set(splited[FIRST_VALUE_INDEX], splited[FIRST_VALUE_INDEX + 1],
					splited[FIRST_VALUE_INDEX + 2], splited[FIRST_VALUE_INDEX + 3]);

			context.write(k2, v2);
		}
	}


	/** Adds up the four fields of every {@link TrafficWritable} received for a key. */
	public static class MyReduce extends Reducer<Text, TrafficWritable, Text, TrafficWritable>{

		// Reused across reduce() calls.
		TrafficWritable v3 = new TrafficWritable();

		@Override
		protected void reduce(Text k2, Iterable<TrafficWritable> v2s, Context context)
				throws IOException, InterruptedException {
			long t1 = 0L;
			long t2 = 0L;
			long t3 = 0L;
			long t4 = 0L;
			for (TrafficWritable v2 : v2s) {
				t1 += v2.t1;
				t2 += v2.t2;
				t3 += v2.t3;
				t4 += v2.t4;
			}

			v3.set(t1, t2, t3, t4);
			context.write(k2, v3);
		}

	}

	/**
	 * Custom Hadoop value type holding four {@code long} fields. The fields are written and
	 * read back in the same fixed order: t1, t2, t3, t4.
	 */
	static class TrafficWritable implements Writable{

		long t1;
		long t2;
		long t3;
		long t4;

		/** No-argument constructor; fields start at zero until {@code set} or {@code readFields} is called. */
		public TrafficWritable(){}

		/** Assigns all four fields from numeric values. */
		public void set(long t1,long t2,long t3,long t4){
			this.t1 = t1;
			this.t2 = t2;
			this.t3 = t3;
			this.t4 = t4;
		}

		/**
		 * Assigns all four fields by parsing decimal strings.
		 *
		 * @throws NumberFormatException if any argument is not a valid {@code long}
		 */
		public void set(String t1,String t2,String t3, String t4){
			this.t1 = Long.parseLong(t1);
			this.t2 = Long.parseLong(t2);
			this.t3 = Long.parseLong(t3);
			this.t4 = Long.parseLong(t4);
		}

		/** Reads the four fields in the order {@link #write(DataOutput)} wrote them. */
		@Override
		public void readFields(DataInput in) throws IOException {
			this.t1 = in.readLong();
			this.t2 = in.readLong();
			this.t3 = in.readLong();
			this.t4 = in.readLong();
		}

		/** Writes the four fields as consecutive {@code long} values. */
		@Override
		public void write(DataOutput out) throws IOException {
			out.writeLong(t1);
			out.writeLong(t2);
			out.writeLong(t3);
			out.writeLong(t4);
		}

		/**
		 * Returns the four fields joined by tabs, with no trailing separator, so the text
		 * form matches the {@code t1 TAB t2 TAB t3 TAB t4} layout.
		 */
		@Override
		public String toString() {
			return t1 + "\t" + t2 + "\t" + t3 + "\t" + t4;
		}

	}
}

相关推荐