来源:腾讯技术工程微信号
作者:calvinrzluo丨腾讯 IEG 后台开发工程师本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正。为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容。
Spark Job 执行流程分析
Job 阶段
下面我们通过一个 RDD 上的 Action 操作 count,查看 Spark 的 Job 是如何运行和调度的。特别注意的是,在 SparkSQL 中,Action 操作有不同的执行流程,所以宜对比着看。count通过全局的SparkContext.runJob启动一个 Job,这个函数转而调用DAGScheduler.runJob。Utils.getIteratorSize实际上就是遍历一遍迭代器,以便统计 count。
// RDD.scaladef count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum// Utils.scaladef getIteratorSize(iterator: Iterator[_]): Long = { var count = 0L while (iterator.hasNext) { count += 1L iterator.next() } count}在参数列表里面的下划线_的作用是将方法转为函数,Scala 中方法和函数之间有一些区别,在此不详细讨论。
下面查看runJob函数。比较有趣的是clean函数,它调用ClosureCleaner.clean方法,这个方法用来清理$outer域中未被引用的变量。因为我们要将闭包func序列化,并从 Driver 发送到 Executor 上面。序列化闭包的过程就是为每一个闭包生成一个可序列化类,在生成时,会将这个闭包所引用的外部对象也序列化。容易发现,如果我们为了使用外部对象的某些字段,而序列化整个对象,那么开销是很大的,因此通过clean来清除不需要的部分以减少序列化开销。
此外,getCallSite用来生成诸如s"$lastSparkMethod at $firstUserFile:$firstUserLine"这样的字符串,它实际上会回溯调用栈,找到第一个不是在 Spark 包中的函数,即$lastSparkMethod,它是导致一个 RDD 创建的函数,比如各种 Transform 操作、sc.parallelize等。
// SparkContext.scaladef runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) } dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) // CheckPoint机制 rdd.doCheckpoint()}private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true "spark] def clean[F <: AnyRef"): F = { ClosureCleaner.clean(f, checkSerializable) f}我们发现,传入的 func 只接受一个Iterator[_]参数,但是其形参声明却是接受TaskContext和Iterator[T]两个参数。这是为什么呢?这是因为runJob有不少重载函数,例如下面的这个
def runJob[T, U: ClassTag]( rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U] = { val cleanedFunc = clean(func) runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)}下面我们查看DAGScheduler.runJob函数,它实际上就是调用submitJob,然后等待 Job 执行的结果。由于 Spark 的DAGScheduler是基于事件循环的,它拥有一个DAGSchedulerEventProcessLoop类型的变量eventProcessLoop,不同的对象向它post事件,然后在它的onReceive循环中会依次对这些事件调用处理函数。
我们需要注意的是partitions不同于我们传入的rdd.partitions,前者是一个Array[Int],后者是一个Array[Partition]。并且在逻辑意义上,前者表示需要计算的 partition,对于如 first 之类的 Action 操作来说,它只是 rdd 的所有 partition 的一个子集,我们将在稍后的submitMissingTasks函数中继续看到这一点。
def runJob[T, U](... "T, U"): Unit = { val start = System.nanoTime val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) // 下面就是在等了 ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) throw exception }}def submitJob[T, U]( rdd: RDD[T], // target RDD to run tasks on,就是被执行count的RDD func: (TaskContext, Iterator[T]) => U, // 在RDD每一个partition上需要跑的函数 partitions: Seq[Int], callSite: CallSite, // 被调用的位置 resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = { // 检查是否在一个不存在的分区上创建一个Task val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p < 0).foreach { p => throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions)} // jobId是从后往前递增的 val jobId = nextJobId.getAndIncrement() if (partitions.isEmpty) { val time = clock.getTimeMillis() // listenerBus是一个LiveListenerBus对象,从DAGScheduler构造时得到,用来做event log // SparkListenerJobStart定义在SparkListener.scala文件中 listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo]( "StageInfo"), SerializationUtils.clone(properties))) listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded)) // 如果partitions是空的,那么就直接返回 return new JobWaiter[U](this, jobId, 0, resultHandler "U") } assert(partitions.nonEmpty) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler "U") // 我们向eventProcessLoop提交一个JobSubmitted事件 eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) waiter}// DAGSchedulerEvent.scalaprivate[scheduler] case class JobSubmitted( jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties = null) extends DAGSchedulerEvent下面我们具体看看对JobSubmitted的响应
// DAGScheduler.scalaprivate[scheduler] def handleJobSubmitted(...) { var finalStage: ResultStage = null // 首先我们尝试创建一个`finalStage: ResultStage`,这是整个Job的最后一个Stage。 try { // func: (TaskContext, Iterator[_]) => _ // 下面的语句是可能抛BarrierJobSlotsNumberCheckFailed或者其他异常的, // 例如一个HadoopRDD所依赖的HDFS文件被删除了 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { ...// DAGScheduler.scalaprivate def createResultStage(...): ResultStage = { checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithNumSlots(rdd) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage}这里createResultStage所返回的ResultStage继承了Stage类。Stage类有个rdd参数,对ResultStage而言就是finalRDD,对ShuffleMapStage而言就是ShuffleDependency.rdd
// DAGScheduler.scaladef createShuffleMapStage[K, V, C]( shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd ...下面我们来看看checkBarrierStageWithNumSlots这个函数,因为它会抛出BarrierJobSlotsNumberCheckFailed这个异常,被handleJobSubmitted捕获。这个函数主要是为了检测是否有足够的 slots 去运行所有的 barrier task。屏障调度器是 Spark 为了支持深度学习在 2.4.0 版本所引入的一个特性。它要求在 barrier stage 中同时启动所有的 Task,当任意的 task 执行失败的时候,总是重启整个 barrier stage。这么麻烦是因为 Spark 希望能够在 Task 中提供一个 barrier 以供显式同步。
// DAGScheduler.scalaprivate def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = { val numPartitions = rdd.getNumPartitions val maxNumConcurrentTasks = sc.maxNumConcurrentTasks if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) { throw new BarrierJobSlotsNumberCheckFailed(numPartitions, maxNumConcurrentTasks) }}// DAGScheduler.scala ... case e: BarrierJobSlotsNumberCheckFailed => // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically. // barrierJobIdToNumTasksCheckFailures是一个ConcurrentHashMap,表示对每个BarrierJob上失败的Task数量 val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId, (_: Int, value: Int) => value + 1) ... if (numCheckFailures <= maxFailureNumTasksCheck) { messageScheduler.schedule( new Runnable { override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func, partitions, callSite, listener, properties)) }, timeIntervalNumTasksCheck, TimeUnit.SECONDS ) return } else { // Job failed, clear internal data. barrierJobIdToNumTasksCheckFailures.remove(jobId) listener.jobFailed(e) return } case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } // Job submitted, clear internal data. barrierJobIdToNumTasksCheckFailures.remove(jobId) ...下面开始创建 Job。ActiveJob表示在DAGScheduler里面运行的一个 Job。
Job 只负责向“叶子”Stage 要结果,而之前 Stage 的运行是由DAGScheduler来调度的。这是因为若干 Job 可能共用同一个 Stage 的计算结果,我这样说的根据是在 Stage 类的定义中的jobIds字段是一个HashSet,也就是说它可以属于多个 Job。所以将某个 Stage 强行归属到某个 Job 是不符合 Spark 设计逻辑的。
// DAGScheduler.scala ... val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() // 在这里会打印四条日志,这个可以被用来在Spark.log里面定位事件 logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) ... val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) // 从最后一个stage开始调用submitStage submitStage(finalStage)}Stage 阶段
Stage 是如何划分的呢?又是如何计算 Stage 之间的依赖的?我们继续查看submitStage这个函数,对于一个 Stage,首先调用getMissingParentStages看看它的父 Stage 能不能直接用,也就是说这个 Stage 的 rdd 所依赖的所有父 RDD 能不能直接用,如果不行的话,就要先算父 Stage 的。在前面的论述里,我们知道,若干 Job 可能共用同一个 Stage 的计算结果,而不同的 Stage 也可能依赖同一个 RDD。
private def submitStage(stage: Stage) { // 找到这个stage所属的job val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { // 如果依赖之前的Stage,先列出来,并且按照id排序 val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { // 运行这个Stage logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { // 先提交所有的parent stage for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) }}下面具体查看getMissingParentStages这个函数,可以看到,Stage 的计算链是以最后一个 RDD 为树根逆着向上遍历得到的,而这个链条的终点要么是一个ShuffleDependency,要么是一个所有分区都被缓存了的 RDD。
private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] val waitingForVisit = new ListBuffer[RDD[_]] // 这里是个**DFS**,栈是手动维护的,主要是为了防止爆栈 waitingForVisit += stage.rdd def visit(rdd: RDD[_]): Unit = { if (!visited(rdd)) { visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) { // 如果这个RDD有没有被缓存的Partition,那么它就需要被计算 for (dep <- rdd.dependencies) { // 我们检查这个RDD的所有依赖 dep match { case shufDep: ShuffleDependency[_, _, _] => // 我们发现一个宽依赖,因此我们创建一个新的Shuffle Stage,并加入到missing中(如果不存在) // 由于是宽依赖,所以我们不需要向上找了 val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { missing += mapStage } case narrowDep: NarrowDependency[_] => // 如果是一个窄依赖,就加入到waitingForVisit中 // prepend是在头部加,+=是在尾部加 waitingForVisit.prepend(narrowDep.rdd) } } } } } while (waitingForVisit.nonEmpty) { visit(waitingForVisit.remove(0)) } missing.toList}Task 阶段
下面是重头戏submitMissingTasks,这个方法负责生成 TaskSet,并且将它提交给 TaskScheduler 低层调度器。
partitionsToCompute计算有哪些分区是待计算的。根据 Stage 类型的不同,findMissingPartitions的计算方法也不同。
// DAGScheduler.scalaprivate def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")") // First figure out the indexes of partition ids to compute. val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() ...// ResultStage.scalaoverride def findMissingPartitions(): Seq[Int] = { val job = activeJob.get (0 until job.numPartitions).filter(id => !job.finished(id))}// ActiveJob.scalaval numPartitions = finalStage match { // 对于ResultStage,不一定得到当前rdd的所有分区,例如first()和lookup()的Action, // 因此这里是r.partitions而不是r.rdd.partitions case r: ResultStage => r.partitions.length case m: ShuffleMapStage => m.rdd.partitions.length}// ShuffleMapStage.scalaoverride def findMissingPartitions(): Seq[Int] = { mapOutputTrackerMaster .findMissingPartitions(shuffleDep.shuffleId) .getOrElse(0 until numPartitions)}// MapOutputTrackerMaster.scaladef findMissingPartitions(shuffleId: Int): Option[Seq[Int]] = { shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())}这个outputCommitCoordinator是由SparkEnv维护的OutputCommitCoordinator对象,它决定到底谁有权利向输出写数据。在 Executor 上的请求会通过他持有的 Driver 的OutputCommitCoordinatorEndpoint的引用发送给 Driver 处理
// DAGScheduler.scala ... // Use the scheduling pool, job group, description, etc. from an ActiveJob associated // with this Stage val properties = jobIdToActiveJob(jobId).properties runningStages += stage // 在检测Tasks是否serializable之前,就要SparkListenerStageSubmitted, // 如果不能serializable,那就在这**之后**给一个SparkListenerStageCompleted stage match { case s: ShuffleMapStage => outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1) case s: ResultStage => outputCommitCoordinator.stageStart( stage = s.id, maxPartitionId = s.rdd.partitions.length - 1) } ...用getPreferredLocs计算每个分区的最佳计算位置,它实际上是调用getPreferredLocsInternal这个函数。这个函数是一个关于visit: HashSet[(RDD[_], Int)]的递归函数,visit 用(rdd, partition)元组唯一描述一个分区。getPreferredLocs的计算逻辑是这样的:
- 如果已经 visit 过了,就返回 Nil
- 如果是被 cached 的,通过
getCacheLocs返回 cache 的位置 - 如果 RDD 有自己的偏好位置,例如输入 RDD,那么使用
rdd.preferredLocations返回它的偏好位置 - 如果还没返回,但 RDD 有窄依赖,那么遍历它的所有依赖项,返回第一个具有位置偏好的依赖项的值
理论上,一个最优的位置选取应该尽可能靠近数据源以减少网络传输,但目前版本的 Spark 还没有实现
// DAGScheduler.scala ... val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { stage match { case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap case s: ResultStage => partitionsToCompute.map { id => val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) }.toMap } } catch { case NonFatal(e) => // 如果有非致命异常就创建一个新的Attempt,并且abortStage(这还不致命么) stage.makeNewStageAttempt(partitionsToCompute.size) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return } ...下面,我们开始 attempt 这个 Stage,我们需要将 RDD 对象和依赖通过closureSerializer序列化成taskBinaryBytes,然后广播得到taskBinary。当广播变量过大时,会产生一条Broadcasting large task binary with size的 INFO。
// DAGScheduler.scala ... stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq) // 如果没有Task要执行,实际上就是skip了,那么就没有Submission Time这个字段 if (partitionsToCompute.nonEmpty) { stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) // TODO: 也许可以将`taskBinary`放到Stage里面以避免对它序列化多次。 // 一堆注释看不懂 var taskBinary: Broadcast[Array[Byte]] = null var partitions: Array[Partition] = null try { var taskBinaryBytes: Array[Byte] = null // taskBinaryBytes and partitions are both effected by the checkpoint status. We need // this synchronization in case another concurrent job is checkpointing this RDD, so we get a // consistent view of both variables. RDDCheckpointData.synchronized { taskBinaryBytes = stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => // 注意这里的stage.func已经被ClosureCleaner清理过了 JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) } partitions = stage.rdd.partitions } ... // 广播 taskBinary = sc.broadcast(taskBinaryBytes) } catch { // In the case of a failure during serialization, abort the stage. case e: NotSerializableException => abortStage(stage, "Task not serializable: " + e.toString, Some(e)) runningStages -= stage ... }下面,我们根据 Stage 的类型生成 Task。
// DAGScheduler.scala ... val tasks: Seq[Task[_]] = try { val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { case stage: ShuffleMapStage => stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = partitions(id) stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } } } catch { ... }我们将生成的tasks包装成一个TaskSet,并且提交给taskScheduler。
// DAGScheduler.scala ... if (tasks.nonEmpty) { logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties)) } else {如果 tasks 是空的,说明任务就已经完成了,打上 DEBUG 日志,并且调用submitWaitingChildStages
// Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run markStageAsFinished(stage, None) stage match { case stage: ShuffleMapStage => logDebug(s"Stage ${stage} is actually done; " + s"(available: ${stage.isAvailable}," + s"available outputs: ${stage.numAvailableOutputs}," + s"partitions: ${stage.numPartitions})") markMapStageJobsAsFinished(stage) case stage : ResultStage => logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})") } submitWaitingChildStages(stage) }}
推荐阅读:
更多腾讯AI相关技术干货,请关注专栏腾讯技术工程