spark 不同执行方式的运行期分析
结论 我还是学学
yarn-server 单个作业
yarn-clinet 的吧 交互
之前的mr 模型
1\ client 会执行inputformat 的getsplit ,write 成文件,然后提交job 包含资源(jar ,conf,..)
2\ yarn 的RM 接手,分配AM
3\ AM 接受,找RM 要资源 找NODENAMAGER 要 container 搞起。。。。。
spark 1.1 版本中 都是怎么样的
submit -->分析rdd -> 生成DAG--》 master 上运行--》调度资源运行
./bin/spark-submit 干了啥
这里master slave 是怎么协调的....................................
local
本地模式 使用N个线程
standalone
Standalone Deploy模式,需要部署Spark到相关节点
Mesos:// :Mesos模式,需要部署Spark和Mesos到相关节点
yarn-serviver SparkContext和任务都运行在Yarn集群中
yarn-client SparkConext运行在本地,task运行在Yarn集群中
大致工作流程
总体上来说,这些运行模式都基于一个相似的工作流程,SparkContext作为调度的总入口,在初始化过程中会分别创建DAGScheduler作业调度和TaskScheduler任务调度两极调度模块
作业调度模块是基于Stage的高层调度模块,它为每个Spark Job计算具有依赖关系的多个Stage任务阶段(通常根据Shuffle来划分Stage),然后将每个Stage划分为具体的一组任务(通常会考虑数据的本地性等)以Task Sets的形式提交给底层的任务调度模块来具体执行
任务调度模块负责具体启动任务,监控和汇报任务运行情况
不同运行模式的主要区别就在于他们各自实现了自己特定的任务调度模块,用来实际执行计算任务
Local本地模式使用 LocalBackend 配合TaskSchedulerImpl
LocalBackend 响应Scheduler的receiveOffers请求,根据可用CPU Core的设定值[N]直接生成WorkerOffer资源返回给Scheduler,并通过Executor类在线程池中依次启动和运行Scheduler返回的任务列表
Spark Standalone Deploy
Standalone模式使用SparkDeploySchedulerBackend配合TaskSchedulerImpl ,而SparkDeploySchedulerBackend本身拓展自CoarseGrainedSchedulerBackend
CoarseGrainedSchedulerBackend是一个基于Akka Actor实现的粗粒度的资源调度类,在整个SparkJob运行期间,CoarseGrainedSchedulerBackend会监听并持有注册给它的Executor资源(相对于细粒度的调度,Executor基于每个任务的生命周期创建和销毁),并且在接受Executor注册,状态更新,响应Scheduler请求等各种时刻,根据现有Executor资源发起任务调度流程
Executor本身通过各种途径启动,在Spark Standalone模式中,SparkDeploySchedulerBackend通过Client类向Spark Master 发送请求在独立部署的Spark集群中启动CoarseGrainedExecutorBackend,根据所需的CPU资源Core的数量,一个或多个CoarseGrainedExecutorBackend在Spark Worker节点上启动并注册给CoarseGrainedSchedulerBackend的DriverActor
完成所需Actor的启动之后,之后的任务调度就在CoarseGrainedSchedulerBackend和CoarseGrainedExecutorBackend的Actor之间直接完成
Local-cluster
伪分布模式基于Standalone模式实现,实际就是在SparkContext初始化的过程中现在本地启动一个单机的伪分布Spark集群,之后的流程与Standalone模式相同
Mesos
Mesos模式根据调度的颗粒度,分别使用CoarseMesosSchedulerBackend和MesosSchedulerBackend配合TaskSchedulerImpl
粗粒度的CoarseMesosSchedulerBackend拓展自CoarseGrainedSchedulerBackend,相对于父类额外做的工作就是实现了MScheduler接口,注册到Mesos资源调度的框架中,用于接收Mesos的资源分配,在得到资源后通过Mesos框架远程启动CoarseGrainedExecutorBackend,之后的任务交互过程和Spark standalone模式一样,由DriverActor和Executor Actor直接完成
细粒度的MesosSchedulerBackend不使用CoarseMesosSchedulerBackend的基于Actor的调度模式,因此直接继承自SchedulerBackend,同样实现了MScheduler接口,注册到Mesos资源调度的框架中,用于接收Mesos的资源分配。不同的是在接收资源后,MesosSchedulerBackend启动的是基于Task任务的远程Executor,通过在远程执行 ./sbin/spark-executor命令来启动MesosExecutorBackend,在MesosExecutorBackend中直接launch对应的Task
Yarn-standalone
Yarn-Standalone模式相对其它模式有些特殊,需要由外部程序辅助启动APP。用户的应用程序通过org.apache.spark.deploy.yarn.Client启动
Client通过Yarn Client API在Hadoop集群上启动一个Spark ApplicationMaster,Spark ApplicationMaster首先注册自己为一个YarnApplication Master,之后启动用户程序,SparkContext在用户程序中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler,YarnClusterScheduler只是对TaskSchedulerImpl 的一个简单包装,增加对Executor的等待逻辑等。
然后根据Client传递过来的参数,SparkApplicationMaster通过Yarn RM/NM的接口在集群中启动若干个Container用于运行CoarseGrainedExecutorBackend往CoarseGrainedSchedulerBackend注册。之后的任务调度流程同上述其它Cluster模式
Yarn-client
Yarn-client模式中,SparkContext运行在本地,该模式适用于应用APP本身需要在本地进行交互的场合,比如Spark Shell,Shark等
Yarn-client模式下,SparkContext在初始化过程中启动YarnClientSchedulerBackend(同样拓展自CoarseGrainedSchedulerBackend),该Backend进一步调用org.apache.spark.deploy.yarn.Client在远程启动一个WorkerLauncher作为Spark的Application Master,相比Yarn-standalone模式,WorkerLauncher不再负责用户程序的启动(已经在客户端本地启动),而只是启动Container运行CoarseGrainedExecutorBackend与客户端本地的Driver进行通讯,后续任务调度流程相同
概括
总体而言,各种运行模式就是通过各种手段启动匹配的SchedulerBackend和ExecutorBackend。除了Local模式和细粒度的Mesos模式,其它模式最终都是通过基于Akka的CoarseGrainedSchedulerBackend和CoarseGrainedExecutorBackend完成任务调度
Spark internal - 多样化的运行模式 (下)
作者:刘旭晖 Raymond 转载请注明出处
Email:colorant at 163.com
BLOG:http://blog.csdn.net/colorant/
上一篇中介绍了Spark的各种运行模式的基本流程和相关实现,这里主要分析一下各种运行模式中涉及到的一些细节问题的流程和实现
Spark的各种运行模式虽然启动方式,运行位置,调度手段有所不同,但它们所要完成的任务基本都是一致的,就是在合适的位置安全可靠的根据用户的配置和Job的需要管理和运行Task,这里粗略的列举一下在运行调度过程中各种需要考虑的问题
- 环境变量的传递
- Jar包和各种依赖文件的分发
- Task的管理和序列化等
- 用户参数配置
- 用户及权限控制
环境变量的传递
Spark的运行参数有很大一部分是通过环境变量来设置的,例如Executor的内存设置,Library路径等等。Local模式当然不存在环境变量的传递问题,在Cluster模式下,就需要将环境变量传递到远端JVM环境中去
SparkContext在初始化过程中 需要传递给Executor的环境变量,会在executorEnvs变量中(HashMap)中收集起来
而具体如何将这些变量设置到Executor的环境中,取决于Executor的Launch方式
在Spark Standalone模式中,这些变量被封装在org.apache.spark.deploy.Command中,交给AppClient启动远程Executor,Command经由Spark Master通过Actor再次转发给合适的Worker,Worker通过ExecutorRunner构建Java.lang.Process运行ExecutorBackend,环境变量在ExecutorRunner中传递给java.lang.ProcessBuilder.environment完成整个传递过程
在Mesos相关模式中,这些环境变量被设置到org.apache.mesos.Protos.Environment中,在通过MesosLaunch Task时交给Mesos完成分发工作
在yarn-standalone模式中,这些环境变量首先要通过Yarn Client 设置到Spark AM的运行环境中,基本就是Client类运行环境中以SPARK开头的环境变量全部设置到ContainerLaunchContext中,AM通过WorkerRunnable进一步将它们设置到运行Executor所用的ContainerLaunchContext中
Yarn-client模式与yarn-standalone模式大致相同,虽然SparkContext运行在本地,executor所需的环境变量还是通过ContainerLaunchContext经AM中转发给Executor
可以注意到,在Yarn相关模式中,并没有使用到SparkContext收集的executorEnvs,主要是因为Yarn Standalone模式下Sparkcontext本身就是在远程运行的,因此在Yarn Client中单独实现了相关代码
Jar包和各种依赖文件的分发
Spark程序的运行依赖大致分两类, 一是Spark runtime及其依赖,二是应用程序自身的额外依赖
对于Local模式而言,不存在Jar包分发的问题
对于第一类依赖
在Spark Standalone模式中,整个环境随Spark部署到各个节点中,因此也不存在runtime Jar包分发的问题
Mesos相关模式下,Mesos本身需要部署到各个节点,SparkRuntime可以和Standalone模式一样部署到各个节点中,也可以上传到Mesos可以读取的地方比如HDFS上,然后通过配置spark.executor.uri通知Mesos相关的SchedulerBackend,它们会将该URL传递给Mesos,Mesos在Launch任务时会从指定位置获取相关文件
而Spark 应用程序所额外依赖的文件,在上述模式中可以通过参数将URL传递给SparkContext,对于本地文件SparkContext将启动一个HttpServer用于其它节点读取相关文件,其它如HDFS和外部HTTP等地址上的文件则原封不动,然后这些额外依赖文件的URL在TaskSetmanager中和Task本身一起被序列化后发送给Executor,Executor再反序列化得到URL并传递给ExecutorURLClassLoader使用
在Yarn相关模式中,Runtime和程序运行所依赖的文件首先通过HDFS Client API上传到Job的.sparkStaging目录下,然后将对应的文件和URL映射关系通过containerLaunchContext.setLocalResources函数通知Yarn,Yarn的NodeManager在Launch container的时候会从指定URL处下载相关文件作为运行环境的一部分。上面的步骤对于Spark AM来说是充分的,而对于需要进一步分发到Executor的运行环境中的文件来说,AM还需要在创建Executor的Container的时候同样调用setLocalResources函数,AM是如何获得对应的文件和URL列表的呢,其实就是SparkYarn Client将这些文件的相关属性如URL,时间戳,尺寸等信息打包成字符串,通过特定的环境变量(SPARK_YARN_CACHE_XXX )传递给AM,AM再把它们从环境变量中还原成所需文件列表
Task管理和序列化
Task的运行要解决的问题不外乎就是如何以正确的顺序,有效地管理和分派任务,如何将Task及运行所需相关数据有效地发送到远端,以及收集运行结果
Task的派发源起于DAGScheduler调用TaskScheduler.submitTasks将一个Stage相关的一组Task一起提交调度。
在TaskSchedulerImpl中,这一组Task被交给一个新的TaskSetManager实例进行管理,所有的TaskSetManager经由SchedulableBuilder根据特定的调度策略进行排序,在TaskSchedulerImpl的resourceOffers函数中,当前被选择的TaskSetManager的ResourceOffer函数被调用并返回包含了序列化任务数据的TaskDescription,最后这些TaskDescription再由SchedulerBackend派发到ExecutorBackend去执行
系列化的过程中,上一节中所述App依赖文件相关属性URL等通过DataOutPutStream写出,而Task本身通过可配置的Serializer来序列化,当前可配制的Serializer包括如JavaSerializer ,KryoSerializer等
Task的运行结果在Executor端被序列化并发送回SchedulerBackend,由于受到Akka Frame Size尺寸的限制,如果运行结果数据过大,结果会存储到BlockManager中,这时候发送到SchedulerBackend的是对应数据的BlockID,TaskScheduler最终会调用TaskResultGetter在线程池中以异步的方式读取结果,TaskSetManager再根据运行结果更新任务状态(比如失败重试等)并汇报给DAGScheduler等
用户参数配置
Spark的用户参数配置途径很多,除了环境变量以外,可以通过Spark.conf文件设置,也可以通过修改系统属性设置 "spark.*"
而这些配置参数的使用环境也很多样化,有些在Sparkcontext本地使用(除了yarn-standalone模式),有些需要分发到Cluster集群中去
在SparkContext中解析和使用,比如spark.master,spark.app.names, spark.jars等等,通常用于配置SparkContext运行参数,创建Executor启动环境等
发送给Executor的参数又分两部分
一部分在ExecutorBackend初始化过程中需要使用的系统变量,会通过SparkContext在初始化过程中读取并设置到环境变量中去,在通过前面所述的方式,使用对应的底层资源调度系统设置到运行容器的环境变量中
另一部分在Executor中才使用的以"spark.*"开头的参数,则通过ExecutorBackend向SchedulerBackend的注册过程,在注册确认函数中传递给ExecutorBackend再在Executor的初始化过程中设置到SparkConf中
总体看来,这些参数配置的方式和分发途径有些不太统一,稍显混乱,大概还有改进的余地
用户及权限控制
Spark的Task在Executor中运行时,使用hadoop的UerGroupInfomation.doAs 函数将整个Task的运行环境包装起来以特定的sparkUser的身份运行。这样做的目的主要是使得Spark的task在与Hadoop交互时,使用特定的用户而不是Executor启动时所用的用户身份,这有利于在集群中区分Spark Cluster的运行用户和实际使用集群的APP用户身份,以及HDFS等权限控制
用户名在Executor中通过SPARK_USER环境变量获取
对于Local模式来说,SPARK_USER环境变量就是当前JVM环境下设定的值,当然对Local模式来说实际上也是不需要doAs的,Executor中如果SPARK_USER变量未设定或者与当前用户名一致,会跳过doAs直接执行task launch相关函数
传递用户身份的问题容易解决,比较麻烦的是身份的认证,例如将Spark运行在通过Kerberos管理权限的Hadoop集群中,这需要完成客户端的身份认证,Security 相关秘钥或Token的获取,分发,更新,失效等工作,在保证效率的同时,还要确保整个过程的安全性,目前的Spark代码对这一方面还没有完善的实现方案,但是有一些提案和Patch正在进行中。
Spark On Mesos模式。这是很多公司采用的模式,官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。目前在Spark On Mesos环境中,用户可选择两种调度模式之一运行自己的应用程序(可参考Andrew Xia的“Mesos Scheduling Mode on Spark”):
1) 粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。举个例子,比如你提交应用程序时,指定使用5个executor运行你的应用程序,每个executor占用5GB内存和5个CPU,每个executor内部设置了5个slot,则Mesos需要先为executor分配资源并启动它们,之后开始调度任务。另外,在程序运行过程中,mesos的master和slave并不知道executor内部各个task的运行情况,executor直接将任务状态通过内部的通信机制汇报给Driver,从一定程度上可以认为,每个应用程序利用mesos搭建了一个虚拟集群自己使用。
2) 细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源。每个Task会汇报状态给Mesos slave和Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于MapReduce调度模式,每个Task完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。