Hadoop in Action简单笔记(一)

第一部分Hadoop分布式的编程框架

第一章Hadoop简介

1、philosophy:move-code-to-data,适合数据密集性应用。

2、SQLdatabaseVSHadoop:

1)SCALE-OUTINSTEADVSSCALE-UP

2)Key/value对VS关系表:无结构、半结构数据VS结构化的数据

3)函数式编程(MapReduce)VS声明式编程(SQL):hivecanmapthesqltothejob

4)离线批处理VS在线事务处理

3、理解MapReduce

1)2个阶段:

map:转换+过滤数据:<k1,v1>->list(<k2,v2>)

reduce:<k2,list(v2)>->list(<k3,v3>)

map和reduce之间按照key进行group,hadoop负责处理、只需要写map和reduce程序

2)wordcount例子

第二章StartingHadoop

1、BlocksofHadoop:

NameNode:Master,bookkeeperoftheHDFS,keepstrackofhowyourfilesarebrokendownintofileblocks,whichnodesstorethoseblocks,andtheoverallhealthofthedistributedfilesystem

内存、I/O密集型。单点,但SNN可以作为master的备用

DataNode:SlaveoftheHDFS,存储数据的节点、冗余备份、向NameNode报告本地数据的变化。

SecondaryNameNode(SNN):作为master的备用节点、获得NameNode的HDFS元数据的快照、集群的配置

JobTracker:分配提供的job成为多个task,监控各个task,检测各个task的心跳,重启动失败的任务。计算中的master

TaskTracker:负责执行JobTracker分配的单个任务,像JobTracker发送心跳信息。每个DN节点一个TaskTracker,但它可以创建多个jvm实例,并行的处理多个map和reduce的任务。

计算中的slave

2、安装Hadoop

三种模式:Local(standalone)mode、Pseudo-distributedmode、Fullydistributedmode

3、Web-basedclusterUI查看节点和job的信息

第三章Hadoop各个组件

这章主要从程序员的角度介绍了Hadoop的计算框架。

3.1在HDFS文件系统下工作

HDFS是为分布式计算框架设计的大规模的分布式数据处理而设计的。

Hadoopshell提供了很多类似Unix的命令行工具,是HDFS系统的主要接口。

Hadoop也提供了HDFS的编程接口。

3.1.1基本的文件命令

基本形式:hadoopfs-cmd<args>

hadoopfs-ls

hadoopfs-lsr#相当于linux的ls-r

hadoopfs-putexample.txt.#将example.txt从本地文件系统copy到HDFS上。

hadoopfs-getexample.txt.#从HDFS将exampleget到本地

hadoopfs-catexample.txt#相当于linux的cat

hadoopfs-tailexample.txt#linuxtail

可以结合Unix管道:

hadoopfs-catexample.txt|head-n10

hadoopfs-rmexample.txt#linuxrm

查看帮助,比如ls的帮助:

hadoopfs-helpls

可以使用URI来制定精确的文件和目录位置:

hadoopfs-cathdfs://localhost:9000/user/chunk/example.txt

如果处理本地文件系统,那么可以通过配置fs.default.name来配置默认的file://scheme部分。

<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>

这样就可以直接hadoopfs-cat/user/chunk/example.txt

3.1.2编程的方式读写HDFS

Java编程方式操作HDFS,主要在org.apache.hadoop.fs包下面。Hadoop文件操作主要包括:

打开、读、写、关闭,不仅可以操作HDFS,也可以操作本地普通的文件系统。

FileSystem:是文件系统的交互的一个抽象类,有很多具体的子类来处理HDFS和本地文件系统。可以使用:FileSystem.get(Configurationconf)这个工厂来创建期望的实例。

Configuration:只有key/value配置参数的类。默认的配置是基于HDFS系统的资源配置的。

Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);

FileSystem.getLocal(Configurationconf)可以创建一个针对本地的文件系统。

Path:文件和目录的名字

FileStatus:文件和目录的元数据信息

