Hadoop The Definitive Guide 2nd Edition 读书笔记3

第四章是介绍Hadoop的IO系统,前面介绍的是数据完整性的方案,无外乎就是校验和等机制,接着介绍的是hadoop中的压缩机制,这两块内容后续仔细阅读。

串行化:

Hadoop的进程间通信和持久化保存的时候都要用到串行化的技术。首先看一下Hadoop的进程间通信的机制。Hadoop中进程间通信时通过自己的一套RPC机制来实现的,节点间的通信格式由protocal来决定。

Hadoop中并没有使用Java的串行化机制,而使使用自己的一套技术。

所有可串行化类的都继承自Writable接口。hadoop提供了很多可串行化的类型,如IntWritable,Text,DoubleWritable等。每个类中都有一个内部类Comparator。在MapReduce过程中排序是很重要的操作,而排序的比较方法就是有这个内部类提供的。WritableComparator是Comparator内部类的基类,它提供了默认的比较方法的实现和提供一个工厂方法,可以根据不同的类型返回不同的WritableComparator实现类。

我们看一下WritableComparator的compare函数实现:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
      buffer.reset(b1, s1, l1);                   // parse key1
      key1.readFields(buffer);
      
      buffer.reset(b2, s2, l2);                   // parse key2
      key2.readFields(buffer);
      
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    
    return compare(key1, key2);                   // compare them
  }

可见默认的compare(byte[]b1,ints1,intl1,byte[]b2,ints2,intl2)操作是先将流中的数据反序列化,然后调用compare(WritableComparablea,WritableComparableb)函数,compare(WritableComparablea,WritableComparableb)调用对象的compareTo函数进行比较。

再看一下IntWritable中的Comparator内部类,它复写了ompare(byte[]b1,ints1,intl1,byte[]b2,ints2,intl2)方法:

public static class Comparator extends WritableComparator {
    public Comparator() {
      super(IntWritable.class);
    }

    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      int thisValue = readInt(b1, s1);
      int thatValue = readInt(b2, s2);
      return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
    }
  }

可见compare并没有反序列化,而是直接以字节的形式进行比较,从而提高了比较操作的效率。

类型VIntWritable:

VIntWritable是变长的可序列化整形,他的原理是用第一个字节的第4位(从右向左数)来表示这个数的正负,0为负1为正(注意跟正常的编码不一样),用后三位来表示这个数占几个字节(算上第一个字节)。10001111-10001000,从右数第四位是1,则表示这个数是正的,后三位111-000分别表示2-9.;再看10000111-10000000,从右数第四位是0,表示负数,后三位111-000分别表示2-9个字节。

比如书上的例子163,00000000000000000000000010100011,算上第一个字节,二进制至少需要两个字节存放,所以序列化写入流的时候第一个字节为8f(10001111)——第四位为1,表示正数,前三位111表示2个字节。所以163在内存中用两个字节就可以存下:8fa3。具体操作可以参见源码。

类型Text:

Hadoop中的Text是UTF-8编码,这跟Java的String不一样(Java的String是unicode编码)。而且Text会用一个VIntWritable来记录整个Text有多少个字节(不算这个记录的VIntWritable的长度)。

类型ByteWritable:

ByteWritable在序列化的时候会用4个字节表示后面数据占几个字节,比如byte[]{2,3}在byteWritable中存储形式是:000000020203。

实现自己的类型:

由于Writable是MapReduce过程中的核心类型,所以有的时候我们要实现自己的类型,下面用书中的例子来说明一下:

package com.test.hadoop3;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair> {

	private Text first;
	private Text second;
	
	public TextPair() {
		set(new Text(), new Text());
	}
	
	public TextPair(String first, String second) {
		set(new Text(first), new Text(second));
	}
	
	public TextPair(Text first, Text second) {
		set(first, second);
	}
	
	public void set(Text first, Text second) {
		this.first = first;
		this.second = second;
	} 
	
	public Text getSecond() {
		return second;
	}

	public void setSecond(Text second) {
		this.second = second;
	}
	
	@Override
	public void readFields(DataInput in) throws IOException {
		first.readFields(in);
		second.readFields(in);
	}

	@Override
	public void write(DataOutput out) throws IOException {
		first.write(out);
		second.write(out);
	}

	@Override
	public int compareTo(TextPair o) {
		int cmp = first.compareTo(o.first);
		if (cmp != 0)
			return cmp;
		return second.compareTo(o.second);
	}

	@Override
	public boolean equals(Object obj) {
		if (obj instanceof TextPair) {
			TextPair tp = (TextPair) obj;
			return first.equals(tp.first) && second.equals(tp.second);
		}
		return false;
	}

	@Override
	public int hashCode() {
		return first.hashCode() * 163 + second.hashCode();
	}

	@Override
	public String toString() {
		return first + "\t" + second;
	}
}

我们实现的TextPair类型有两个Text组成,readFields和write是序列化相关,所以必须实现。hashCode方法在HashPartitioner做分区的时候会用到,而toString函数,在MapReduce利用TextInputFormat进行输出时,会调用key和value的toString进行输出,所以必须正确的是实现。TextPair实现了WritableComparable接口,所以必须实现compareTo方法。

对于我们刚刚实现的类型,我们并没有提供其他类型中含有的Comparator内部类,所以默认的比较行为是反序列化成对象,然后调用对象的compareTo方法进行比较。我们可以利用Comparator内部类对这个比较过程进行优化,优化的方法是在比较过程中不反序列化对象,而是直接以二进制进行比较。

private static class Comparator extends WritableComparator {

		private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
		
		public Comparator() {
			super(TextPair.class);
		}

		@Override
		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			try {
				// 计算first有多少个字节,Text会有一个VIntWritable来记录first总共有多少个字节,所以长度是这个VIntWritable的长度+first总共有多少个字节
				int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
				int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
				int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
				if (cmp != 0) {
					return cmp;
				}
				return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2);
			} catch (IOException e) {
				throw new IllegalArgumentException(e);
			}
		}
		
		static {
			WritableComparator.define(TextPair.class, new Comparator());
		}

相关推荐