Spark集群调度和开发环境的搭建
概述
在日常的工作中,不管是生产环境还是实验环境,我们运行Spark任务都是基于Spark集群环境,虽然有时候我们可以在本地使用Maven来搭建spark的开发环境来做一些测试,来完成代码的编写。(上家公司是一个例外,在本地IDEA写完程序,就可以直接连接到大数据平台HDP直接运行)但是正规的场景下,办公网络和集群网络是隔离的,所以我们编写的spark任务,都要依赖于各种数据源,e.g.HDFS、Kafka、RDBMS,所以都要打成Jar包放到集群环境来运行,将数据写到设计好的目的地。
What is a Spark cluster ?
Spark cluster is just some computers running Spark and working together.
cluster consist on:
Master: is one of the computers that orchestrate(安排协调) how everything works. It distributes the work and take care of everything.
Slaves: these are the computers that get the job done. They process chunks(大块的) of your massive (大量的)datasets following the Map Reduce paradigm(范式样式). A computer can be master and slave at the same time.
what does ‘standalone’ mean?
It just mean that Spark is installed in every computer involved in the cluster. The cluster manager in use is provided by Spark. (‘standalone’是spark自身内建的cluster manager(集群管理器 ))There are other cluster managers like Apache Mesos and Hadoop YARN.
![图片上传中...]
简单的介绍完Spark Cluster后,下面给出Spark 2.x版本的安装指南,为什么没有具体的写一遍安装教程呢,因为Spark集群环境的搭建相对于Hadoop的安装来说,简单的多,只要是按照步骤,一步一步的配置,就会成功。
安装指南:
https://www.davidadrian.cc/po...
Spark集群模式概述
Spark 应用在集群上作为独立的进程组来运行,在您的 main 程序中通过 SparkContext 来协调(称之为 driver 程序)。具体的说,为了运行在集群上,SparkContext 可以连接至几种类型的 Cluster Manager(既可以用 Spark 内置的 Standlone Cluster Manager,也可以使用外部的Mesos和 YARN),它们会分配应用的资源。一旦连接上,Spark 获得集群中节点上的 Executor,这些进程可以运行计算并且为您的应用存储数据。接下来,它将发送您的应用代码(通过 JAR 或者 Python 文件定义传递给 SparkContext)至 Executor。最终,SparkContext 将发送 Task 到 Executor 以运行。
这里有几个关于这个架构需要注意的地方 :
每个应用获取到它自己的 Executor 进程,它们会保持在整个应用的生命周期中并且在多个线程中运行 Task(任务)。这样做的优点是把应用互相隔离,在调度方面(每个 driver 调度它自己的 task)和 Executor 方面(来自不同应用的 task 运行在不同的 JVM 中)。然而,这也意味着若是不把数据写到外部的存储系统中的话,数据就不能够被不同的 Spark 应用(SparkContext 的实例)之间共享。Spark 是不知道底层的 Cluster Manager 到底是什么类型的。只要它能够获得 Executor 进程,并且它们可以和彼此之间通信,那么即使是在一个也支持其它应用的 Cluster Manager(例如,Mesos / YARN)上来运行它也是相对简单的。Driver 程序必须在自己的生命周期内(例如,请参阅 在网络配置章节中的 spark.driver.port 章节。 监听和接受来自它的 Executor 的连接请求。同样的,driver 程序必须可以从 worker 节点上网络寻址(就是网络没问题)。因为 driver 调度了集群上的 task(任务),更好的方式应该是在相同的局域网中靠近 worker 的节点上运行。如果您不喜欢发送请求到远程的集群,倒不如打开一个 RPC 至 driver 并让它就近提交操作而不是从很远的 worker 节点上运行一个 driver。
下面列出Spark中,有经常出现的几个名词
Application:基于Spark的用户程序,包含了一个driver program和集群中多个executor
Driver Program:运行Application的main()函数并创建SparkContext。通常SparkContext代表driver program
Executor:为某Application运行在worker node上的一个进程。该进程负责运行Task,并负责将数据存在内存或者磁盘上。每个Application都有自己独立的executors
Cluster Manager: 在集群上获得资源的外部服务(例如 Spark Standalon,Mesos、Yarn)
Worker Node: 集群中任何可运行Application代码的节点
Task:被送到executor上执行的工作单元。
Job:可以被拆分成Task并行计算的工作单元,一般由Spark Action触发的一次执行作业。
Stage:每个Job会被拆分成很多组Task,每组任务被称为stage,也可称TaskSet。该术语可以经常在日志中看打。
RDD:Spark的基本计算单元,通过Scala集合转化、读取数据集生成或者由其他RDD经过算子操作得到。
Cluster Manager(集群管理器) 类型
系统目前支持三种 Cluster Manager:
Standalone – 包含在 Spark 中使得它更容易来安装集群的一个简单的 Cluster Manager。
Apache Mesos – 一个通用的 Cluster Manager,它也可以运行 Hadoop MapReduce 和其它服务应用。
Hadoop YARN –Hadoop 2 中的 resource manager(资源管理器)。Kubernetes (experimental) – 除了上述之外,还有 Kubernetes 的实验支持。 Kubernetes 提供以容器为中心的基础设施的开源平台。 Kubernetes 的支持正在 apache-spark-on-k8s Github 组织中积极开发。有关文档,请参阅该项目的 README。
作业调度概述
Spark 有好几计算资源调度的方式. 首先,回忆一下 集群模式概述, 每个Spark 应用(包含一个SparkContext实例)中运行了一些其独占的执行器(executor)进程. 集群管理器提供了Spark 应用与应用之间的资源调度scheduling across applications.其次, 在各个Spark应用内部,各个线程可能并发地通过action算子提交多个Spark作业(job).如果你的应用服务于网络请求,那这种情况是很常见的.在Spark应用内部(对应同一个SparkContext)各个作业之间,Spark默认FIFO调度,同时也可以支持公平调度 fair scheduler.
1.跨应用作业调度
如果在集群上运行,每个Spark应用都会SparkContext获得一批独占的执行器JVM,来运行其任务并存储数据. 如果有多个用户共享集群,那么会有很多资源分配相关的选项,如何设计还取觉于具体的集群管理器.
对Spark所支持的各个集群管理器而言,最简单的的资源分配,就是静态划分.这种方式就意味着,每个Spark应用都是设定一个最大可用资源总量,并且该应用在整个生命周期内都会占住这个资源.这种方式在 Spark’s独立部署 standalone 和 YARN调度,以及Mesos粗粒度模式下都可用.coarse-grained Mesos mode .
资源分配可以根据集群类型配置如下:
Standalone mode: 默认情况下,Spark应用在独立部署的集群中都会以FIFO(first-in-first-out)模式顺序提交运行,并且每个spark应用都会占用集群中所有可用节点.不过你可以通过设置spark.cores.max或者spark.deploy.defaultCores来限制单个应用所占用的节点个数.最后,除了可以控制对CPU的使用数量之外,还可以通过spark.executor.memory来控制各个应用的内存占用量.
Mesos: 在Mesos中要使用静态划分的话,需要将spark.mesos.coarse设为true,同样,你也需要配置spark.cores.max来控制各个应用的CPU总数,以及spark.executor.memory来控制各个应用的内存占用.Mesos上还有一种动态共享CPU的方式。在这种模式下,每个Spark应用的内存占用仍然是固定且独占的(仍由spark.exexcutor.memory决定),但是如果该Spark应用没有在某个机器上执行任务的话,那么其它应用可以占用该机器上的CPU。这种模式对集群中有大量不是很活跃应用的场景非常有效,例如:集群中有很多不同用户的Spark shell session.但这种模式不适用于低延时的场景,因为当Spark应用需要使用CPU的时候,可能需要等待一段时间才能取得对CPU的使用权。要使用这种模式,只需要在mesos://URL上设置spark.mesos.coarse属性为false即可。
YARN: 在YARN中需要使用 –num-executors 选项来控制Spark应用在集群中分配的执行器的个数.对于单个执行器(executor)所占用的资源,可以使用 –executor-memory和–executor-cores来控制.
值得注意的是,目前还没有任何一种资源分配模式支持跨Spark应用的内存共享。如果你想通过这种方式共享数据,我们建议你可以单独使用一个服务(例如:alluxio),这样就能实现多应用访问同一个RDD的数据。
动态资源分配
Spark 提供了一种基于负载来动态调节Spark应用资源占用的机制。这意味着,你的应用会在资源空闲的时间将其释放给集群,需要时再重新申请。这一特性在多个应用Spark集群资源的情况下特别有用.
这个特性默认是禁止的,但是在所有的粗粒度集群管理器上都是可用的,如:i.e. 独立部署模式standalone mode, YARN mode, and 粗粒度模式Mesos coarse-grained mode.
资源分配策略
总体上来说,Spark应该在executors 空闲时将其关闭,而在后续要用时再申请。因为没有一个固定的方法,可以预测一个executors 在后续是否马上会被分配去执行任务,或者一个新分配的executors 实际上是空闲的,所以我们需要一个试探性的方法,来决定是否申请或是移除一个executors 。
请求策略
一个启用了动态分配的Spark应用会有等待任务需要调度的时候,申请额外的executors 。在这种情况下,必定意味着已有的executors 已经不足以同时执行所有未完成的任务。Spark会分轮次来申请executors 。实际的资源申请,会在任务挂起spark.dynamicAllocation.schedulerBacklogTimeout秒后首次触发,其后如果等待队列中仍有挂起的任务,则每过spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒后触发一次资源申请。另外,每一轮申请的executors 个数以指数形式增长。例如:一个Spark应用可能在首轮申请1个执行器,后续的轮次申请个数可能是2个、4个、8个….。
采用指数级增长策略的原因有两个:第一,对于任何一个Spark应用如果只需要多申请少数几个executors 的话,那么必须非常谨慎的启动资源申请,这和TCP慢启动有些类似;第二,如果一旦Spark应用确实需要申请多个执行器的话,那么可以确保其所需的计算资源及时增长。
移除策略
移除executors 的策略就简单得多了。Spark应用会在某个executors 空闲超过spark.dynamicAllocation.executorIdleTimeout秒后将其删除,在大多数情况下,executors 的移除条件和申请条件都是互斥的,也就是说,executors 在有等待执行任务挂起时,不应该空闲。
优雅的关闭Executor
非动态分配模式下,executor可能的退出原因有执行失败或是相关Spark应用已经退出。不管是哪种原因,执行器的所有状态都已经不再需要,可以丢弃掉。但是在动态分配的情况下,executor有可能在Spark应用运行期间被移除。这时候,如果Spark应用尝试去访问该executor存储的状态,就必须重算这一部分数据。因此,Spark需要一种机制,能够优雅的关闭executor,同时还保留其状态数据。
这种需求对于shuffle操作尤其重要。shuffle过程中,Spark 执行器首先将 map 输出写到本地磁盘,同时executor本身又是一个文件服务器,这样其他executor就能够通过该执行器获得对应的 map 结果数据。一旦有某些任务执行时间过长,动态分配有可能在shuffle结束前移除任务异常的执行器,而这些被移除的执行器对应的数据将会被重新计算,但这些重算其实是不必要的。
要解决这一问题,就需要用到 external shuffle service ,该服务在 Spark 1.2 引入。该服务在每个节点上都会启动一个不依赖于任何 Spark 应用或executor的独立进程。一旦该服务启用,Spark 执行器不再从各个执行器上获取 shuffle 文件,转而从这个 service 获取。这意味着,任何执行器输出的shuffle状态数据都可能存留时间比对应的执行器进程还长。
除了shuffle文件之外,executor也会在磁盘或者内存中缓存数。一旦executor被移除,其缓存数据将无法访问。这个问题目前还没有解决。或许在未来的版本中,可能会采用外部shuffle服务类似的方法,将缓存数据保存在堆外存储中以解决这一问题。
2.应用内调度
在指定的 Spark 应用内部(对应同一 SparkContext 实例),多个线程可能并发地提交 Spark 作业(job)。在本节中,作业(job)是指,由 Spark action 算子(如 : collect)触发的一系列计算任务的集合。Spark 调度器是完全线程安全的,并且能够支持 Spark 应用同时处理多个请求(比如 : 来自不同用户的查询)。
默认,Spark 应用内部使用 FIFO 调度策略。每个作业被划分为多个阶段(stage)(例如 : map 阶段和 reduce 阶段),第一个作业在其启动后会优先获取所有的可用资源,然后是第二个作业再申请,再第三个……。如果前面的作业没有把集群资源占满,则后续的作业可以立即启动运行,否则,后提交的作业会有明显的延迟等待。
不过从 Spark 0.8 开始,Spark 也能支持各个作业间的公平(Fair)调度。公平调度时,Spark 以轮询的方式给每个作业分配资源,因此所有的作业获得的资源大体上是平均分配。这意味着,即使有大作业在运行,小的作业再提交也能立即获得计算资源而不是等待前面的作业结束,大大减少了延迟时间。这种模式特别适合于多用户配置。 要启用公平调度器,只需设置一下 SparkContext 中 spark.scheduler.mode 属性为 FAIR 即可 :
val conf = new SparkConf().setMaster(...).setAppName(...) conf.set("spark.scheduler.mode", "FAIR") val sc = new SparkContext(conf)
以上就是Spark集群环境的作业调度和资源调度的介绍,更加详细的讲解,可以自行Google
开发环境搭建
Spark开发环境的搭建,我就拿我自己平常经常使用的IDEA+Maven来举例,当然现在很多人在用sbt,这只是个人习惯问题。
注: 这种开发环境不依赖与Spark集群环境,直接打开IDEA进行下面的操作就可以~~~~
1.打开IDEA,创建Maven项目
(临时掉链子,archetype刷新不出来,选择maven-archetype-quickstart就ok)
2.随后打开pom.xml文件,加入下面内容
<build> <defaultGoal>package</defaultGoal> <finalName>sparkprocess</finalName> <outputDirectory>target/classes</outputDirectory> <resources> <resource> <directory>${project.basedir}/src/main/scala</directory> <excludes> <exclude>**/*.java</exclude> </excludes> </resource> <resource> <targetPath>config/</targetPath> <directory>${project.basedir}/src/main/resources</directory> <includes> <include>*.*</include> </includes> </resource> <resource> <directory>${project.basedir}/src/main/resources</directory> <includes> <include>*.*</include> </includes> </resource> </resources> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.schemas</resource> </transformer> </transformers> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> <sourceDirectory>src/main/scala</sourceDirectory> </build>
<dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.1.0</version> </dependency> <!--如果Spark streaming对接Kafka,就要使用这个依赖--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.1.0</version> </dependency> </dependencies>
以上就是要在pom.xml文件中加入的依赖
其中:
在项目工程中我们要引入JDK 1.8以上
如果我们使用scala来开发Spark,我们还要引入Scala环境。其中scala环境的选择有两种方式:
1.我们可以在window本机上,像安装java环境一样,安装scala,再在IDEA中引入。
2.在pom.xml文件中引入scala的依赖,本教程的方式就是第二种方式,引入了scala 2.11.8
在上面的依赖中,我们看到了3个spark的依赖,其中Spark Core是必须引入的,而其他则按照我们需要进行的spark任务需要用到的Spark组件来进行选择性的引入,具体的原因,我会在接下来每个Spark组件的分享中来介绍。
下一篇分享就会进入到Spark的编程实践了,会从Spark Core开始,开启Spark开发之路