Hadoop要点总结
Hadoop是一个由Apache基金会所开发的分布式系统基础架构,主要解决,海量数据的存储和海量数据的分析计算问题。
HDFS
NameNode工作机制
- 加载fsimage(镜像文件)和edits.001(编辑日志)到内存中
- 客户端向namenode发起增删改查请求
- namenode记录操作日志,更新滚动日志
- namenode在内存中对数据进行增删改查
Secondary NameNode工作机制
向namenode询问是否需要checkpoint
checkpoint触发条件:
- 定时时间到了
- edits中数据满了
- 请求执行checkpoint
- namenode滚动正在写的edits
- 拷贝编辑日志、镜像文件到secondary namenode中
- secondary namenode把拷贝来的镜像文件和编辑日志合并
- 生成新的fsimage命名为fsimage.chkpoint
- 将fsimage.chkpoint拷贝到namenode
- 重命名生成fsimage
HDFS写数据流程
- 客户端向namenode请求上传文件,namenode检查目标文件是否已存在,父目录是否存在。
- namenode返回是否可以上传
- 客户端请求第一个block上传到哪几个datanode服务器上
- namenode返回三个datanode节点,分别为dn1,dn2,dn3
- 客户端请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成
- dn1、dn2、dn3逐级应答客户端
- 客户端开始往dn1上传第一个block,以packet为单位,dn1收到一个packet就会传给dn2,dn2传给dn3,dn1每传一个packet会放入一个应答队列等待应答。
- 当一个block传输完成之后,客户端再次请求namenode上传第二个block的服务器。(重复执行3-7步)
HDFS读数据流程
- 客户端向namenode请求下载文件,namenode通过查询元数据,找到文件块所在的datanode地址
- 挑选一台datanode(就近原则,然后随机)服务器,请求读取数据
- datanode开始传输数据给客户端(从磁盘里读取数据放入流,以packet为单位做校验)
- 客户端以packet单位接收,先缓存在本地,然后写入目标文件中
SecondaryNameNode目录结构
在$HADOOP_HOME/data/tmp/dfs/namesecondary/current
这个目录中查看SecondaryNameNode目录结构。
在主namenode发生故障时(假设没有及时备份数据),可以从SecondaryNameNode恢复数据:
方案一:将SecondaryNameNode中数据拷贝到namenode存储数据的目录。
方案二:使用-importCheckpoint选项启动namenode守护进程,从而将SecondaryNameNode中数据拷贝到namenode目录中。(极慢)
DataNode工作机制
- 一个数据块在datanode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块长度,块数据校验和以及时间戳
- datanode启动后向namenode注册,通过后,周期性(1小时)的向namenode上报所有的块信息。
- 心跳是3秒一次,心跳返回结果带有namenode给该datanode的命令如复制数据块到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个datanode的心跳,则认为该节点不可用。
- 集群运行中可以安全加入和退出一些机器。
服役新节点
1.在namenode主机的$HADOOP_HOME/etc/hadoop
目录下创建dfs.hosts文件,在该文件中添加所有主机名(包括新节点)e.g.
hadoop102 hadoop103 hadoop104 hadoop105
2.在namenode的hdfs-site.xml配置文件中增加dfs.hosts属性
<property> <name>dfs.hosts</name> <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts</value> </property>
3.刷新namenode:hdfs dfsadmin -refreshNodes
4.更新resourcemanager节点:yarn rmadmin -refreshNodes
5.在namenode的slaves文件中增加新主机名称
6.单独命令启动新的数据节点和节点管理器hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager
退役旧节点
1.在namenode的/opt/module/hadoop-2.7.2/etc/hadoop目录下创建dfs.hosts.exclude文件,向其中添加要退役的主机名称
2.在namenode的hdfs-site.xml配置文件中增加dfs.hosts.exclude属性
<property> <name>dfs.hosts.exclude</name> <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude</value> </property>
3.刷新namenode、刷新resourcemanagerhdfs dfsadmin -refreshNodes
yarn rmadmin -refreshNodes
4.在web浏览器端检查,退役节点的状态为decommission in progress(退役中),说明数据节点正在复制块到其他节点。
5.等待退役节点状态为decommissioned(所有块已经复制完成),停止该节点及节点资源管理器。hadoop-daemon.sh stop datanode
yarn-daemon.sh stop nodemanager
PS:如果副本数是3,服役的节点小于等于3,是不能退役成功的,需要修改副本数后才能退役。
6.从include文件中删除退役节点,再运行刷新节点的命令
7.从namenode的slave文件中删除退役节点
MapReduce
Writable序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。
常用的数据类型对应的hadoop数据序列化类型
boolean
--> BooleanWritable
byte
--> ByteWritable
int
--> IntWritable
float
--> FloatWritable
long
--> LongWritable
double
--> DoubleWritable
string
--> Text
map
--> MapWritable
array
--> ArrayWritable
自定义bean对象实现序列化接口
- 必须实现Writable接口
- 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
- 重写序列化方法
- 重写反序列化方法(注意反序列化的顺序和序列化的顺序完全一致)
- 要想把结果显示在文件中,需要重写toString()
- 如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序
MapTask并行度决定机制
- 一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定。
- 每一个切片分配一个MapTask并行实例
- 切片大小默认=blocksize
- 切片时针对每一个文件单独切片,不考虑数据集整体
MapTask工作机制
Read阶段
maptask调用InputFormat,InputFormat又调用RecordReader从输入文件中解析出一个个K/V。
Map阶段
将解析出的K/V交给客户端map()方法处理,并产生新的K/V。
Collect阶段
当map()方法处理完数据后,一般会调用OutputCollector.collect()输出结果,在该函数内部调用Partitioner对K/V进行分区,且根据K进行分区内排序,并写入一个环形缓冲区中。
溢写阶段
当环形缓冲区达到80%时,会将数据写到本地磁盘上生成一个临时文件。
将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
Combine阶段
当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
自定义InputFormat
编写一个类继承FileInputFormat
public class WholeInputFormat extends FileInputFormat<NullWritable, BytesWritable> { @Override public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { // 自定义recordreader WholeRecordReader recordReader = new WholeRecordReader(); recordReader.initialize(split, context); return recordReader; } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } }
自定义一个RecordReader
public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> { private FileSplit split; private Configuration conf; private BytesWritable value = new BytesWritable(); private boolean processed = false; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.split = (FileSplit) split; this.conf = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!processed) { byte[] contents = new byte[(int) split.getLength()]; Path path = split.getPath(); FileSystem fs = path.getFileSystem(conf); FSDataInputStream fis = null; try { fis = fs.open(path); IOUtils.readFully(fis, contents, 0, contents.length); value.set(contents, 0, contents.length); } finally { IOUtils.closeStream(fis); } processed = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return this.value; } @Override public float getProgress() throws IOException, InterruptedException { return processed ? 1 : 0; } @Override public void close() throws IOException { } }
自定义RecordReader重点是nextKeyValue()
方法,它定义了如何封装向maptask输入的键值对,本例中是将每个文件的所有内容作为value输入到maptask中。
大量小文件的切片优化(CombineTextInputFormat)
// 如果不设置InputFormat,它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
经过以上设置,可有效减少切片数
Shuffle机制
MapReduce保证每个reducer的输入都是按键有序排列的,系统执行排序的过程(即将map输出作为输入传给reducer)称为shuffle。
Partition分区
默认partition分区
public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }
默认采用的是org.apache.hadoop.mapreduce.lib.partition.HashPartitioner
,当中的getPartition()
方法是根据key的hashCode对reduceTasks个数取模得到的。
用户自定义分区
1.继承Partitioner,重写getPartition()方法
public class MyPartitioner extends Partitioner<K, V> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { int partition=xxx; if (exp0) { partition = 0; }else if (exp1) { partition = 1; }else if (exp2) { partition = 2; }else if (exp3) { partition = 3; } return partition; } }
2.在job驱动中,设置自定义partitioner
job.setPartitionerClass(MyPartitioner.class)
3.根据自定义partitioner的逻辑设置相应数量的reduce task
job.setNumReduceTasks(5);
PS:
如果reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
如果1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会产生异常;
如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000。
WritableComparable排序
bean对象实现WritableComparable接口重写compareTo方法,就可以实现排序
@Override public int compareTo(Bean o) { // 倒序排列,从大到小 return this.xxx > o.getXxx() ? -1 : 1; }
GroupingComparator分组
对reduce阶段的数据根据某一个或几个字段进行分组。
public class OrderGroupingComparator extends WritableComparator { protected OrderGroupingComparator() { // 记得此处设置true,否则会报空指针异常 super(OrderBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean abean = (OrderBean) a; OrderBean bbean = (OrderBean) b; return abean.getOrder_id().compareTo(bbean.getOrder_id()); } }
Combiner合并
combiner是MR程序中Mapper和Reducer之外的一种组件,combiner组件的父类就是Reducer,combiner和reducer的区别在于运行的位置:Combiner是在每一个maptask所在的节点运行,Reducer是接收全局所有Mapper的输出结果。combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量。
自定义Combiner实现步骤
1.自定义一个combiner继承Reducer,重写reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable v :values){ count = v.get(); } context.write(key, new IntWritable(count)); } }
2.在job驱动类中设置
job.setCombinerClass(WordcountCombiner.class);
combiner能够应用的前提是不能影响最终的业务逻辑,而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来。
从控制台观察使用combiner前后变化:
数据倾斜&Distributedcache
数据倾斜原因
如果是多张表的操作都是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。
解决方案
在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端数据的压力,尽可能的减少数据倾斜。
- 在mapper的setup阶段,将文件读取到缓存集合中
- 在驱动函数中加载缓存
job.addCacheFile(new URI("file:/e:/cache/pd.txt")); // 缓存普通文件到task运行节点
自定义OutputFormat
1.自定义一个outputformat继承org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{ @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { // 创建一个RecordWriter return new FilterRecordWriter(job); } }
2.具体的写数据RecordWriter
public class FilterRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream atguiguOut = null; FSDataOutputStream otherOut = null; public FilterRecordWriter(TaskAttemptContext job) { // 1 获取文件系统 FileSystem fs; try { fs = FileSystem.get(job.getConfiguration()); // 2 创建输出文件路径 Path atguiguPath = new Path("e:/atguigu.log"); Path otherPath = new Path("e:/other.log"); // 3 创建输出流 atguiguOut = fs.create(atguiguPath); otherOut = fs.create(otherPath); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断是否包含“atguigu”输出到不同文件 if (key.toString().contains("atguigu")) { atguiguOut.write(key.toString().getBytes()); } else { otherOut.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭资源 if (atguiguOut != null) { atguiguOut.close(); } if (otherOut != null) { otherOut.close(); } } }
数据压缩
在Map输出端采用压缩
// 开启map端输出压缩 configuration.setBoolean("mapreduce.map.output.compress", true); // 设置map端输出压缩方式 configuration.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);
在reduce输出端压缩
// 设置reduce端输出压缩开启 FileOutputFormat.setCompressOutput(job, true); // 设置压缩的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // FileOutputFormat.setOutputCompressorClass(job, Lz4Codec.class); // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
ReduceTask工作机制
Copy阶段
ReduceTask从各个MapTask远程拷贝一片数据,如果某一片数据大小超过阈值,则写到磁盘上,否则直接放在内存中。
Merge阶段
在远程拷贝数据时,reducetask后台启动了两个线程对内存和磁盘上的文件进行合并,以防止内存或硬盘使用过多。
Sort阶段
用户编写的reduce()方法输入数据是按照key进行聚集的,为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
Reduce阶段
reduce()函数将计算结果写到HDFS上。
MapReduce参数优化
资源相关参数
用户在mr应用程序中配置可以生效:
mapreduce.map.memory.mb
一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。mapreduce.reduce.memory.mb
一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。mapreduce.map.java.opts
Map Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g."-Xmx1024m -verbose:gc -Xloggc:/tmp/@[email protected]" (@taskid@会被Hadoop框架自动换为相应的taskid), 默认值: ""mapreduce.reduce.java.opts
Reduce Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g."-Xmx1024m -verbose:gc -Xloggc:/tmp/@[email protected]", 默认值: ""mapreduce.map.cpu.vcores
每个Map task可使用的最多cpu core数目, 默认值: 1mapreduce.reduce.cpu.vcores
每个Reduce task可使用的最多cpu core数目, 默认值: 1
应该在yarn启动之前就配置在服务器的配置文件中才能生效:
yarn.scheduler.minimum-allocation-mb 1024
给应用程序container分配的最小内存yarn.scheduler.maximum-allocation-mb 8192
给应用程序container分配的最大内存
shuffle性能优化的关键参数,应在yarn启动之前就配置好:
mapreduce.task.io.sort.mb 100
shuffle的环形缓冲区大小,默认100mmapreduce.map.sort.spill.percent 0.8
环形缓冲区溢出的阈值,默认80%
容错相关参数
mapreduce.map.maxattempts
每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。mapreduce.reduce.maxattempts
每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。mapreduce.map.failures.maxpercent
当失败的Map Task失败比例超过该值为,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业扔认为成功。mapreduce.reduce.failures.maxpercent
当失败的Reduce Task失败比例超过该值为,整个作业则失败,默认值为0。mapreduce.task.timeout
Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是300000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。
Yarn
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序。
yarn工作机制
- MR程序提交到客户端所在节点
- yarnrunner向resourcemanager申请一个application
- rm将该应用程序的资源路径返回给yarnrunner
- 将改程序所需要的资源提交到HDFS上
- 程序资源提交完毕后,申请运行MrAppMaster
- mr将用户请求初始化成一个task
- 其中一个nodemanager领取到task任务
- 该nodemanager创建容器container并产生MRAppMaster
- container从HDFS上拷贝资源到本地
- MrAppMaster向RM申请运行maptask容器
- rm将运行maptask任务分配给另外两个nodemanager,另外两个nodemanager分别领取任务并创建容器
- MR向两个接收到任务的nodemanager发送程序启动脚本,这两个nodemanager分别启动maptask,maptask对数据分区排序
- MRAppMaster向rm申请两个容器,运行reducetask
- reducetask向maptask获取相应分区数据
- 程序运行结束后,MR向RM注销自己