Hadoop浅度学习指南(HDFS、YARN、MapReduce)
大数据
- 概念:big data
5V特征:
- Volume:量大
- Value:价值高,价值密度低
- Variety:多样性
- Velocity:速度快
- Veracity:准确性
hadoop
主要组成
GFS --> HDFS
MapReduce --> MapReduce
BigTable -- > HBase
模块
- Hadoop Common: The common utilities that support the other Hadoop modules.支持hadoop其他模块的一般工具
- Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.高吞吐分布式文件系统
- Hadoop YARN: A framework for job scheduling and cluster resource management. 资源调度和任务管理
- Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.基于yarn的大数据并行处理系统
HDFS
组成
- namenode:管理元数据,处理来自客户端的请求
元数据:描述数据属性的数据、描述数据的数据 - secondarynamenode:元数据的合并
- datanode:具体数据的读写
- client:文件读写请求的发起
HDFS机制
namenode
- 负责元数据的管理,DataNode负责处理文件内容的读写请求
- 处理client的读写的请求,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问
- 副本存放在哪些DataNode上由 NameNode来控制,根据全局情况做出块放置决定,读取文件时NameNode尽量让用户先读取最近的副本,降低带块消耗和读取时延
- 全权管理数据块的复制、它周期性地从集群中的每个Datanode接收心跳信号和块状态报告(Blockreport),块状态报告包含了一个该Datanode上所有数据块的列表
datanode
- 一个数据块在DataNode以文件存储在磁盘上,包括数据块本身、数据块的元数据(数据块的长度,块数据的校验和,以及时间戳)
- DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息
- 心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode 的心跳,则认为该节点不可用。
文件
- block 默认128M,每个块有多个副本存储在不同的机器上
- NameNode 是主节点,存储文件的元数据如文件名,文件目录结构,文件属性(生成时间,副本数,文件权限),以及每个文件的块列表以及块所在的DataNode等等
- DataNode 在本地文件系统存储文件块数据,以及块数据的校验和
- 可以创建、删除、移动或重命名文件,当文件创建、写入和关闭之后不能修改文件内容
namenode从datanode接受心跳和块报告
- namenode启动后,datanode向namenode进行注册
- 心跳
心跳是每3秒一次,
心跳返回结果带有NameNode给该DataNode的命令如删除块,
复制块等如果超过10分钟没有收到某个DataNode 的心跳,则认为该
节点不可用 - 块报告
DataNode启动后向NameNode注册,
通过后,周期性(1小时)的向NameNode上报所有的块信息
- 块损坏
当DataNode读取block的时候,重新计算checksum,和创建
时的对比DataNode 在其文件创建后三周验证其checksum
- HDFS有哪些进程
NameNode
DataNode
NodeManager
ResourceManager
NameNode启动过程
- NameNode元数据/命名空间持久化fsimage与edits
- NameNode格式化,具体做什么事
创建fsimage文件,存储fsimage信息
创建edits文件
- NameNode 启动过程
加载fsimage和edits文件
生成新的fsimage和edits文件
等待DataNode注册与发送Block Report
- DataNode 启动过程
向NameNode注册、发送Block Report
- NameNode SafeMode 安全模式
namenode启动时会进入安全模式,此时只可读不可写
- Name启动的时候首先将fsimage(镜像)载入内存,并执行(replay)编辑日志editlog的的各项操作;
- 一旦在内存中建立文件系统元数据映射,则创建一个新的fsimage文件(这个过程不需SecondaryNameNode) 和一个空的editlog;
- 在安全模式下,各个datanode会向namenode发送块列表的最新情况;
- 此刻namenode运行在安全模式。即NameNode的文件系统对于客服端来说是只读的。(显示目录,显示文件内容等。写、删除、重命名都会失败);
- NameNode开始监听RPC和HTTP请求
解释RPC:RPC(Remote Procedure Call Protocol)——远程过程通过协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议; - 系统中数据块的位置并不是由namenode维护的,而是以块列表形式存储在datanode中;
- 在系统的正常操作期间,namenode会在内存中保留所有块信息的映射信息。
HDFS启动流程及元数据的同步
- 元数据的同步
流程图:
触发的阈值(hdfs-default.xml)
dfs.namenode.checkpoint.period 3600
dfs.namenode.checkpoint.txns 1百万个事务 NameNode 启动过程
- 加载fsimage和edits文件
- 合并生成新的fsimage,并生成edits文件
- 等待DataNode注册与发送心跳和Block Report
- NameNode 启动过程中会进入SafeMode(安全模式)
安全模式
在安全模式下,文件系统不允许修改
目的,是在系统启动时检查各个datanode数据的有效性
进入安全模式的三种方式
- 手动进入
$ bin/hdfs dfsadmin -safemode enter
$ bin/hdfs dfsadmin -safemode leave
- namenode启动会自动进入
- 正常块的个数/总的块个数<0.999 也会进入安全模式
<property> <name>dfs.namenode.safemode.threshold-pct</name> <value>0.999f</value> </property>
HDFS特点
优点
- 处理超大文件
- 一次写入,多次读取
- 运行与廉价服务器
- 不移动数据到计算点,而是就地计算,减少网络阻塞
缺点:
- 高延迟,不适合接入前台业务
- 不支持任意的修改
HDFS API
Java API
package com.ct.test; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; import org.apache.commons.compress.utils.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.junit.Before; import org.junit.Test; public class TestDemo { FileSystem fs = null; // public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException { // //// FileSystem fs = FileSystem.get(new URI("hdfs://centos01:8020"), //// new Configuration(), //// "chen"); //// //// boolean success = fs.mkdirs(new Path("/test")); //// //// System.out.println(success); //// test.setUp(); //// test.testMkdir(); //// test.testDelete(); // // // // // } @Before //获取文件对象 public void setUp() { Configuration conf = new Configuration(); conf.set("dfs.replication", "7"); try { fs = FileSystem.get(new URI("hdfs://centos01:8020"), conf, "chen"); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (URISyntaxException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //创建文件夹 @Test public void testMkdir() throws IllegalArgumentException, IOException { boolean success = fs.mkdirs(new Path("/result")); System.out.println(success); } //删除文件夹 public void testDelete() throws IllegalArgumentException, IOException { fs.delete(new Path("/result"), true); } @Test //上传文件 public void testUpload() throws IllegalArgumentException, IOException { FSDataOutputStream out = fs.create(new Path("/input/testUpload.log")); FileInputStream input = new FileInputStream("F:/test.txt"); IOUtils.copy(input, out, 1024); } @Test public void testDownload() throws IllegalArgumentException, IOException { FSDataInputStream input = fs.open(new Path("/input/testUpload.log")); FileOutputStream out = new FileOutputStream("F:/test-copy.txt"); IOUtils.copy(input, out, 1024); } @Test public void testList() throws FileNotFoundException, IllegalArgumentException, IOException { RemoteIterator<LocatedFileStatus> ri = fs.listFiles(new Path("/input"), true); while(ri.hasNext()) { LocatedFileStatus next = ri.next(); next.getBlockLocations(); String group = next.getGroup(); long len = next.getLen(); String owner = next.getOwner(); FsPermission permission = next.getPermission(); long blockSize = next.getBlockSize(); short rep = next.getReplication(); System.out.println(permission+"\t"+owner+"\t"+group); System.out.println(len+"\t"+blockSize+"\t"+rep); BlockLocation[] blockLocations = next.getBlockLocations(); for (BlockLocation blktn : blockLocations) { System.out.println("length:"+blktn.getLength()); System.out.println("offset:"+blktn.getOffset()); System.out.println(Arrays.toString(blktn.getHosts())); } } } }
HDFS读流程
- 打开分布式文件调用 分布式文件DistributedFileSystem.open()方法
- 从 NameNode 获得 DataNode 地址DistributedFileSystem 使用 RPC 调用 NameNode,NameNode返回存有该副本的 DataNode 地址,DistributedFileSystem 返回一个输入流 FSDataInputStream对象,该对象封存了输入流DFSInputStream
- 连接到DataNode调用 输入流 FSDataInputStream 的 read() 方法,从而 输入流DFSInputStream 连接 DataNodes
- 读取DataNode反复调用 read()方法,从而将数据从 DataNode 传输到客户端
- 读取另外的DataNode直到完成到达块的末端时候,输入流 DFSInputStream 关闭与DataNode连接, 寻找下一个 DataNode
- 完成读取,关闭连接,即调用输入流 FSDataInputStream.close()
HDFS写流程
- 发送创建文件请求:调用分布式文件系统DistributedFileSystem.create()方法
- NameNode中创建文件记录:分布式文件系统DistributedFileSystem 发送 RPC 请求给namenode,namenode 检查权限后创建一条记录,返回输出流 FSDataOutputStream,封装了输出流 DFSOutputDtream
- 客户端写入数据:输出流 DFSOutputDtream 将数据分成一个个的数据包,并写入内部队列。DataStreamer 根据 DataNode 列表来要求 namenode 分配适合的新块来存储数据备份。一组DataNode 构成管线(管线的 DataNode 之间使用 Socket 流式通信)
- 使用管线传输数据:DataStreamer 将数据包流式传输到管线第一个DataNode,第一个DataNode 再传到第二个DataNode ,直到完成。
- 确认队列:DataNode 收到数据后发送确认,管线的DataNode所有的确认组成一个确认队列。所有DataNode 都确认,管线数据包删除。
- 关闭:客户端对数据量调用close()方法。将剩余所有数据写入DataNode管线,并联系NameNode且发送文件写入完成信息之前等待确认。
- NameNode确认
- 故障处理:若过程中发生故障,则先关闭管线, 把队列中所有数据包添加回去队列,确保数据包不漏。为另一个正常DataNode的当前数据块指定一个新的标识,并将该标识传送给NameNode, 一遍故障DataNode在恢复后删除上面的不完整数据块. 从管线中删除故障DataNode 并把余下的数据块写入余下正常的DataNode。NameNode发现复本两不足时,会在另一个节点创建一个新的复本
YARN
组成
- resourcemanger:负责全局的任务调度和资源管理(内存、CPU)、启动/监控applicationMaster 、监控NodeManager
- nodemanger:单个节点的资源管理、处理来自resourcemanger和applicationmaster的任务请求
- client:发起任务的请求
- container:对环境的抽象,封装了CPU、内存、环境变量
- applicationmaster:负责管理应用,为应用申请资源,任务的监控和容错
服务功能
ResourceManager
- 处理客户端请求
- 启动/监控ApplicationMaster
- 监控NodeManager
- 资源分配与调度
NodeManager
- 单个节点上的资源管理
- 处理来自ResourceManager的命令
- 处理来自ApplicationMaster的命令
ApplicationMaster
- 数据切分
- 为应用程序申请资源,并分配给内部任务
- 任务监控与容错
Container
- 对任务运行环境的抽象,封装了CPU、内存等多维资源以及环境变量、启动命令等任务运行相关的信息
YARN工作流程
- 客户端向ResourceManager提交应用程序,其中包括ApplicationMaster、启动ApplicationMaster的命令、用户程序等;
- ResourceManager为该应用程序分配第一个Container,并与对应NodeManager通信,要求它在这个Container中启动应用程序的ApplicationMaster;
- ApplicationMaster向ResourceManager注册自己,启动成功后与ResourceManager保持心跳;
- ApplicationMaster向ResourceManager申请资源;
- 申请资源成功后,由ApplicationMaster进行初始化,然后与NodeManager通信,要求NodeManager启动Container。然后ApplicationMaster与NodeManager保持心跳,从而对NodeManager上运行的任务进行监控和管理;
- Container运行期间,向ApplicationMaster汇报自己的进度和状态信息,以便ApplicationMaster掌握任务运行状态,从而在任务失败是可以重新启动;
- 应用运行结束后,ApplicationMaster向ResourceManager注销自己,允许其所属的Container回收。
MapReduce
Map和Reduce 计算框架,编程模型 “分而治之”的思想, 分布式并行计算
Mapper
对一些独立元素组成的列表的每一个元素进行制定的操作,可高度并行
// step 1: Map Class /** * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * */ //TODO update paragram public static class ModuleMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub }
Reducer
对一个列表元素进行合并
// step 2: Reduce Class /** * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * */ //TODO public static class ModuleReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub } }
Job
// step 3: Driver ,component job, implements Tool public int run(String[] args) throws Exception { // 1: get configration Configuration configuration = getConf(); // 2: create Job Job job = Job.getInstance(configuration, this.getClass() .getSimpleName()); // run jar job.setJarByClass(this.getClass()); // 3: set job // input -> map -> reduce -> output // 3.1 input Path inPath = new Path(args[0]); FileInputFormat.addInputPath(job, inPath); // 3.2: map job.setMapperClass(ModuleMapper.class); //TODO update paragram job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 3.3: reduce job.setReducerClass(ModuleReducer.class); //TODO job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 3.4: output Path outPath = new Path(args[1]); FileOutputFormat.setOutputPath(job, outPath); // 4: submit job boolean isSuccess = job.waitForCompletion(true); return isSuccess ? 0 : 1; }
WordCount
package com.wordcount; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class WordCountDemo extends Configured implements Tool { /** * map 任务的定义 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN 偏移量 LongWritable * VALUEIN 一行文本 Text * KEYOUT 单词 Text * VALUEOUT 1 IntWritable * * map任务 * 将一行文本拆分成单词 * * */ public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> { Text keyOut = new Text(); IntWritable valueOut = new IntWritable(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { System.out.println("keyIn:"+key+"\t\t"+"valueIn:"+value); //1. 单词拆分 String[] vals = value.toString().split(" "); //2. 遍历输出 for (String val : vals) { keyOut.set(val); valueOut.set(1); context.write(keyOut, valueOut); System.out.println("keyOut:"+keyOut+"\t\t"+"valueOut:"+valueOut); } } } /** * * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN 单词 Text * VALUEIN 单词次数的集合 list的元素 IntWritable * KEYOUT 单词 Text * VALUEOUT 总次数 IntWritable * */ public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> { IntWritable valueOut = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { System.out.print("keyIn:"+key+"\t\t["); //1. 求次数综合 int sum = 0; for (IntWritable value : values) { sum += value.get(); System.out.print(value+",\t"); } System.out.println("]"); //2. 输出 valueOut.set(sum); context.write(key, valueOut); } } @Override public int run(String[] args) throws Exception { //1 设置job Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(this.getClass()); job.setJobName(this.getClass().getSimpleName()); //2. 设置map类和reduce类 job.setMapperClass(WCMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setReducerClass(WCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //3 设置输入输出路径 FileInputFormat.setInputPaths(job, args[0]); Path out = new Path(args[1]); FileSystem fs = out.getFileSystem(conf); if(fs.exists(out)) { fs.delete(out, true); } FileOutputFormat.setOutputPath(job, out); boolean success = job.waitForCompletion(true); return success?1:0; } public static void main(String[] args) { try { int run = ToolRunner.run(new WordCountDemo(), args); System.out.println(run==1?"成功":"失败"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
MapReduce实现表的join
map join
适合大小表join,将小表缓存在内存中,join发生在map端
只缓存一次,在Mapper子类中重写setup方法,在setup方法中将小表文件装入内存中
Mapper子类中map方法读取大表
package com.join; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MapJoin extends Configured implements Tool { public static class MJMapper extends Mapper<LongWritable, Text, Text, Text> { HashMap<String, String> cacheMap = new HashMap<String, String>(); // 首相将小表读入内存 // 该方法只在每次任务开始时加载一次 @Override protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { String path = "F:\\input\\join\\dept.log"; FileReader fr = new FileReader(path); BufferedReader br = new BufferedReader(fr); String line = null; while((line=br.readLine()) != null) { String[] vals = line.split("\t"); cacheMap.put(vals[0], vals[1]); } br.close(); fr.close(); } // map端根据两张表的key进行合并 @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { String[] vals = value.toString().split("\t"); String deptno = cacheMap.get(vals[2]); String dname = cacheMap.get(deptno); context.write(new Text(deptno), new Text(dname+"\t"+vals[0]+vals[1])); } } @Override public int run(String[] args) throws Exception { //1 设置job Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(this.getClass()); job.setJobName(this.getClass().getSimpleName()); //2 设置map类和reduce job.setMapperClass(MJMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //3 设置输入输出路径 FileInputFormat.setInputPaths(job, args[0]); Path out = new Path(args[1]); FileSystem fs = out.getFileSystem(conf); if(fs.exists(out)) { fs.delete(out, true); } FileOutputFormat.setOutputPath(job, out); //4 提交 boolean success = job.waitForCompletion(true); return success?1:0; } public static void main(String[] args) { try { int run = ToolRunner.run(new MapJoin(), args); System.out.println(run==1?"成功":"失败"); } catch (Exception e) { e.printStackTrace(); } } }
reduce join
适合两张大表join
package com.join; import java.io.IOException; import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ReduceJoin extends Configured implements Tool { /* * 1 技术部 * 1002 rose 1 */ public static class RJMapper extends Mapper<LongWritable, Text, Text, Text>{ Text keyOut = new Text(); Text valueOut = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { String[] vals = value.toString().split("\t"); if(vals.length == 2) { keyOut.set(vals[0]); valueOut.set(vals[1]); }else { keyOut.set(vals[2]); valueOut.set(vals[0]+"\t"+vals[1]); } context.write(keyOut, valueOut); } } /* * keyIn:1 * valueIn List{[1007 lily], [1002 rose], [1001 jack], [技术部]} */ // reduce端合并是依靠MapReduce shuffle过程中将相同key的行放入同一台机器 public static class RJReducer extends Reducer<Text, Text, Text, Text> { ArrayList<String> employees = new ArrayList<String>(); @Override protected void reduce(Text keyIn, Iterable<Text> valueIn, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { String department = null; employees.clear(); //这里要注意清空list for (Text tmp : valueIn) { String[] vals = tmp.toString().split("\t"); // 根据length判断这是张什么表 if(vals.length == 1) { department = vals[0]; }else if(vals.length == 2) { employees.add(tmp.toString()); } } for (String employee : employees) { context.write(keyIn, new Text(employee+"\t"+department)); } } } @Override public int run(String[] args) throws Exception { //1 设置job Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(this.getClass()); job.setJobName(this.getClass().getSimpleName()); //2 设置map类和reduce job.setMapperClass(RJMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(RJReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //3 设置输入输出路径 FileInputFormat.setInputPaths(job, args[0]); Path out = new Path(args[1]); FileSystem fs = out.getFileSystem(conf); if(fs.exists(out)) { fs.delete(out, true); } FileOutputFormat.setOutputPath(job, out); //4 提交 boolean success = job.waitForCompletion(true); return success?1:0; } public static void main(String[] args) { try { int run = ToolRunner.run(new ReduceJoin(), args); System.out.println(run==1?"成功":"失败"); } catch (Exception e) { e.printStackTrace(); } } }
Hadoop的安装模式
- 单机模式
- 伪分布模式(pseudo)
- 完全分布模式
hadoop 开发环境搭建
maven环境搭建
安装maven
- 解压apache-maven-3.0.5.tar.gz
- 配置maven环境变量
MAVEN_HOME=[maven的解压目录]
%MAVEN_HOME%/bin; - 命令提示符 mvn -version
- 解压repository.tar.gz到windows磁盘(如 E:toolsrepository)
修改settings.xml配置文件中指定的repository(修改apache-maven-3.0.5confsettings.xml)
<localRepository>D:/repository</localRepository>
配置eclipse的maven环境
windows->preferences->maven->->installations->add->勾选自己安装的maven ->user settings->选择mave家目录/conf/settings
- 创建maven工程
- 将${hadoop_Home}/ect/hadoop/log4j.properties拷贝到项目的src目录
- 修改pom.xml
windows下搭建 hadoop开发环境
Windows安装hadoop
- 解压hadoop-2.5.0.tar.gz到本地windows磁盘
配置hadoop的环境变量
添加环境变量 HADOOP_HOME=hadoop解压目录 在PATH环境变量中追加 %HADOOP_HOME%/bin;
测试
hadoop -h
eclipse安装插件
- 解压eclipse
- 将hadoop-eclipse-plugin-2.6.0.jar拷贝到${MyEclispe_HOME}/plugins
- 打开(重启)eclispe,菜单栏->windows->Preferneces->Hadoop MapReduce
eclipse配置插件参数,连接HDFS
- 在linux中的hadoop安装目录下的etc/hadoop/hdfs-site.xml添加如下配置,重启HDFS的进程
<!--关闭hdfs的文件权限控制--> <property> <name>dfs.permissions</name> <value>false</value> </property>
eclipse->windows->show views->other->输入MapReduce->点击map reduce locations
右击->new hadoop locationsMap/Reduce Master
Mapreduce(V2) host:[hostname] port:8032 //resourcemanager 的默认端口号
DFS Master
DFS Master host:[hostname] port:8020
- 拷贝winutils.exe 和hadoop.dll到${hadoop_HOME}/bin
- 单独拷贝hadoop.dll到C:WindowsSystem32
- 创建maven工程,通过pom.xml导包
将lo4j.perperties文件拷贝到src/main/resources
打jar包,提交集群运行
- jar包时,指定主类
yarn jar pv.jar /input/2015082818 /output
- jar包时,不指定主类
yarn jar pv.jar 类的全限定名 /input/2015082818 /output
不同包中可能有相同类名,所以要指定类的全限定名
Shuffle
MapReduce框架核心部分(设计精髓):内核
shuffle 定义
map() 输出开始 到 reduce()输入开始 此阶段是shuffle
input -> map -> shuffle -> reduce -> output
shuffle分为两个阶段
map shuffle phase
reduce shuffle phase
shuffle主要操作
partitioner - map
sorter - map & reduce
combiner: map phase局部聚合操作 不是所有的MapReduce程序都可以进行局部聚合的
compress:map phase的输出数据压缩 针对所有MapReduce程序都可以进行设置
group - reduce
shuffle详解
所有操作都是针对map()输出的<key, value>数据进行的
map shuffle phase
- 进入环形缓冲区(默认100MB)
当达到环形缓冲区内存的80%默认情况下,将会将缓冲区中的数据spill到本地磁盘中(溢出到MapTask所运行的NodeManager机器的本地磁盘中)
溢写
并不是立即将缓冲区中的数据溢写到本地磁盘,而是需要经过一些操作
- 分区paritioner
依据此MapReduce Job中Reduce Task个数进行分区决定map输出的数据被哪个reduce任务进行处理分析默认情况下,依据key采用HashPartitioner
- 分区paritioner
// 通过取余将数据分配到哪个reduce处理 HashPartitioner int getParitition(key, value, numreducetask) { return ( key.hashCode&Integer.maxValue)%numreducetask; }
- 排序sorter
会对每个分区中的数据进行排序,默认情况下依据key进行排序
- spill溢写
将分区排序后的数据写到本地磁盘的一个文件中
反复上述的操作,产生多个小文件
当溢写结束后
- 此时将spill到本地磁盘的小文件进行一次合并。
- combiner: (可选)map端的reduce
- compress:(可配置) 数据减少了, 减少网络IO; 但压缩消耗CPU性能,也需要时间
reduce shuffle phase
- merge 合并
各个分区的数据合并在一起(当MapTask处理数据完成以后,告知AppMaster,然后AppMaster通知所有的ReduceTask,各个ReduceTask主动到已经完成的MapTask的本地磁盘,去拉取属于自己要处理的数据(分区中))
- 排序 对各个分区中的数据进行排序
最后每个分区形成一个文件(map输出的数据最后在个文件中),分区的,并且各个分区的数据已经进行了排序。
分组group
将相同key的value值存入到list集合,形成新的key, list(value),将key/value对数据传递给reduce()函数进行处理。
最后将(key, list(value))传给 reduce()
map个数及reduce个数确定
map个数确定
FileInputFormat.setMaxInputSplitSize(job, size); 设置切片最大值 FileInputFormat.setMinInputSplitSize(job, size); 设置切片最小值
FileInputFormat public List<InputSplit> getSplits(JobContext job){。。。} protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); } // minSize<=maxSize<blockSize 提高并发 // minSize>blockSize 降低并发
reduce个数确定
job.setNumReduceTasks(2); HashParitioner 决定map输出的类被哪个reduce处理
自定义shuffle
自定义key
- key 和 value 都可以使用自定义类
- 自定义的类不使用 Java 自带的 serializable 接口,改用hadoop 提供的Writable 接口
- 注意重写 toString、 write、readFields
package com.flow; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; /** * 不用serializable * * 用Hadoop的Writable * */ public class Flow implements Writable { private long up; private long down; private long sum; public long getUp() { return up; } public void setUp(long up) { this.up = up; } public long getDown() { return down; } public void setDown(long down) { this.down = down; } public long getSum() { return sum; } public void setSum(long sum) { this.sum = sum; } @Override public String toString() { return up + "\t" + down + "\t" + sum; } @Override public void write(DataOutput out) throws IOException { out.writeLong(up); out.writeLong(down); out.writeLong(sum); } @Override public void readFields(DataInput in) throws IOException { up = in.readLong(); down = in.readLong(); sum = in.readLong(); } }
自定义分区
- 调用 job 的 setNumReduceTasks 方法设置reduce 个数
- setPartitionerClass 设置分区
public static class MyPartitioner extends Partitioner<Text, Flow> { @Override public int getPartition(Text key, Flow value, int numPartitions) { if(value.getSum()<1024) { return 0; }else if(value.getSum()<10*1024) { return 1; } return 2; } }
排序
只能按照key排序,如果需要多重排序,需要自定义key
在shuffle过程中自动排序,无需手动调用方法
public class MyKey implements WritableComparable<MyKey> //要排序的类要实现WritableComparable接口 @Override public int compareTo(MyKey o) { long result = o.getSum() - this.getSum(); if(result>0) { return 1; }else if(result<0) { return -1; } return o.getPhone().compareTo(this.getPhone()); }
combiner
map端的小reduce,对每个map后的value进行reduce,减少数据传输
可以通过设置job.setCombinerClass(WCReducer.class);设置combiner
前后效果对比
原始数据 hello world hello hadoop hello world hello java keyIn:hadoop [1, ] keyIn:hello [1, 1, 1, 1, ] keyIn:java [1, ] keyIn:world [1, 1, ] keyIn:hadoop [1, ] keyIn:hello [2, 2, ] keyIn:java [1, ] keyIn:world [1, 1, ]
分组
根据需求将key中相同的字段作为同一个key以减少键值对,作为一种优化的手段
重写 RawComparator 方法合并key中相同字段
通过 job.setGroupingComparatorClass(Mygroup.class); 调用
public static class Mygroup implements RawComparator<Person> { @Override public int compare(Person o1, Person o2) { // TODO Auto-generated method stub return 0; } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, 0, l1-4, b2, 0, l2-4); } }
hadoop优化
- 可以设置block默认大小
- 设置map个数
- 调整环形缓冲区大小
- 自定义分区 --> 解决数据清倾斜问题
- 自定义 combiner --> map端的小reduce,减少网络传输损耗
- 自定义分组 --> 减少键值对
- 设置reduce个数 --> 加快处理速度
- CombinerFileInputFormat --> 合并小文件
- 根据业务自定义key和value
Java MapReduce编程错误
org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable
map方法把文件的行号当成key,所以要用LongWritable。