Spark内核源码继续五:Master原理解析和源码解析

上篇已经降到AppClient找Master进行注册,本章主要解析Master的原理和源码解析

1、Master的主备切换原理

Spark内核源码继续五:Master原理解析和源码解析

package org.apache.spark.deploy.master  completeRecovery,过滤没有响应的worker,app,drivers,从内存缓存中移除,从组件缓存中移除,从持久化机制中移除。    workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker),过滤掉没有响应的Worker,并且移除没有响应的    apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication    // Reschedule drivers which were not claimed by any workers    drivers.filter(_.worker.isEmpty).foreach { d => }

2、注册机制原理和源码分析

1、worker会向master注册,过滤将状态为Dead过滤掉,对于UNKNOW的状态的设置为Dead,清理掉旧的Worker信息,替换成新的。将Worker信息加入内存缓存中,用持久化引擎将Worker信息进行持久化(文件系统或者zookeeper),最后调用schedule方法。

2、Driver会向master注册,将Driver信息放入内存缓存中,hashmap中,加入等待调度执行队列ArrayBuffer,调用持久化引擎将Driver信息进行持久化,最后调用schedule方法

3、用Spark-submit提交的Spark Application首先就是注册Driver,Driver启动好了,执行我们编写的Application代码,执行SparkContext初始化,底层的SparkDeploySchedulebackend,会通过AppClient内部的线程,ClientActor发送RegisterApplication,到Master进行Application的注册。将Application放入缓存,hashMap,将Application加入等待调度的Application队列ArrayBuffer,用持久化引擎将Application持久化。然后调用schedule方法。

Spark内核源码继续五:Master原理解析和源码解析

case RegisterApplication(description)  CreateApplication,创建一个Application对象,然后采用这个对象进行注册  registerApplication    //加入内存    waitingApps //加入等待调度队列,这是一个ArrayBuffer对象  persistenceEngine.addApplication(app),调用持久化引擎持久化application  sender ! RegisteredApplication(app.id, masterUrl),反向向SparkDeploySchedulerBackend的Appclient的ClientActor,发送消息也就是RedisterdApplication

3、状态改变原理和源码分析

def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
    // 找到对应的Driver
    drivers.find(d => d.id == driverId) match {
        // 找到后
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        // 将Driver从内存移除
        drivers -= driver
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        // 向CompleteDrivers中加入Driver
        completedDrivers += driver

        // 使用持久化引擎移除Driver
        persistenceEngine.removeDriver(driver)
        // 设置Driver的状态和异常
        driver.state = finalState
        driver.exception = exception

        // 将driver所在的worker,移除Driver
        driver.worker.foreach(w => w.removeDriver(driver))
        // 调用schedule方法
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
          // 如果有值
        case Some(exec) => {
          val appInfo = idToApp(appId)
          exec.state = state
          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
          // 向Driver发送ExecutorUpdated消息
          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
          if (ExecutorState.isFinished(state)) {
            // Remove this executor from the worker and app
            logInfo(s"Removing executor ${exec.fullId} because it is $state")
            // 从app中移除缓存worker
            appInfo.removeExecutor(exec)
            // 从运行的executor的worker中移除executor
            exec.worker.removeExecutor(exec)
            // 判断executor退出状态
            val normalExit = exitStatus == Some(0)
            // Only retry certain number of times so we don‘t go into an infinite loop.
            if (!normalExit) {
              // 判断重试次数,如果没有达到最大次数就重新调度
              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
                schedule()
              } else {
                // 否则就移除application
                val execs = appInfo.executors.values
                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                    s"${appInfo.retryCount} times; removing it")
                  removeApplication(appInfo, ApplicationState.FAILED)
                }
              }
            }
          }
        }
        case None =>
          logWarning(s"Got status update for unknown executor $appId/$execId")
      }
    }

4、资源调度原理和源码分析

也就是schedule方法

driver的调度机制

