Hadoop实战-初级部分 之 Hadoop IO

第一部分:数据完整性
数据完整性及其采用的技术
保证数据在传输过程中不损坏 ,常见的保证数据完整性采用的技术
A.奇偶校验技术
B.ECC校验纠错技术
C.CRC-32循环冗余校验技术
          HDFS以透明方式校验所有写入它的数据,并在默认设置下,会在读取数据时验证校验和。针对数据的每个io.bytes.per.checksum(默认512字节)字节,都会创建一个单独的校验和。
             数据节点负责在存储数据及其校验和之前验证它们收到的数据。 从客户端和其它数据节点复制过来的数据。客户端写入数据并且将它发送到一个数据节点管线中,在管线的最后一个数据节点验证校验和。
              客户端读取数据节点上的数据时,会验证校验和,将其与数据节点上存储的校验和进行对比。每个数据节点维护一个连续的校验和验证日志,因此它知道每个数据块最后验证的时间。每个数据节点还会在后台线程运行一个DataBlockScanner(数据块检测程序),定期验证存储在数据节点上的所有块,为了防止物理存储介质中位衰减锁造成的数据损坏。
          HDFS通过复制完整的副本来产生一个新的,无错的副本来“治愈”哪些出错的数据块。工作方式:如果客户端读取数据块时检测到错误,抛出Checksum Exception前报告该坏块以及它试图从名称节点中药读取的数据节点。名称节点将这个块标记为损坏的,不会直接复制给客户端或复制该副本到另一个数据 节点。它会从其他副本复制一个新的副本。
本地文件系统
            Hadoop的本地文件系统执行客户端校验。意味着,在写一个名filename的文件时,文件系统的客户端以透明的方式创建一个隐藏.filename.crc。在同一个文件夹下,包含每个文件块的校验和。       
 
         数据块大小由io.bytes.per.checksum属性控制,块的大小作为元数据存储在.crc文件中。也可能禁用校验和:底层文件系统原生支持校验和。这里通过 RawLocalFileSystem来替代LocalFileSystem完成。要在一个应用中全局使用,只需要设置fs.file.impl值为 org.apache.hadoop.fs.RawLocalFileSystem来重新map执行文件的URL。或者只想对某些读取禁用校验和校验。例子:
Configuration conf = ...
FileSystem fs = new RawLocalFileSystem();
fs.initialize(null, conf);
ChecksumFileSystem
LocalFileSystem使用ChecksumFileSystem(校验和文件系统)为自己工作,这个类可以很容易添加校验和功能到其他文件系统中。因为ChecksumFileSystem也包含于文件系统中。
第二部分:压缩
编码/解码器:用以执行压缩解压算法。
•DEFLATE   org.apache.hadoop.io.compress.DefaultCodec
•gzip   org.apache.hadoop.io.compress.GzipCodec
•bzip2  org.apache.hadoop.io.compress.Bzip2Codec
•LZO  com.hadoop.compression.lzo.LzopCodec
•CompressionCodec 对流进行进行压缩与解压缩
•CompressionCodecFactory 方法来推断CompressionCodec
Hadoop支持的压缩形式
压缩格式工具算法  文件扩展名  多文件可分割性
 DEFLATE 无 DEFLATE .deflate 不 不
 gzip gzip DEFLATE .gz 不 不
 bzip2 bzip2 bzip2 .bz2 不 是
 LZO lzop LZO .lzo 不 不
 
•属性名:
      io.compression.codecs
      默认值: org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.ompress.Bzip2Codec
•本地库
压缩格式Java 实现本地实现
DEFLATE
Gzip
Bzip2
 
LZO
 
压缩与输入分割
•前提:
          在考虑如何压缩那些将由MapReduce处理的数据时,考虑压缩格式是否支持分割是很重要的。
• 案例
•假设,一个文件时一个gzip格式的压缩文件,压缩后的大小为1GB。HDFS将其分为16块。然而针对每一块在进行分块是不可以的,因为gzip合适的文件不支持分割(分块)机制,所以读取他的MapReduce不分割文件,造成了只有Map读取16块文件的情况。导致运行时间变长。
•应该选择哪种压缩形式
•总体原则,还要经过测试,才可以决定。
•经验:大文件选择支持分割的压缩形式
 