FileSystem local = FileSystem.getLocal(conf);
Path inputDir = new Path(args[0]);
FileStatus[] inputFiles = local.listStatus(inputDir);

FSDataInputStream:

FSDataInputStream in = local.open(inputFiles[i].getPath());
byte buffer[] = new byte[256];
int bytesRead = 0;
while( (bytesRead = in.read(buffer)) > 0 ){
//...
}
in.close();

FSDataInputStream是javaDataInputStream的子类,支持随机访问.

FSDataOutputStream:与FSDataInputStream相对应的输出流:

Path hdfsFile = new Path(args[1]);
FSDataOutputStream out = hdfs.create(hdfsFile);
out.write(buffer,o,bytesRead);
out.close();

3.2一个MapReduce程序剖析

MapReduce数据流:

3.2.1Hadoop数据类型

MapReduce的key,value不能是普通的class,它需要key/value实现序列化的方法,

key还需要具有可比较性。所以MapReduce对基本类型进行了封装。

一般key/value会实现WritableComparable<T>接口,value会Writable接口。

Hadoop预定义了一些对基本类型封装的类型:BooleanWritable,ByteWritable,

DoubleWritable,FloatWritable,IntWritable,LongWritable,Text,NullWritable。

你可以自己定义类型,实现Writable或者WritableComparable接口。

3.2.2Mapper

作为一个Mapper,一般实现了Mapper接口并且继承了MapReduceBase类。MapReduceBase从名字可以看出,作为Mapper和Reducer的基类。

有两个方法作为构造和析构:

voidconfigure(JobConfjob)在数据处理之前调用,加载配置项

voidclose()在map任务结束调用,进行资源回收,比如数据库连接、打开文件关闭。

Mapper接口负责数据处理阶段,他有一个map方法,来处理key/value对:

void map(K1 key, V1 value, OutputCollector<K2,V2> output,Reporter reporter)
throws IOException

这个方法给定输入(k1,v1)得到list(k2,v2)

OutputCollector接受mapper过程的结果,Reporter记录了任务进度的相关信息。

Hadoop预定义了一些Mapper:

IdentityMapper<K,V>:实现了Mapper<K,V,K,V>将输入直接映射为输出

InverseMapper<K,V>:实现了Mapper<K,V,V,K>逆置key/value对

RegexMapper<K>:实现了Mapper<K,Text,Text,LongWritable>,对匹配的项生成(match,1)对

TokenCount<K>:实现了Mapper<K,Text,Text,LongWritable>,生成(token,1)对

3.2.3Reducer

Reducer和Mapper一样都继承了MapReduceBase类,同时还实现了Reducer接口,它包含了

单个方法:

void reduce(K2 key,Iterator<V2> values,OutputCollector<K3,V3> output,
 Reporter reporter) throws IOException

Reducer接受到各个mapper的输出,将key/value对按照key进行排序然后按照key进行分组。

然后调用reduce函数。OutputCollection接收reduce过程的输出,并将输出写入文件中。

Reporter记录了reducer任务的进度的额外信息。

Hadoop默认实现了一些Reducer:

IdentityReducer<K,V>:实现了Reducer<K,V,K,V>将输入直接映射为输出。

LongSumReducer<K>:实现了Reducer<K,LongWritable,K,LongWritable>,计算出一个key所有value的和。

3.2.4划分--将Mapper的输出重定向

一个常见的误解是,MapReduce程序只有一个Reducer。

有多个Reducer就需要将mapper的输出正确的发送的某个Reducer上。默认的是将key进行hash

然后决定输出到哪个Reducer上,Hadoop提供了HashPartitioner类。

有时候我们需要自定义Partitioner,需要实现configure()和

getPartition()方法,configure根据hadoopjob的配置来配置partitioner,

getPartition返回分配到的reducer的号,大小从0到reducer数。

比如分析航线信息,计算从离开飞机场乘客的数量。

引用

(SanFrancisco,LosAngeles)ChuckLam

(SanFrancisco,Dallas)JamesWarren

...

我们实现EdgePartitioner:

