RDD原理与基本操作 | Spark,从入门到精通
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)
/ 什么是 RDD? /
传统的 MapReduce 虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是在迭代计算式的时候,要进行大量的磁盘 IO 操作,而 RDD 正是解决这一缺点的抽象方法。RDD(Resilient Distributed Datasets)即弹性分布式数据集,从名字说起:
弹性
当计算过程中内存不足时可刷写到磁盘等外存上,可与外存做灵活的数据交换;
RDD 使用了一种“血统”的容错机制,在结构更新和丢失后可随时根据血统进行数据模型的重建;
分布式
就是可以分布在多台机器上进行并行计算;
数据集
一组只读的、可分区的分布式数据集合,集合内包含了多个分区。分区依照特定规则将具有相同属性的数据记录放在一起,每个分区相当于一个数据集片段。
RDD 内部结构
图 1
图 1 所示是 RDD 的内部结构图,它是一个只读、有属性的数据集。它的属性用来描述当前数据集的状态,数据集由数据的分区(partition)组成,并由(block)映射成真实数据。RDD 的主要属性可以分为 3 类:与其他 RDD 的关系(parents、dependencies);数据(partitioner、checkpoint、storage level、iterator 等);RDD 自身属性(sparkcontext、sparkconf),接下来我们根据属性分类来深入介绍各个组件。
RDD 自身属性
从自身属性说起,SparkContext 是 Spark job 的入口,由 Driver 创建在 client 端,包括集群连接、RDD ID、累加器、广播变量等信息。SparkConf 是参数配置信息,包括:
- Spark api,控制大部分的应用程序参数;
- 环境变量,配置IP地址、端口等信息;
- 日志配置,通过 log4j.properties 配置。
数据
RDD 内部的数据集合在逻辑上和物理上被划分成多个小子集合,这样的每一个子集合我们将其称为分区(Partitions),分区的个数会决定并行计算的粒度,而每一个分区数值的计算都是在一个单独的任务中进行的,因此并行任务的个数也是由 RDD分区的个数决定的。但事实上 RDD 只是数据集的抽象,分区内部并不会存储具体的数据。Partition 类内包含一个 index 成员,表示该分区在 RDD 内的编号,通过 RDD 编号+分区编号可以确定该分区对应的唯一块编号,再利用底层数据存储层提供的接口就能从存储介质(如:HDFS、Memory)中提取出分区对应的数据。
RDD 的分区方式主要包含两种:Hash Partitioner 和 Range Partitioner,这两种分区类型都是针对 Key-Value 类型的数据,如是非 Key-Value 类型则分区函数为 None。Hash 是以 Key 作为分区条件的散列分布,分区数据不连续,极端情况也可能散列到少数几个分区上导致数据不均等;Range 按 Key 的排序平衡分布,分区内数据连续,大小也相对均等。
Preferred Location 是一个列表,用于存储每个 Partition 的优先位置。对于每个 HDFS 文件来说,这个列表保存的是每个 Partition 所在的块的位置,也就是该文件的「划分点」。
Storage Level 是 RDD 持久化的存储级别,RDD 持久化可以调用两种方法:cache 和 persist:persist 方法可以自由的设置存储级别,默认是持久化到内存;cache 方法是将 RDD 持久化到内存,cache 的内部实际上是调用了persist 方法,由于没有开放存储级别的参数设置,所以是直接持久化到内存。
图 2
如图 2 所示是 Storage Level 各级别分布,那么如何选择一种最合适的持久化策略呢?默认情况下,性能最高的当然是 MEMORY_ONLY,但前提是你的内存必须足够大到可以绰绰有余地存放下整个 RDD 的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果 RDD 中数据比较多时(比如几十亿),直接用这种持久化级别,会导致 JVM 的 OOM 内存溢出异常。
如果使用 MEMORY_ONLY 级别时发生了内存溢出,那么建议尝试使用 MEMORY_ONLY_SER 级别。该级别会将 RDD 数据序列化后再保存在内存中,此时每个 partition 仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比 MEMORY_ONLY 多出来的性能开销主要就是序列化与反序列化的开销,但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。但可能发生 OOM 内存溢出的异常。
如果纯内存的级别都无法使用,那么建议使用 MEMORY_AND_DISK_SER 策略,而不是 MEMORY_AND_DISK 策略。因为既然到了这一步,就说明 RDD 的数据量很大,内存无法完全放下,序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。
通常不建议使用 DISK_ONLY 和后缀为_2 的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销。
Checkpoint 是 Spark 提供的一种缓存机制,当需要计算依赖链非常长又想避免重新计算之前的 RDD 时,可以对 RDD 做 Checkpoint 处理,检查 RDD 是否被物化或计算,并将结果持久化到磁盘或 HDFS 内。Checkpoint 会把当前 RDD 保存到一个目录,要触发 action 操作的时候它才会执行。在 Checkpoint 应该先做持久化(persist 或者 cache)操作,否则就要重新计算一遍。若某个 RDD 成功执行 checkpoint,它前面的所有依赖链会被销毁。
与 Spark 提供的另一种缓存机制 cache 相比:cache 缓存数据由 executor 管理,若 executor 消失,它的数据将被清除,RDD 需要重新计算;而 checkpoint 将数据保存到磁盘或 HDFS 内,job 可以从 checkpoint 点继续计算。Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 这样的方法,相当于 cache 到磁盘上,这样可以使 RDD 第一次被计算得到时就存储到磁盘上,它们之间的区别在于:persist 虽然可以将 RDD 的 partition 持久化到磁盘,但一旦作业执行结束,被 cache 到磁盘上的 RDD 会被清空;而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的。
Compute 函数实现方式就是向上递归「获取父 RDD 分区数据进行计算」,直到遇到检查点 RDD 获取有缓存的 RDD。
Iterator 用来查找当前 RDD Partition 与父 RDD 中 Partition 的血缘关系,并通过 Storage Level 确定迭代位置,直到确定真实数据的位置。它的实现流程如下:
- 若标记了有缓存,则取缓存,取不到则进行 computeOrReadCheckpoint(计算或读检查点)。完了再存入缓存,以备后续使用。
- 若未标记有缓存,则直接进行 computeOrReadCheckpoint。
- computeOrReadCheckpoint 这个过程也做两个判断:有做过 checkpoint 和没有做过 checkpoint,做过 checkpoint 则可以读取到检查点数据返回,没做过则调该 RDD 的实现类的 compute 函数计算。
血统关系
一个作业从开始到结束的计算过程中产生了多个 RDD,RDD 之间是彼此相互依赖的,我们把这种父子依赖的关系称之为「血统」。
RDD 只支持粗颗粒变换,即只记录单个块(分区)上执行的单个操作,然后创建某个 RDD 的变换序列(血统 lineage)存储下来。
*变换序列指每个 RDD 都包含了它是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。
因此 RDD 的容错机制又称「血统」容错。 要实现这种「血统」容错机制,最大的难题就是如何表达父 RDD 和子 RDD 之间的依赖关系。
图 3
如图 3 所示,父 RDD 的每个分区最多只能被子 RDD 的一个分区使用,称为窄依赖(narrow dependency);若父 RDD 的每个分区可以被子 RDD 的多个分区使用,称为宽依赖(wide dependency)。简单来讲,窄依赖就是父子RDD分区间「一对一」的关系,而宽依赖就是「一对多」关系。从失败恢复来看,窄依赖的失败恢复起来更高效,因为它只需找到父 RDD 的一个对应分区即可,而且可以在不同节点上并行计算做恢复;宽依赖牵涉到父 RDD 的多个分区,需要得到所有依赖的父 RDD 分区的 shuffle 结果,恢复起来相对复杂些。
图 4
根据 RDD 之间的宽窄依赖关系引申出 Stage 的概念,Stage 是由一组 RDD 组成的执行计划。如果 RDD 的衍生关系都是窄依赖,则可放在同一个 Stage 中运行,若 RDD 的依赖关系为宽依赖,则要划分到不同的 Stage。这样 Spark 在执行作业时,会按照 Stage 的划分, 生成一个最优、完整的执行计划。
/ RDD 的创建方式与分区机制 /
RDD 的创建方式
RDD 的创建方式有四种:
1.使用程序中的集合创建 RDD,RDD 的数据源是程序中的集合,通过 parallelize 或者 makeRDD 将集合转化为 RDD;
*例
val num = Array(1,2,3,4,5) val rdd = sc.parallelize(num)
2.使用本地文件或 HDFS 创建 RDD,RDD 的数据源是本地文件系统或 HDFS 的数据,使用 textFile 方法创建RDD。
*例
val rdd = sc.textFile(“hdfs://master:9000/rec/data”)
3.使用数据流创建 RDD,使用 Spark Streaming 的相关类,接收实时的输入数据流创建 RDD(数据流来源可以是 kafka、flume 等)。
*例
val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream(“localhost”, 9999) val words = lines.flatMap(_.split(“ ”))
4.使用其他方式创建 RDD,从其他数据库上创建 RDD,例如 Hbase、MySQL 等。
*例
val sqlContext = new SQLContext(sc) val url = "jdbc:mysql://ip:port/xxxx" val prop = new Properties() val df = sqlContext.read.jdbc(url, “play_time”, prop)
RDD 的分区机制
RDD 的分区机制有两个关键点:一个是关键参数,即 Spark 的默认并发数 spark.default.parallelism;另一个是关键原则,RDD 分区尽可能使得分区的个数等于集群核心数目。
当配置文件 spark-default.conf 中显式配置了 spark.default.parallelism,那么 spark.default.parallelism=配置的值,否则按照如下规则进行取值:
1.本地模式(不会启动 executor,由 SparkSubmit 进程生成指定数量的线程数来并发)
spark-shell spark.default.parallelism = 1spark-shell --master local[N] spark.default.parallelism = N (使用 N 个核)spark-shell --master local spark.default.parallelism = 12.伪集群模式(x 为本机上启动的 executor 数,y 为每个 executor 使用的 core 数,z 为每个 executor 使用的内存)
spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y3.Yarn、standalone 等模式
spark.default.parallelism = max(所有 executor 使用的 core 总数,2)4.Mesos
spark.default.parallelism = 8spark.context 会生成两个参数,由 spark.default.parallelism 推导出这两个参数的值:
sc.defaultParallelism = spark.default.parallelism sc.defaultMinPartitions = min(spark.default.parallelism, 2)
当 sc.defaultParallelism 和 sc.defaultMinPartitions 确认后,就可以推算 RDD 的分区数了。
- 以 parallelize 方法为例
val rdd = sc.parallelize(1 to 10)
如果使用 parallelize 方法时没指定分区数, RDD 的分区数 = sc.defaultParallelism
- 以 textFile 方法为例
val rdd = sc.textFile(“path/file”)
分区机制分两种情况:
1.从本地文件生成的 RDD,如果没有指定分区数,则默认分区数规则为
rdd 的分区数 = max(本地 file 的分片数, sc.defaultMinPartitions)2.从 HDFS 生成的 RDD,如果没有指定分区数,则默认分区数规则为:
rdd 的分区数 = max(hdfs 文件的 block 数目, sc.defaultMinPartitions)/ RDD 的常用操作 /
RDD 支持两种类型的操作:转换(Transformation)和动作(Action),转换操作是从已经存在的数据集中创建一个新的数据集,而动作操作是在数据集上进行计算后返回结果到 Driver,既触发 SparkContext 提交 Job 作业。转换操作都具有 Lazy 特性,即 Spark 不会立刻进行实际的计算,只会记录执行的轨迹,只有触发行动操作的时候,它才会根据 DAG 图真正执行。
转换与动作具体包含的操作种类如下图所示:
图 5:转换操作
图 6:动作操作
最后我们通过一段代码来看看它具体的操作:
这段代码是用来计算某个视频被男性或女性用户的播放次数,其中 rdd_attr 用来记录用户性别,rdd_src 是用户对某个视频进行播放的记录,这两个 RDD 会进行一个 join 操作,比如这是某个男性用户对某个视频进行了播放,进行 map 操作之后得到视频 id 和性别作为 key,根据这个 key 做 reduceByKey 的操作,最终得到一个视频被男性/女性用户总共播放了多少次的 RDD,然后使用 combineByKey 合并同一个视频 id 的多个结果,最后保存到 HDFS 上。
附:参考文章
《Spark之深入理解RDD结构》https://blog.csdn.net/u011094454/article/details/78992293《RDD的数据结构模型》https://www.jianshu.com/p/dd7c7243e7f9?from=singlemessage《Spark RDD详解》https://blog.csdn.net/wangxiaotongfan/article/details/51395769《Spark RDD的默认分区数:(spark 2.1.0)》https://www.jianshu.com/p/4b7d07e754fa《Spark性能优化指南——基础篇》https://tech.meituan.com/spark_tuning_basic.html