在 MR 中使用压缩
•前提:
         如果文件是压缩过的,那么在被MapReduce读取时,它们会被解压,根据文件的扩展名来选择应该使用拿一种压缩解码器。
•使用:
•压缩MapReduce的作业输出,在作业配置中将 mapred.output.compress属性设置为true,将mapred.output.compression.codec属性设置为自己需要使用的压缩解码/编码器的类名。
•通过gunzip –c file来查看结果。
Ø代码示例
conf.setBoolean(“mapred.output.compress’,true)
Conf.setClass(“mapred.output.compression.codec”,GizpCodec.class,
CompressionCodec.class);
 
•Map作业输出结果的压缩
•使用原因
          因为Map作业的中间结果会输出到本地,并在网络上传递。所以压缩能获得更好性能,因为传播的数据减少了。
•Map输出压缩属性
•mapred.compress.map.output
•mapred.map.output
•compression.codec
•代码示例
•conf.setCompressMapOutput
•conf.setMapOutputCompressorClass(GzipCodec.classs)
第三部分:序列化
什么是Hadoop的序列化
•序列化(serialization)
          序列化指的是将结构化对象转为字节流以便于通过网络进行传输或写入持久存储的过程。反序列化指的是将字节流转为一系列结构化对象的过程。
          序列化用于:进程间通信与持久存储。
    RPC序列化建议的特性
1.紧凑(Compact)即方便网络传输,充分利用存储空间
2.快速(Fast)即序列化及反序列化性能要好
3.扩展性(Extensible)即协议有变化,可以支持新的需求
4.互操作性(Interoperable)即客户端及服务器端不依赖语言的实现
  Hadoop使用Writables,满足紧凑、快速,不满足扩展能及互操作性
•Hadoop的序列化不是java的序列化,Hadoop自己实现了自己的序列化机制。格式Writables。
•Hadoop中定义了两个序列化相关的接口:Writable接口和Comparable接口,这两个接口可以合成一个接口WritableComparable.
Writable 接口
Writable 接口定义了两个方法:
(1)一个用于将其状态写入二进制格式的 DataOutput 流;
(2)另一个用于从二进制格式的 DataInput 流读取其状态;
我们可以使用 set() 函数来创建和设置 Writable 的值:
IntWritable wirtable = new IntWritable();
writable.set(163);
同样我们也可以使用构造函数:
IntWritable writable = new IntWritable(163);
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable { 
  void write(DataOutput out) throws IOException; 
  void readFields(DataInput in) throws IOException;}
Writable 接口
Writable 接口定义了两个方法:
(1)一个用于将其状态写入二进制格式的 DataOutput 流;
(2)另一个用于从二进制格式的 DataInput 流读取其状态;
我们可以使用 set() 函数来创建和设置 Writable 的值:
IntWritable wirtable = new IntWritable();
writable.set(163);
同样我们也可以使用构造函数:
IntWritable writable = new IntWritable(163);
 
         IntWritable 实现了 WritableComparable 接口 ,后者是 Writable 与java.lang.Comprable 接口的子接口
 
    package org.apache.hadoop.io;
     public interface       WritableComparable<T> extends Writable,Comparable<T> {}
 
 
        Hadoop 优化比对,不需要反序列化即可比较。
package   org.apache.hadoop.io;   
import   java.util.Comparator;   
public   interface  RawComparator<T>  extends   Comparator<T>   {   
public   int  compare( byte [] b1,  int  s1,  int  l1,  byte [] b2,  int  s2, int   l2);   
}   
 
         WritableComparator 是一个 RawComparator 通用的实现 ,为WritableComparable classes. 
  它做了两件事
 1.实现了 compare() 方法(返序列化)
 2.它充当的是 RawComparator 的工厂类
 
 
