Hadoop学习之路_1、自定义序列化类
一、Hadoop序列化
1、序列化(Serialization)是指把结构化对象转化为字节流。
2、反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。
3、Java自带的序列化机制是java.io.Serializable,但Hadoop并未采用它,而是使用自己的序列化格式。
二、序列化格式特点:
1、紧凑:高效使用存储空间。
2、快速:读写数据的额外开销小
3、可扩展:可透明地读取老格式的数据
4、互操作:支持多语言的交互
三、Hadoop的序列化格式:Writable
四、Hadoop序列化的作用
1、序列化在分布式环境的两大作用:进程间通信,永久存储。
2、Hadoop节点间通信。
五、使用Hadoop内置的序列化类(不使用自定义序列化类)实现流量统计功能。
public class TrafficApp1 { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration(), TrafficApp1.class.getSimpleName()); job.setJarByClass(TrafficApp1.class); FileInputFormat.setInputPaths(job, args[0]); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(MyReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{ Text k2 = new Text(); Text v2 = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] splited = line.split("\t"); k2.set(splited[1]); //将得到的数据拼接成String字符串,用于reduce输入使用 v2.set(splited[6]+"\t"+splited[7]+"\t"+ splited[8]+"\t"+ splited[9]); context.write(k2, v2); } } public static class MyReduce extends Reducer<Text, Text, Text, Text>{ Text v3 = new Text(); @Override protected void reduce(Text k2, Iterable<Text> v2s, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { long t1 = 0L; long t2 = 0L; long t3 = 0L; long t4 = 0L; String[] splited = null; for(Text v2 : v2s){ //将map输入的字符串分割解析并计算 splited = v2.toString().split("\t"); t1 += Long.parseLong(splited[0]); t2 += Long.parseLong(splited[1]); t3 += Long.parseLong(splited[2]); t4 += Long.parseLong(splited[3]); } //输出格式化的字符串 v3.set(t1+"\t"+t2+"\t"+t3+"\t"+t4); context.write(k2, v3); } } }
六、自定义序列化类
public class TrafficApp { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration(), TrafficApp.class.getSimpleName()); job.setJarByClass(TrafficApp.class); FileInputFormat.setInputPaths(job, args[0]); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TrafficWritable.class); job.setReducerClass(MyReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TrafficWritable.class); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } public static class MyMapper extends Mapper<LongWritable, Text, Text, TrafficWritable>{ Text k2 = new Text(); TrafficWritable v2 = new TrafficWritable(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TrafficWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] splited = line.split("\t"); k2.set(splited[1]); v2.set(splited[6], splited[7], splited[8], splited[9]); context.write(k2, v2); } } public static class MyReduce extends Reducer<Text, TrafficWritable, Text, TrafficWritable>{ TrafficWritable v3 = new TrafficWritable(); @Override protected void reduce(Text k2, Iterable<TrafficWritable> v2s, Reducer<Text, TrafficWritable, Text, TrafficWritable>.Context context) throws IOException, InterruptedException { long t1 = 0L; long t2 = 0L; long t3 = 0L; long t4 = 0L; for(TrafficWritable v2 : v2s){ t1 += v2.t1; t2 += v2.t2; t3 += v2.t3; t4 += v2.t4; } v3.set(t1, t2, t3, t4); context.write(k2, v3); } } static class TrafficWritable implements Writable{ long t1; long t2; long t3; long t4; public TrafficWritable(){} public void set(long t1,long t2,long t3,long t4){ this.t1 = t1; this.t2 = t2; this.t3 = t3; this.t4 = t4; } public void set(String t1,String t2,String t3, String t4){ this.t1 = Long.parseLong(t1); this.t2 = Long.parseLong(t2); this.t3 = 
Long.parseLong(t3); this.t4 = Long.parseLong(t4); } public void readFields(DataInput in) throws IOException { this.t1 = in.readLong(); this.t2 = in.readLong(); this.t3 = in.readLong(); this.t4 = in.readLong(); } public void write(DataOutput out) throws IOException { out.writeLong(t1); out.writeLong(t2); out.writeLong(t3); out.writeLong(t4); } @Override public String toString() { return this.t1+"\t"+t2+"\t"+t3+"\t"+t4+"\t"; } } }
相关推荐
changjiang 2020-11-16
minerd 2020-10-28
WeiHHH 2020-09-23
Aleks 2020-08-19
WeiHHH 2020-08-17
飞鸿踏雪0 2020-07-26
tomli 2020-07-26
deyu 2020-07-21
strongyoung 2020-07-19
eternityzzy 2020-07-19
Elmo 2020-07-19
飞鸿踏雪0 2020-07-09
飞鸿踏雪0 2020-07-04
xieting 2020-07-04
WeiHHH 2020-06-28
genshengxiao 2020-06-26
Hhanwen 2020-06-25