阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Spark作业调度阶段分析

129次阅读
没有评论

共计 4028 个字符,预计需要花费 11 分钟才能阅读完成。

Spark 作为分布式的大数据处理框架必然或涉及到大量的作业调度,如果能够理解 Spark 中的调度对我们编写或优化 Spark 程序都是有很大帮助的;

在 Spark 中存在 转换操作(Transformation Operation)行动操作 (Action Operation) 两种;而转换操作只是会从一个 RDD 中生成另一个 RDD 且是 lazy 的,Spark 中只有 行动操作(Action Operation)才会触发作业的提交,从而引发作业调度;在一个计算任务中可能会多次调用 转换操作这些操作生成的 RDD 可能存在着依赖关系,而由于转换都是 lazy 所以当行动操作(Action Operation)触发时才会有真正的 RDD 生成,这一系列的 RDD 中就存在着依赖关系形成一个 DAG(Directed Acyclc Graph),在 Spark 中 DAGScheuler 是基于 DAG 的顶层调度模块;

相关名词

Application:使用 Spark 编写的应用程序,通常需要提交一个或多个作业;
Job:在触发 RDD Action 操作时产生的计算作业
Task: 一个分区数据集中最小处理单元也就是真正执行作业的地方
TaskSet: 由多个 Task 所组成没有 Shuffle 依赖关系的任务集
Stage: 一个任务集对应的调度阶段,每个 Job 会被拆分成诺干个 Stage

Spark 作业调度阶段分析
1.1 作业调度关系图

RDD Action 作业提交流程

这里根据 Spark 源码跟踪触发 Action 操作时触发的 Job 提交流程,Count()是 RDD 中的一个 Action 操作所以调用 Count 时会触发 Job 提交;
在 RDD 源码 count()调用 SparkContext 的 runJob,在 runJob 方法中根据 partitions(分区)大小创建 Arrays 存放返回结果;

RDD.scala

/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

SparkContext.scala

def runJob[T, U: ClassTag](rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit): Unit = {

  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job:" + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
}

在 SparkContext 中将调用 DAGScheduler 的 runJob 方法提交作业,DAGScheduler 主要任务是计算作业与任务依赖关系,处理调用逻辑;DAGScheduler 提供了 submitJob 与 runJob 方法用于 提交作业,runJob 方法会一直等待作业完成,submitJob 则返回 JobWaiter 对象可以用于判断作业执行结果;
在 runJob 方法中将调用 submitJob,在 submitJob 中把提交操作放入到事件循环队列(DAGSchedulerEventProcessLoop)中;

def submitJob[T, U](  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  callSite: CallSite,
  resultHandler: (Int, U) => Unit,
  properties: Properties): JobWaiter[U] = {
      ......  
      eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
      ......
  }  

在事件循环队列中将调用 eventprocessLoop 的 onReceive 方法;

Stage 拆分

提交作业时 DAGScheduler 会 从 RDD 依赖链尾部开始,遍历整个依赖链划分调度阶段;划分阶段以 ShuffleDependency 为依据,当没有 ShuffleDependency 时整个 Job 只会有一个 Stage;在事件循环队列中将会调用 DAGScheduler 的 handleJobSubmitted 方法,此方法会拆分 Stage、提交 Stage;

 private[scheduler] def handleJobSubmitted(jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties) {var finalStage: ResultStage = null
......
  finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
......

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
......
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)

submitWaitingStages()}

调度阶段提交

在提交 Stage 时会先 调用 getMissingParentStages 获取父阶段 Stage,迭代该阶段所依赖的父调度阶段如果存在则先提交该父阶段的 Stage 当不存在父 Stage 或父 Stage 执行完成时会对当前 Stage 进行提交;

 private def submitStage(stage: Stage) {val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {submitMissingTasks(stage, jobId.get)
      } else {for (parent <- missing) {submitStage(parent)
        }
        waitingStages += stage
      }
    }
  }
  ......
}

参考资料:
http://spark.apache.org/docs/latest/

更多 Spark 相关教程见以下内容

CentOS 7.0 下安装并配置 Spark  http://www.linuxidc.com/Linux/2015-08/122284.htm

Spark1.0.0 部署指南 http://www.linuxidc.com/Linux/2014-07/104304.htm

CentOS 6.2(64 位)下安装 Spark0.8.0 详细记录 http://www.linuxidc.com/Linux/2014-06/102583.htm

Spark 简介及其在 Ubuntu 下的安装使用 http://www.linuxidc.com/Linux/2013-08/88606.htm

安装 Spark 集群(在 CentOS 上) http://www.linuxidc.com/Linux/2013-08/88599.htm

Hadoop vs Spark 性能对比 http://www.linuxidc.com/Linux/2013-08/88597.htm

Spark 安装与学习 http://www.linuxidc.com/Linux/2013-08/88596.htm

Spark 并行计算模型 http://www.linuxidc.com/Linux/2012-12/76490.htm

Ubuntu 14.04 LTS 安装 Spark 1.6.0(伪分布式)http://www.linuxidc.com/Linux/2016-03/129068.htm

Spark 的详细介绍:请点这里
Spark 的下载地址:请点这里

本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-03/129504.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计4028字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中