Hadoop 自带的序列化接口
实现了 WritableComparable 接口的类:
基础: BooleanWritable | ByteWritable
数字: IntWritable | VIntWritable | FloatWritable | LongWritable |VLongWritable | DoubleWritable
高级: NullWritable | Text | BytesWritable | MDSHash | ObjectWritable |GenericWritable
仅实现了 Writable 接口的类:
数组: ArrayWritable | TwoDArrayWritable
映射: AbstractMapWritable | MapWritable | SortedMapWritable
 
•Text
        Text是UTF-8的Writable。可以将它理解为一种与java.lang.String 相类似的Writable。Text类代替了UTF-8类。
         Text是可变的,其值可以通过调用set()方法来改变。最大存储是2GB。
•NullWritable
NullWritable是一种特殊的Writable类型,因为它的序列化的长度是零。可以做占位符。
•BytesWritable  BytesWritable 是一个二进制的数据数组封装。它的序列化格式是一个int字段.
BytesWritable是可变的,其值可以通过调用set()方法来改变。
•ObjectWriable
ObjectWriable 适用于字段可以使用多种类型时。
•Writable集合
     一共 四种:
            ArrayWritable和TwoDArrayWritable是针对数组与二维数组
            MapWritable和SortededMapWritable 针对是Map与SortMap
•实现WritableComparable
•实现
     /** * 将对象转换为字节流并写入到输出流out中 */
      write()
      /** * 从输入流in 中读取字节流并反序列化为对象 */
       readFields(),
      /** * 将this对像与对象O比较*/
      compareTo()方法。
第四部分:基于文件的数据结构
          SequeceFile是Hadoop API提供的一种二进制文件支持。这种二进制文件直接将<key, value>对序列化到文件中。一般对小文件可以使用这种文件合并,即将文件名作为key,文件内容作为value序列化到大文件中 Key是任意的Writable,Value是任意的Writable我们可以实现将许多小文件转化为SequenceFile,以方便Map/Reduce处理实际上,现在Hadoop处理时,都会将数据转为SequenceFile格式,无论是性能还是压缩上的考量。
 
         这种文件格式 有以下好处:
A.支持压缩,且可定制为基于Record或Block压缩(Block级压缩性能较优)
B.本地化任务支持:因为文件可以被切分,因此MapReduce任务时数据的本地化情况应该是非常好的。
C.难度低:因为是Hadoop框架提供的API,业务逻辑侧的修改比较简单。
写 SequenceFile
步骤:
      1. 设置 Configuration
      2. 获取 File System
      3. 设置文件输出路径
      4. SequenceFile.createWriter 创建 SequenceFile.Writer 然后写入
      5. 调用 SequenceFile.Writer .append 追加写入
      6. 关闭流
 
读 SequenceFile
步骤:
      1. 设置 Configuration
      2. 获取 File System
      3. 设置文件输出路径
      4. SequenceFile.Reader 创建读取类 SequenceFile.Reader
      5. 拿到 Key 与 Value 的 class
      6. 读取
 
通过命令行读写 SequenceFile
步骤:
      1. 设置 Configuration
      2. 获取 File System
      3. 设置文件输出路径
      4. SequenceFile.Reader 创建读取类 SequenceFile.Reader
      5. 拿到 Key 与 Value 的 class
      6. 读取
MapFile
           MapFile 是经过排序的带索引的 SequenceFile ,可以根据键值进行查找 .
      由两部分组成,分别是 data 和 index 。 index 作为文件的数据索引,主要记录了每个 Record 的 key 值,以及该 Record 在文件中的偏移位置。在 MapFile 被访问的时候 , 索引 文件会被加载到内存,通过索引映射关系可迅速定位到指定Record 所在文件位置, 因此,相对 SequenceFile 而言, MapFile 的检索效率是高效的,缺点是会消耗一部分 内存来存储 index 数据 .
            需注意的是, MapFile 并不会把所有 Record 都记录到 index 中去,默认情况下每 隔 128 条记录存储一个索引映射。当然,记录间隔可人为修改,通过MapFIle.Writer 的 setIndexInterval() 方法,或修改 io.map.index.interval 属性;
            另外,与 SequenceFile 不同的是, MapFile 的 KeyClass 一定要实现WritableComparable 接口 , 即 Key 值是可比较的。

相关推荐