private def schedule() {
    // 判断master状态不是ALIVE的话直接返回
    if (state != RecoveryState.ALIVE) { return }

    // First schedule drivers, they take strict precedence over applications
    // Randomization helps balance drivers
    // 将传入的集合元素随机打乱,取出所有注册的Worker。将所有alive的worker随机打乱
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    // 拿到数量
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0

    // 首先调度driver,什么情况下会注册driver,并且会导致driver调度
    // 只有yarn-cluster,才会注册driver,standalone和yarn-client模式下,都会在本地直接创建启动driver,而不会注册driver
    // 更不可能调度driver。driver的调度机制,遍历waitingDrivers ArrayBuffer
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      // 只要还有还活着的worker没有被遍历到,那么就继续遍历,当前还没有启动driver,就是launced为false
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        // 如果当前worker空闲内存和cpu满足driver的需求,
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          // 启动driver,并且将driver从waitingDrivers队列中移除
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        // 将指针移到下一个worker
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
// 在某个worker上启动driver
  def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    // driver加入内存的缓存结构,将worker内使用的内存和cpu数量加上driver所需要的数量
    worker.addDriver(driver)
    // 同时也将worker加入driver的缓存结构中
    driver.worker = Some(worker)
    // 然后调用worker的actor,给他发送LaunchDriver消息,让woker启动driver
    worker.actor ! LaunchDriver(driver.id, driver.desc)
    // 将driver的状态设置为running状态
    driver.state = DriverState.RUNNING
  }

application调度机制

// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    // 首先呢Application的调度算法分两种,一种是spreadOutApps,另外一种是非spreadOtuApps算法
    if (spreadOutApps) {
      // Try to spread out each app among all the nodes, until it has all its cores
      // 首先遍历waitingApps中的applicationInfo,过滤出要调度的applicationInfo
      for (app <- waitingApps if app.coresLeft > 0) {
        // 过滤出可以被application使用的worker,按照剩余内存倒序排列,也就是过滤出要能满足application的worker,并且没有启动过
        // app 对应的executor
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        // 创建一个空数组存储分配worker的cpu数量
        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
        // 获取到低要分配多少cpu,取app剩余要分配的cpu数量和work剩余cpu的最小值
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
        var pos = 0
        // 只要要分配的cpu还没有分配完就继续
        // 通过这种算法,其实会将这个application,要启动的executor都平均分布到各个worker上去
        // 比如有个20 cpu core要分配,实际会循环两遍worker,给每个worker分配一个1个core,最后每个worker分配2个core
        while (toAssign > 0) {
          // 每个worker如果空闲的cpu数量大于已经分配的cpu数量,worker还有可以分配的cpu
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
            // 将总共要分配的cpu数量-1,因为这里已经决定在这个worker上分配一个cpu了
            toAssign -= 1
            // 给这个worker分配的数量加1
            assigned(pos) += 1
          }
          //  指针下移动一个
          pos = (pos + 1) % numUsable
        }
        // Now that we‘ve decided how many cores to give on each node, let‘s actually give them
        // 给每个worker分配application要求的core后,遍历worker然后启动executor
        for (pos <- 0 until numUsable) {
          // 只要这个worker分配到core了
          if (assigned(pos) > 0) {
            // 首先application内部缓存结构中,添加executor,并且创建executor对象,其中封装了给这个executor分配多少个cpu
            // spark-submit脚本里面可以指定要多少个executor,每个executor多少个cpu,多少内存,基于我们的机制,实际上最后executor的数量
            // 以及executor的cpu的数量可以跟配置的不一样,因为我们是基于总的cpu数量来分配的,就是说要比如要3个executor,每个executor 3个cpu
            // 那么比如,有9给worker,每个有1个CPU,那么要分配9个core,每个worker分配一个core,每个worker启动一个executor,
            // 最后启动9个executor,每个executor有一个cpu core
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
            // 那么就在worker上启动executor
            launchExecutor(usableWorkers(pos), exec)
            // 将app状态设置为Running
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else {
      // Pack each app into as few nodes as possible until we‘ve assigned all its cores
      // 非spradout算法,将一个application尽可能少的分配到worker上去,遍历worker,并且需要分配的core的application。
      // 总共有10个worker,每个10个core,app要分配20个core,只能分配到2个worker上每个worker10个core
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        // 遍历application,并且是还需要分配的application
        for (app <- waitingApps if app.coresLeft > 0) {
          // 判断worker是否可以被application使用
          if (canUse(app, worker)) {
            // 取worker剩余和app所需core的最小值
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              // 给app添加executor
              val exec = app.addExecutor(worker, coresToUse)
              launchExecutor(worker, exec)
              // 将app状态修改为running
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }

相关推荐