在DAGScheduler的handleJobSubmitted方法中,
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. // 如果作业运行在HadoopRDD上,而底层HDFS的文件已被删除,那么在创建新的Stage是将会抛出异常。 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) }
其中createResultStage方法创建ResultStage,在submitStage方法中提交finalStage。handleJobSubmitted方法中的ActiveJob是一个普通的数据结构,保存了当前的Job的一些信息:
private[spark] class ActiveJob( val jobId: Int, val finalStage: Stage, val callSite: CallSite, val listener: JobListener, val properties: Properties) { ? /** * Number of partitions we need to compute for this job. Note that result stages may not need * to compute all partitions in their target RDD, for actions like first() and lookup(). */ val numPartitions = finalStage match { case r: ResultStage => r.partitions.length case m: ShuffleMapStage => m.rdd.partitions.length } ? /** Which partitions of the stage have finished */ val finished = Array.fill[Boolean](numPartitions)(false) ? var numFinished = 0 }
getMissingParentStages方法根据finalStage找父Stage,如果有父Stage,就直接返回;如果没有父Stage,就进行创建。
submitStage源码, 位于org.apache.spark.scheduler.DAGScheduler#submitStage
private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) } }
submitStage方法首先从activeJobForStage中获取JobID;如果JobID已经定义为isDefined,那么获取即将计算的Stage(getMissingParentStages),然后进行升序排序。
如果父Stage为空,那么提交submitMissingTasks,DAGScheduler把处理的过程交给具体的TaskScheduler去处理。
如果父Stage不为空,将循环递归调用submitStage(parent),从后向前回溯。后面的Stage依赖于前面的Stage。只有前面依赖的Stage计算完毕后,后面的Stage才会运行。
Stage
DAGScheduler会将Job的RDD划分到不同的Stage,并构建这些Stage的依赖关系。这样可以使得没有依赖关系的Stage并行执行,并保证有顺序依赖关系的Stage顺序执行。并行执行能够有效利用集群资源,提升运行效率,而串行执行则适用于在时间、资源上存在强制依赖的场景。
Stage分为需要处理的Shuffle的ShuffleMapStage和最下游的ResultStage。上游Stage优于下游Stage执行,ResultStage是最后执行的Stage。
Stage的属性如下:
- id:Stage的身份标识
- rdd:当前Stage包含的RDD
- numTasks:当前Stage的Task数量
- parents:当前Stage的父Stage列表。一个Stage可以有一到多个父Stage
- firstJobId:第一个提交当前Stage的Job的身份标识(即Job的Id)。当使用FIFO调度时,通过FIrstJobId首先计算来自比较早Job的Stage,或者在发生故障是更快的恢复。
- callSite:应用程序中与当前Stage相关联的调用栈信息。
- numPartitions:当前Stage的分区数量。numPartitions实际为rdd的分区的数量。
- jobIds:当前Stage所属的Job的身份标识集合。一个Stage可以属于一到多个Job。
- pendingPartitions:存储待处理分区的索引的集合。
- nextAttemptId:用于生成Stage下一次尝试的身份标识。
- _latestInfo:Stage最近一次尝试信息,即StageInfo。
- fetchFailedAttemptIds:发生FetchFailure的Stage尝试的身份标识集合。此属性用于避免在发生FetchFailure后无止境的重试。
- clearFailures:清空fetchFailedAttemptIds
- failedOnFetchAndShouldAbort:用于将发生FetchFailure的Stage尝试的身份标识添加到fetchFailedAttemptIds中,并返回发生FetchFailure的次数是否已经超过了允许发生 FetchFailure的次数的状态。允许发生FetchFailure的次数固定为4.
- latestInfo:返回最近一次Stage尝试的StageInfo,即返回_latestInfo
- findMissingPartitions:找到还未执行完成的分区。需要子类实现。
- makeNewStageAttempt:用于创建新的Stage尝试,
def makeNewStageAttempt( numPartitionsToCompute: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = { val metrics = new TaskMetrics metrics.register(rdd.sparkContext) _latestInfo = StageInfo.fromStage( this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences) nextAttemptId += 1 }
makeNewStageAttempt方法的执行步骤如下:
- 调用StageInfo的fromStage方法创建新的StageInfo
- 增加nextAttemptId
抽象类Stage有两个实现类,分别是ShuffleMapStage、ResultStage
ShuffleMapStage
ShuffleMapStage的DAG调度过程的中间Stage,可以包括一到多个ShuffleMapTask,这些ShuffleMapTask将生成用于SHuffle的数据。ShuffleMapStage一般是ResultStage或者其他ShuffleMapStage的父Stage,ShuffleMapTask则通过Shuffle与下游Stage中的Task串联起来。
从ShuffleMapStage的命名可以看出,它将Shuffle的数据映射到下游Stage的各个分区中。
ShuffleMapStage处理继承父类Stage的属性外,还包括一下属性:
- shuffleDep:与ShuffleMapStage相关联的ShuffleDependency
- _mapStageJobs:与ShuffleMapStage相对应的ActiveJob的列表。
- _numAvailableOutputs:ShuffleMapStage可用的map任务的输出数量。也代表了执行成功的map数量
- outputLocs:ShuffleMapStage的各个map与其对应的MapStatus列表的映射关系。由于map任务可能会运行多次,因而可能会有多个MapStatus。
ShuffleMapStage提供的方法:
- mapStageJobs方法:即获取_mapStageJobs数据
- addActiveJob方法与removeActiveJob方法:向ShuffleMapStage相关联的ActiveJob的列表中添加或删除ActiveJob。源码如下
/** Adds the job to the active job list. */ def addActiveJob(job: ActiveJob): Unit = { _mapStageJobs = job :: _mapStageJobs } ? /** Removes the job from the active job list. */ def removeActiveJob(job: ActiveJob): Unit = { _mapStageJobs = _mapStageJobs.filter(_ != job) }
- numAvailableOutputs方法:读取_numAvailableOutputs
- isAvailable方法:当_numAvailableOutputs与numPartitions相等时为true。ShuffleMapStage的所有分区的map任务执行成功后,ShuffleMapStage才是可用的。
- findMissingPartitions方法:找到所有还未执行成功而需要计算的分区。源码如下:
override def findMissingPartitions(): Seq[Int] = { val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty) assert(missing.size == numPartitions - _numAvailableOutputs, s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}") missing }
- addOutputLoc方法:当面某一个分区的任务执行完成后,首先将分区与MapStatus的对应关系添加到outputLocs中,然后将可用的输出数加1.源码如下:
def addOutputLoc(partition: Int, status: MapStatus): Unit = { val prevList = outputLocs(partition) outputLocs(partition) = status :: prevList if (prevList == Nil) { _numAvailableOutputs += 1 } }
ResultStage
ResultStage可以使用指定的函数对RDD中的分区进行计算并得出最终结果。ResultStage是最后执行的Stage。此阶段主要进行作业的收尾工作。ResultStage除了继承父类Stage的属性外,还包括一下属性:
- func:即对RDD的分区进行计算的函数。func是ResultStage的构造器参数,指定了函数的形式必须满足(TaskContext, Iterator[_]) => _
- partitions:由RDD的各个分区的索引组成的数组。
- _activeJob:ResultStage处理的ActiveJob
ResultStage提供的一些方法
def activeJob: Option[ActiveJob] = _activeJob ? def setActiveJob(job: ActiveJob): Unit = { _activeJob = Option(job) } ? def removeActiveJob(): Unit = { _activeJob = None } ? /** * Returns the sequence of partition ids that are missing (i.e. needs to be computed). * * This can only be called when there is an active job. */ override def findMissingPartitions(): Seq[Int] = { val job = activeJob.get (0 until job.numPartitions).filter(id => !job.finished(id)) }
其中findMissingPartitions方法用于找出当前Job的所有分区中还没有完成的分区的索引。ResultStage判断一个分区是否完成,是通过ActiveJob的Boolean类型数组finished,因为finished记录了每个分区是否完成。
StageInfo
StageInfo用于描述Stage信息,可以传递给SparkListener。StageInfo包括一下属性:
- stageId:Stage的id
- attemptId:当前Stage尝试的id。
- name:当前Stage的名称。
- numTasks:当前Stage的Task数量
- rddInfos:RDD信息(即RDDInfo)序列
- parentIds:当前Stage的父Stage的id序列
- details:详细的线程栈信息
- taskMetrics:Task的度量信息
- taskLocalityPreferences:类型为Seq[Seq[TaskLocation]],用于存储任务的本地性偏好
- submissionTime:DAGScheduler将当前Stage提交给TaskScheduler的时间
- completionTime:当前Stage中所有Task完成的时间(即Stage完成的时间)或者Stage被取消的时间。
- failureReason:如果Stage失败了,用于记录失败的原因。
- accumulables:存储所有聚合器计算的最终值。
StageInfo提供了一个当Stage失败是要调用的方法stageFailed,源码如下:
def stageFailed(reason: String) { failureReason = Some(reason) completionTime = Some(System.currentTimeMillis) }