public class EdgePartitioner implements Partitioner<Edge, Writable>
{
@Override
public int getPartition(Edge key, Writable value, int numPartitions)
{
return new Long(key.getDepartureNode()).hashCode() % numPartitions;
}
@Override
public void confi gure(JobConf conf) { }
}

3.2.5组合--本地reducer

很多MapReducer程序,在分发mapper结果之前希望进行一次本地的Reducer操作。

比如WordCount的例子,如果一个job处理一个文档包含the574词,存储和shuffle(the,574)一次要比多次(the,1)要高效。

3.2.6WordCounting和预定义的Mapper和Reducer类

使用hadoop预定义的TokenCountMapper和LongSummReducer类重写r了WordCount例子。

3.3读和写

MapReduce需要读取输入的数据,写输出的数据,所以文件的格式需要关注。hadoop提供了

灵活的处理各种数据格式的方法。

每个split大小要合适,既要足够小,提供并行处理能力,又不能太小,以至于启动和停止的时间占了大部分。

Hadoop的FSDataInputStream具有随机读的能力,所以能够有效的定位到文件split的位置。

Hadoop提供一些数据格式,你还可以自定义格式。

3.3.1输入格式:

InputFormat接口:所有的实现输入文件splitup供hadoop读取实现的接口。

TextInputFormat:默认的InputFormat实现类。这对于没有定义key的,但是想一行一行处理的数据来说非常有用。每一行一条记录

key:当前行的byteoffset,LongWritable

value:当前行,Text。

KeyValueTextInputFormat:每行一条记录,第一个分隔符将一行分开,

key:分割符之前的部分,Text

value:分割符之后的部分,Text

SequenceFileInputFormat<K,V>:一种对于一个MapReducejob是另一个MapReduce输入的一种优化的格式:

key:K用户定义

value:V用户自定义

NLineInputFormat:和TextInputFormat类似,每个split保证含有N行,mapred.line.input.format.linespermap属性,默认是1,设置了N

key:LongWritable

value:Text

你可以在配置输入使用的格式:

conf.setInputFormat(KeyValueTextInputFormat.class);

创建自定义的输入格式:

有时候hadoop提供的标准的几个输入格式不能满足要求,需要自定义。InputFormat接口

包含了两个方法:

public interface InputFormat<K,V>{
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
  RecordReader<K,V> getRecordReader(InputSplit split,
                                    JobConf job,
                                    Reporter reporter) throws IOException;
}

这两个方法提供的功能:

1.将输入数据分成输入的split,每一个map任务处理一个split

2.提供迭代给定split的每个记录的能力,并且能够将每个记录解析成预定义类型的key和value。

一般继承FileInputFormat,它实现了getSplits方法,但没有实现getRecordReader,FileInputFormat还提供了一些protected的方法,供子类覆写。

比如isSplitable(FileSystemfs,Pathfilename),它检查是否可以将一个文件分块。

有些压缩文件和其他的文件需要将一个文件视为原子记录,那么可以覆写,返回false。

使用了FileInputFormat之后,需要关注的就是自定义RecordReader:

public interface RecordReader<K,V>{
  boolean next(K key, V value) throws IOException;

  K createKey();
  V createValue();
  long getPos() throws IOException;
  void close() throws IOException;
  float getProgress() throws IOException;
}

Hadoop有一些实现好的RecordReader,比如LineRecordReader<LongWritable,Text>

它在TextInputFormat被使用,KeyValueLineRecordReader在KeyValueTextInputFormat被使用。

3.3.2输出格式。

和InputFormat对应,输出有OutputFormat类,输出没有splits,每个reducer写入自己的文件。

Hadoop提供了一些预定义的输出格式实现,可以通过JobConf的setOutputFormat来指定。

TextOutputFormat<K,V>将每个记录写成一行,key和value用\t分割,可以在mapred.textoutputformat.separator中指定分隔符。

SequenceFileOutputFormat<K,V>将key/value写入hadoop的sequence文件格式。和

SequenceFileInputFormat对应。

NullOutputFormat<K,V>不输出。

相关推荐