深度思考Spark Runtime机制
序言
在以往的博客一天一个函数系列中,我们集中精力攻克了一座又一座的spark中那些晦涩的语法,收获了不少知识。如果以战喻,那就是我们的游击战取得了局部的显著效果。但是要想撼动整个spark-streaming大厦,还需要对整个运行时的机制有一个深入性的了解,知道所谓的接口应用,调用都是一些工程性封装好的东西,这些只要经过熟练的训练即可,但是想要更好的运用,就必须从底层了解它的机制。
在深入理解运行时机制之前,我建议大家思考一下这个问题,为什么spark采用的计算模型是以宽依赖为分界线的DAG模式?在我看来有以下几个优势在里面,第一点是按照不同的stage划分,使得计算更加高效,窄依赖诸如x平方,可以直接以pipe的形式进行操作,而宽依赖诸如(x+y)的平方,它需要等父依赖就绪后才能计算,所以这样的划分能够使计算速率最大化。第二点是,由于分布式集群的不可控性,我们的数据可能要随时落盘,这样一来由于我们采用的是stage模式的处理,我们就可以将中间结果保存在磁盘上,而不会发生由于宕机使得数据全部丢失的情况。
基于以上概念,我们来谈谈一些专业术语以及spark运行时的架构。
Terminologies
SparkContext
SparkContext是整个Spark Application的核心部分。它建立了与Spark Execution environment的连接,同时它还用来创建Spark RDDs,accumulators,以及broadcast variables,此外它还承担着获取spark服务以及运行job的责任。SparkContext is a client of Spark execution environment and acts as the master of Spark application.上面这句话道出了sparkcontext的精髓,下面我再列举一下Spark Context的主要作用:
- Getting the current status of spark application
- Canceling the job
- Canceling the Stage
- Running job synchronously
- Running job asynchronously
- Accessing persistent RDD
- Unpersisting RDD
- Programmable dynamic allocation
如果想进一步了解sparkcontext,请阅读《深入理解spark context》
Spark Shell
很吃惊吧,spark shell也是一个用scala写成的spark application,它提供了一个命令行式的环境,能够帮助我们更好的了解spark的特征,同时也能帮助我们更好的构建我们自己的spark application程序。
Spark Application
spark application是一个完备的计算框架,它能够运行用户提交的程序。即使当它没有运行job的时候,也会运行自己的进程。
Task
task是被提交到executor上的work单元,每一个stage都有一些task,一个task对应一个partition。
The Same task is done over different partitions of RDD。
Job
Job是由一系列tasks组成的并行计算任务,它是由action类型的函数激活的,换言之没有action,job不会被提交。
Stages
每一个job都被划分为几个小的set,这些set被称之为stages,它们之间是相互依赖的。关于stages是如何划分的,要参考spark中RDD的宽窄依赖理论,也就是文章一开头我提到的例子。最终的计算结果要等所有的stage都计算好了以后才能够出结果。
Architecture
spark使用的是主从结构(master-slave),即一个核心的coordinator和多个workers,亦被称为executors,每一个executor是一个单独的java进程。换句话说一个spark application是一个driver和它所有executors的集合,通过cluster manager的帮助,能够将其运行在一系列集群上。Standalone Cluster Manager是spark默认的内置cluster manager。除此之外,Spark还可以运行在一些开源的cluster manager上面,譬如Yarn和Mesos等等。
The Apache Spark Driver
driver是整个spark中的核心协调者,当一个action方法被调用的时候,driver程序中的sparkcontext会创建一个job,然后将其提交到DAG Scheduler (DAG Scheduler创建operator graph并将Job提交到task Scheduler)。 Task Scheduler通过cluster manager启动task。这样,通过cluster manager的帮助,一个完整的spark程序就在集群中启动起来了。
从程序的角度上分析,main方法是运行在driver中的,driver运行用户提交的代码,生成RDD,执行transformation和action函数,同时创建SparkContext。这些都是一个driver所承担的责任。当一个driver停止工作的时候,整个application便结束它的生命周期。
driver的两个最重要的角色是:
- Converting user program into the task.
- Scheduling task on the executor.
从一个更高的层面来看spark,RDD从input数据源生成,通过一系列transformation函数获得新的RDD,然后执行action函数。在spark程序中,操作的DAG是被隐式创建的,当driver运行的时候,它将DAG转化为物理层面的执行程序。
Apache Spark Cluster Manager
在某些情况下,spark会依赖cluster manager来启动executors,甚至driver也是通过其启动的。它是spark中的一个插件。在cluster manager上,spark application中的jobs和action是由Spark Scheduler按照FIFO的顺序安排的,spark application占用的资源能够依据workload大小动态的调节,无论在哪种集群模式的情况下都适合。
Apache Spark Executors
The individual task in the given Spark job runs in the Spark executors. Executors are launched once in the beginning of Spark Application and then they run for the entire lifetime of an application. Even if the Spark executor fails, the Spark application can continue with ease. There are two main roles of the executors:
- Runs the task that makes up the application and returns the result to the driver.
- Provide in-memory storage for RDDs that are cached by the user.
How to launch a Program in Spark
不论我们使用哪种cluster manager,spark给我们提供了一个简单的脚本,叫做spark-submit,用以方便我们提交我们的程序。它在集群上启动application,同时能够连接到不同的cluster manager并且能够控制我们程序所需要的资源。
How to Run Apache Spark Application on a cluster