Hadoop The Definitive Guide 2nd Edition 读书笔记3
第四章是介绍Hadoop的IO系统,前面介绍的是数据完整性的方案,无外乎就是校验和等机制,接着介绍的是hadoop中的压缩机制,这两块内容后续仔细阅读。
串行化:
Hadoop的进程间通信和持久化保存的时候都要用到串行化的技术。首先看一下Hadoop的进程间通信的机制。Hadoop中进程间通信时通过自己的一套RPC机制来实现的,节点间的通信格式由protocal来决定。
Hadoop中并没有使用Java的串行化机制,而使使用自己的一套技术。
所有可串行化类的都继承自Writable接口。hadoop提供了很多可串行化的类型,如IntWritable,Text,DoubleWritable等。每个类中都有一个内部类Comparator。在MapReduce过程中排序是很重要的操作,而排序的比较方法就是有这个内部类提供的。WritableComparator是Comparator内部类的基类,它提供了默认的比较方法的实现和提供一个工厂方法,可以根据不同的类型返回不同的WritableComparator实现类。
我们看一下WritableComparator的compare函数实现:
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { buffer.reset(b1, s1, l1); // parse key1 key1.readFields(buffer); buffer.reset(b2, s2, l2); // parse key2 key2.readFields(buffer); } catch (IOException e) { throw new RuntimeException(e); } return compare(key1, key2); // compare them }
可见默认的compare(byte[]b1,ints1,intl1,byte[]b2,ints2,intl2)操作是先将流中的数据反序列化,然后调用compare(WritableComparablea,WritableComparableb)函数,compare(WritableComparablea,WritableComparableb)调用对象的compareTo函数进行比较。
再看一下IntWritable中的Comparator内部类,它复写了ompare(byte[]b1,ints1,intl1,byte[]b2,ints2,intl2)方法:
public static class Comparator extends WritableComparator { public Comparator() { super(IntWritable.class); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int thisValue = readInt(b1, s1); int thatValue = readInt(b2, s2); return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1)); } }
可见compare并没有反序列化,而是直接以字节的形式进行比较,从而提高了比较操作的效率。
类型VIntWritable:
VIntWritable是变长的可序列化整形,他的原理是用第一个字节的第4位(从右向左数)来表示这个数的正负,0为负1为正(注意跟正常的编码不一样),用后三位来表示这个数占几个字节(算上第一个字节)。10001111-10001000,从右数第四位是1,则表示这个数是正的,后三位111-000分别表示2-9.;再看10000111-10000000,从右数第四位是0,表示负数,后三位111-000分别表示2-9个字节。
比如书上的例子163,00000000000000000000000010100011,算上第一个字节,二进制至少需要两个字节存放,所以序列化写入流的时候第一个字节为8f(10001111)——第四位为1,表示正数,前三位111表示2个字节。所以163在内存中用两个字节就可以存下:8fa3。具体操作可以参见源码。
类型Text:
Hadoop中的Text是UTF-8编码,这跟Java的String不一样(Java的String是unicode编码)。而且Text会用一个VIntWritable来记录整个Text有多少个字节(不算这个记录的VIntWritable的长度)。
类型ByteWritable:
ByteWritable在序列化的时候会用4个字节表示后面数据占几个字节,比如byte[]{2,3}在byteWritable中存储形式是:000000020203。
实现自己的类型:
由于Writable是MapReduce过程中的核心类型,所以有的时候我们要实现自己的类型,下面用书中的例子来说明一下:
package com.test.hadoop3; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; public class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(String first, String second) { set(new Text(first), new Text(second)); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getSecond() { return second; } public void setSecond(Text second) { this.second = second; } @Override public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } @Override public int compareTo(TextPair o) { int cmp = first.compareTo(o.first); if (cmp != 0) return cmp; return second.compareTo(o.second); } @Override public boolean equals(Object obj) { if (obj instanceof TextPair) { TextPair tp = (TextPair) obj; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public String toString() { return first + "\t" + second; } }
我们实现的TextPair类型有两个Text组成,readFields和write是序列化相关,所以必须实现。hashCode方法在HashPartitioner做分区的时候会用到,而toString函数,在MapReduce利用TextInputFormat进行输出时,会调用key和value的toString进行输出,所以必须正确的是实现。TextPair实现了WritableComparable接口,所以必须实现compareTo方法。
对于我们刚刚实现的类型,我们并没有提供其他类型中含有的Comparator内部类,所以默认的比较行为是反序列化成对象,然后调用对象的compareTo方法进行比较。我们可以利用Comparator内部类对这个比较过程进行优化,优化的方法是在比较过程中不反序列化对象,而是直接以二进制进行比较。
private static class Comparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public Comparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { // 计算first有多少个字节,Text会有一个VIntWritable来记录first总共有多少个字节,所以长度是这个VIntWritable的长度+first总共有多少个字节 int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); if (cmp != 0) { return cmp; } return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } static { WritableComparator.define(TextPair.class, new Comparator()); }