Hadoop 生态之 MapReduce 及 Hive 简介
1.计算框架
Hadoop 是一个计算框架,目前大型数据计算框架常用的大致有五种:
- 仅批处理框架:Apache hadoop.
- 仅流处理框架:Apache Storm、Apache Samza.
- 混合框架:Apache Spark、Apache Flink.
这其中名气最大、使用最广的当属 Hadoop 和 Spark。
虽然两者都被称为大数据框架,但实际层级不同。Hadoop 是一个分布式数据基础设施,包括计算框架 MapReduce、分布式文件系统 HDFS、YARN 等。而Spark 是专门用来对分布式存储的大数据的处理工具,并不会进行数据存储,更像是 MapReduce 的替代。
在使用场景上,Hadoop 主要用于离线数据计算,Spark更适用于需要精准实时的场景。本文主要介绍 Hadoop,对 Spark 不做讨论。
本篇文章可承接知识库 Hadoop之HDFS (https://gitlab.aihaisi.com/docs/docs/issues/516) ,介绍下 Hadoop 另一重要组件 MapReduce,以及 Hive。
2. MapReduce
2.1 MapReduce 是什么
一个基于 Java 的并行分布式计算框架。
前文有提到 HDFS 提供了基于主从结构的分布式文件系统,基于此存储服务支持,MapReduce 可以实现任务的分发、跟踪、执行等工作,并收集结果。
2.2 MapReduce 组成
MapReduce 主要思想讲的通俗一点就是将一个大的计算拆分成 Map(映射)和 Reduce(化简)。说到这里,其实 JAVA8 在引入 Lambda 后,也有 map 和 reduce 方法。下面是一段 Java 中的用法:
- List<Integer> nums = Arrays.asList(1, 2, 3);
- List<Integer> doubleNums = nums.stream().map(number -> number * 2).collect(Collectors.toList());
- 结果:[2,4,6]
- Optional<Integer> sum = nums.stream().reduce(Integer::sum);
- 结果:[6]
代码很简单,map 负责归类,reduce 负责计算。而 Hadoop 中的 MapReduce 也有异曲同工之处。
下面结合官方案例 WordCount 进行分析:
- public class WordCount {
- // Mapper泛型类,4个参数分别代表输入键、值,输出键、值类型
- public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
- private final static IntWritable one = new IntWritable(1);
- private Text word = new Text();
- public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
- // 字符解析
- StringTokenizer itr = new StringTokenizer(value.toString());
- while (itr.hasMoreTokens()) {
- // nextToken():返回从当前位置到下一个分隔符的字符串
- word.set(itr.nextToken());
- context.write(word, one);
- }
- }
- }
- // Reducer同样也是四个参数
- public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
- private IntWritable result = new IntWritable();
- public void reduce(Text key, Iterable<IntWritable> values,Context context) throws
- IOException,InterruptedException {
- int sum = 0;
- // 循环values,并记录“单词”个数
- for (IntWritable val : values) {
- sum += val.get();
- }
- result.set(sum);
- context.write(key, result);
- }
- }
在这段代码中,不难看出程序核心是 map 函数和 reduce 函数。是否 MapReduce 就是由这两者组成的?接着往下看。
2.3 Map 和 Reduce
2.3.1 Map
在 WordCount 案例中,明显看到 map 函数的输入主要是一个
Context 在这里暂时性忽略,其是 Mapper 类的内部抽象类,一般计算中不会用到,可以先当做“上下文”理解。
map 函数计算过程是: 将这行文本中的单词提取出来,针对每个单词输出一个
2.3.2 Reduce
接着就来看看 reduce ,这里输入参数 Values 就是上面提到的由很多个 1 组成的集合,而 Key 就是具体“单词” word。
它的计算过程是: 将集合里的1求和,再将单词(word)与这个和(sum)组成一个
假设有两个数据块的文本数据需要进行词频统计,MapReduce 计算过程如下图所示:
到这都很容易理解,毕竟只是个 HelloWorld 的例子~,但整个MapReduce过程中最关键的部分其实是在 map 到 reduce 之间。
还拿上面例子来说:统计相同单词在所有输入数据中出现的次数,一个 Map 只能处理一部分数据,而热点单词就很可能会出现在所有 Map 中了,意味着同一单词必须要合并到一起统计才能得到正确结果。这种数据关联几乎在所有的大数据计算场景都需要处理,如果是例子这种的当然只对 Key 合并就OK了,但类似数据库 join 操作这种较复杂的,就需对两种类型(或更多)的数据依据 Key 关联。
这个数据关联操作在 MapReduce中的叫做:shuffle。
2.4 shuffle
shuffle 从字面意思来看,洗牌。下面是一个完整的MR过程,看一看如何洗牌。
先看左半边
1. 从 HDFS 中读取数据,输入数据块到一个个的 map,其中 map 完成计算时,计算结果会存储到本地文件系统。而当 map 快要进行完时,就会启动 shuffle 过程。
2. 如图,shuffle 也可分为两种,在Map端的是 Map shuffle。大致过程为:Map 任务进程会调用一个 Partitioner 接口,对 Map 产生的每个
这里就实现了对 Map 结果的分区、排序、分割,以及将同一分区的输出合并写入磁盘,得到一个分区有序的文件。这样不管 Map 在哪个服务器节点,相同的 Key 一定会被发送给相同 Reduce 进程。Reduce 进程对收到的
再看右半边
1. Reduce shuffle,又可分为复制 Map 输出、排序合并两阶段。
- Copy:Reduce 任务从各个 Map 任务拖取数据后,通知父 TaskTracker 状态已更新,TaskTracker 通知 JobTracker。Reduce 会定期向JobTracker 获取 Map 的输出位置,一旦拿到位置,Reduce 任务会从此输出对应的 TaskTracker 上复制输出到本地,不会等到所有的Map任务结束。
- Merge sort:
- Copy 的数据先放入内存缓冲区,若缓冲区放得下就把数据写入内存,即内存到内存 merge。
- Reduce 向每个 Map 去拖取数据,内存中每个 Map 对应一块数据,当内存缓存区中存储的数据达到一定程度,开启内存中 merge,把内存中数据merge 输出到磁盘文件中,即内存到磁盘 merge。
- 当属于该 reduce 的 map 输出全部拷贝完成,会在 reduce 上生成多个文件,执行合并操作,即磁盘到磁盘 merge。此刻 Map 的输出数据已经是有序的,Merge 进行一次合并排序,所谓 Reduce 端的 sort 过程就是这个合并的过程。
2. 经过上一步Reduce shuffle后,reduce进行最后的计算,将输出写入HDFS中。
以上便是 shuffle 大致四个步骤,关键是 map 输出的 shuffle 到哪个 Reduce 进程,它由 Partitioner 来实现,MapReduce 框架默认的 Partitioner 用 Key 哈希值对 Reduce 任务数量取模,相同 Key 会落在相同的 Reduce 任务 ID 上。
- public int getPartition(K2 key, V2 value, int numReduceTasks) {
- return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
- }
如果对 Shuffle 总结一句话: 分布式计算将不同服务器中的数据合并到一起进行后续计算的过程。
shuffle 是大数据计算过程中神奇的地方,不管是 MapReduce 还是 Spark,只要是大数据批处理计算,一定会有 shuffle 过程,只有让数据关联起来,它的内在关系和价值才会呈现。
3. Hive
上一部分介绍了 MapReduce,接下来简单谈谈 Hive .
我觉得任何一项技术的出现都是为了解决某类问题, MapReduce 毫无疑问简化了大数据开发的编程难度。但实际上进行数据计算更常用的手段可能是 SQL,那么有没有办法直接运行 SQL ?
3.1 Hive是什么
基于Hadoop的一个数据仓库系统,定义了一种类SQL查询语言:Hive SQL。
这里有一个名词 数据仓库,数据仓库是指:面向主题(Subject Oriented)、集成(Integrated)、相对稳定(Non-Volatile)、反应历史变化(Time Variant)的数据集合,用于支持管理决策。
这么说可能有点抽象,分解一下:
- 主题:数据仓库针对某个主题来进行组织,指使用数据仓库决策时所关心的重点方面。比如订阅分析就可以当做一个主题。
- 集成:数据仓库要将多个数据源数据存到一起,但数据以前的存储方式不同,要经过抽取、清洗、转换。(也就是 ETL)
- 稳定:保存的数据是一系列历史快照,不允许修改,只能分析。
- 时变:会定期接收到新的数据,反应出新的数据变化。
现在再看下定义:数据仓库是将多个数据源的数据按照一定的主题集成,进行抽取、清洗、转换。且处理整合后的数据不允许随意修改,只能分析,还需定期更新。
3.2 为什么是 Hive
了解了 Hive 的基础定义,想一下:一个依赖于 HDFS 的数据仓库在 Hadoop 环境中可以扮演什么角色?
前面说到,可不可以让 SQL 直接运行在 Hadoop 平台,这里的答案便是 Hive。它可以将 Hive SQL 转换为 MapReduce 程序运行。
Hive 初期版本默认 Hive on Mapreduce
启动 hive 前通常要先启动 hdfs 和 yarn, 同时一般需要配置 MySQL,Hive 依赖于 HDFS 的数据存储,但为了能操作 HDFS 上的数据集,要知道数据切分格式、存储类型、地址等。这些信息通过一张表存储,称为元数据,可以存储到 MySQL 中。
- 现在来看下 Hive 的部分命令
- 新建数据库:create database xxx;
- 删除数据库:drop database xxx;
- 建表:
- create table table_name(col_name data_type);
- Hive 的表有两个概念:**内部表和外部表**。默认内部表,简单来说,内部表数据存储在每个表相应的HDFS目录下。外部表的数据存在别处,要删除这个外部表,该外部表所指向的数据是不会被删除的,只会删除外部表对应的元数据。
- 查询:
- select * from t_table **where** a<100 **and** b>1000;
- 连接查询:
- select a.*,b.* from t_a a join t_b b on a.name=b.name;
看到这里,可能会觉得我在写 SQL, 没错,对于熟悉 SQL 的人来说,Hive 是非常易于上手的。
3.3 HIVE SQL To MapReduce
前面说到 HQL 可以‘转换’为 MapReduce, 下面就来看看:一个 HQL 是如何转化为 MapReduce 的Hive的基础架构:
通过 Client 向 Hive 提交 SQL 命令。如果是 DDL,Hive 就会通过执行引擎 Driver 将数据表的信息记录在 Metastore 元数据组件中,这个组件通常用一个关系数据库实现,记录表名、字段名、字段类型、关联 HDFS 文件路径等 Meta 信息(元信息)。
如果是DQL,Driver 就会将该语句提交给自己的编译器 进行语法分析、解析、优化等一系列操作,最后生成一个 MapReduce 执行计划。再根据执行计划生成一个 MapReduce 的作业,提交给 Hadoop 的 MapReduce 计算框架处理。
比如输入一条 select xxx from a ; 其执行顺序为:首先在 metastore 查询--> sql 解析--> 查询优化---> 物理计划--> 执行 MapReduce。
小结