In the previous posts we covered what happens after SparkSubmit hands over an application: how Workers are scheduled and how Executors register themselves. In this post we look at how Spark turns our program into jobs and submits them to the Executors for execution. Recall that RDD operations come in two flavors, transformations and actions, and a job is only really submitted when an action is invoked. Spark then walks the dependencies between RDDs (wide vs. narrow) to split the job into stages, and the tasks of each stage are shipped to the appropriate Executors to run.
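To make the wide/narrow distinction concrete, here is a tiny self-contained local-mode snippet (not part of the post's WordCount, just an illustration) that prints the dependency type of a narrow transformation versus a shuffle-based one:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // needed for reduceByKey on older Spark versions

object DepDemo {
  def main(args: Array[String]): Unit = {
    // local mode only so the snippet runs on its own
    val sc = new SparkContext(new SparkConf().setAppName("DepDemo").setMaster("local[2]"))

    val pairs  = sc.parallelize(Seq("a", "b", "a")).map((_, 1)) // narrow dependency
    val counts = pairs.reduceByKey(_ + _)                       // wide (shuffle) dependency

    println(pairs.dependencies)   // e.g. List(org.apache.spark.OneToOneDependency@...)
    println(counts.dependencies)  // e.g. List(org.apache.spark.ShuffleDependency@...) -> stage boundary

    sc.stop()
  }
}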
Let's take a WordCount that anyone can write with their eyes closed as the example and trace the job submission process.
val conf = new SparkConf().setAppName("WordCount")
val sc = new SparkContext(conf)
// Build the RDD and call its transformations
val wordAndCount: RDD[(String, Int)] =
  sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// Calling an action on the RDD is what actually submits the job
wordAndCount.saveAsTextFile(args(1))
// Release resources
sc.stop()

saveAsTextFile is the action, and it eventually boils down to a call to runJob: there, `context` is the SparkContext and `writeToFile` is the function that will be executed for each partition.
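For reference, the chain from saveAsTextFile lands in PairRDDFunctions.saveAsHadoopDataset, which looks roughly like this (heavily abbreviated from Spark 1.x; treat it as a sketch rather than the exact source):

def saveAsHadoopDataset(conf: JobConf) {
  // ... set up the output format, the SparkHadoopWriter `writer`, etc. ...

  // The function every task will run: write its own partition out to HDFS
  val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
    // ... open a RecordWriter, write all records of this partition, commit the task ...
  }

  // `self.context` is the SparkContext: this is where the job is really submitted
  self.context.runJob(self, writeToFile)
  writer.commitJob()   // `writer` is the SparkHadoopWriter prepared above (elided)
}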
runJob is overloaded several times inside SparkContext; the overload below is where stage splitting begins. The legendary DAGScheduler makes its appearance: it splits the job into stages, turns each stage into a TaskSet, and hands the TaskSet to the TaskScheduler, which submits the tasks to Executors.
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    // saveAsTextFile needs no return value, so the result handler returns Unit
    resultHandler: (Int, U) => Unit) {
  if (stopped) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  // TODO whether to print the lineage
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // TODO important: the legendary DAGScheduler, which splits the job into stages,
  // turns them into TaskSets for the TaskScheduler, which then submits tasks to Executors
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
    resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

Inside the DAGScheduler, runJob calls submitJob and gets back a waiter that is used to block until the job finishes:
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // TODO call submitJob, which returns a waiter (JobWaiter)
  val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded => {
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    }
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      throw exception
  }
}
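The waiter returned by submitJob is a JobWaiter. Roughly speaking (a simplified sketch, not the verbatim Spark source), it is a JobListener that counts finished tasks, feeds each task's result to resultHandler, and lets the caller block until every partition is done:

private[spark] class JobWaiter[T](
    dagScheduler: DAGScheduler,          // used for job cancellation, not shown here
    val jobId: Int,
    totalTasks: Int,
    resultHandler: (Int, T) => Unit) extends JobListener {

  private var finishedTasks = 0
  private var _jobFinished = totalTasks == 0
  private var jobResult: JobResult = if (_jobFinished) JobSucceeded else null

  // Called by the DAGScheduler every time one of this job's tasks succeeds
  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    resultHandler(index, result.asInstanceOf[T])
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      _jobFinished = true
      jobResult = JobSucceeded
      this.notifyAll()
    }
  }

  // Called once if the job fails
  override def jobFailed(exception: Exception): Unit = synchronized {
    _jobFinished = true
    jobResult = JobFailed(exception)
    this.notifyAll()
  }

  // What DAGScheduler.runJob blocks on
  def awaitResult(): JobResult = synchronized {
    while (!_jobFinished) {
      this.wait()
    }
    jobResult
  }
}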
submitJob wraps everything into a JobSubmitted event and puts it into a blocking queue:

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // TODO wrap the data into an event and post it to eventProcessLoop's blocking queue
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
  waiter
}
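eventProcessLoop is a DAGSchedulerEventProcessLoop, whose parent class EventLoop runs a background thread that keeps pulling events off the blocking queue and handing them to onReceive. The mechanism is roughly this (a simplified sketch, not the verbatim Spark source):

import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}

private[spark] abstract class EventLoop[E](name: String) {

  // The blocking queue that post() writes into
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  // A daemon thread that drains the queue and dispatches every event to onReceive
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        val event = eventQueue.take()   // blocks until an event is available
        onReceive(event)                // e.g. DAGSchedulerEventProcessLoop.onReceive
      }
    }
  }

  def start(): Unit = eventThread.start()

  // Called by submitJob and friends
  def post(event: E): Unit = eventQueue.put(event)

  // Implemented by DAGSchedulerEventProcessLoop as a pattern match over event types
  protected def onReceive(event: E): Unit
}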
When the event is taken off the queue, onReceive pattern-matches on its type; a JobSubmitted event is dispatched to handleJobSubmitted():

// TODO determine the event type via pattern matching
override def onReceive(event: DAGSchedulerEvent): Unit = event match {
  // TODO submit the computation
  case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    // TODO let the dagScheduler handle it in handleJobSubmitted
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
      listener, properties)

  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

The key method: handleJobSubmitted builds the finalStage via newStage, resolving all of its dependencies along the way, and at the end submits the finalStage:
// TODO splits the job into stages
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: Stage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // TODO important: this method splits the stages and returns the finalStage
    finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite.shortForm, partitions.length, allowLocal))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val shouldRunLocally =
      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
    val jobSubmissionTime = clock.getTimeMillis()
    if (shouldRunLocally) {
      // Compute very short actions like first() or take() with no parent stages locally.
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
      runLocally(job)
    } else {
      // TODO cluster mode
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      // TODO start submitting stages
      submitStage(finalStage)
    }
  }
  submitWaitingStages()
}

newStage creates the finalStage:
// TODO creates a Stage
private def newStage(
    rdd: RDD[_],
    numTasks: Int,
    shuffleDep: Option[ShuffleDependency[_, _, _]],
    jobId: Int,
    callSite: CallSite): Stage = {
  // TODO get its parent stages
  val parentStages = getParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // TODO build the new stage from its parent stages and the other arguments
  val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

getParentStages maintains an explicit stack and walks backwards over all the RDDs; every ShuffleDependency it meets becomes a parent ShuffleMapStage, collected into a List[Stage] and returned:
// TODO gets the parent stages
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  // TODO define the visit function
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            // TODO pass the wide dependency in to get the parent stage
            parents += getShuffleMapStage(shufDep, jobId)
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd)
  while (!waitingForVisit.isEmpty) {
    visit(waitingForVisit.pop())
  }
  parents.toList
}
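getShuffleMapStage is not shown in the post, but its job is easy to summarize: look the shuffle up in a cache of already-created ShuffleMapStages and create (and register) one if it does not exist yet. A simplified sketch, assuming the Spark 1.x internals this post follows (helper names such as registerShuffleDependencies and newOrUsedStage may differ slightly across versions):

private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): Stage = {
  shuffleToMapStage.get(shuffleDep.shuffleId) match {
    // This shuffle already has a ShuffleMapStage: reuse it
    case Some(stage) => stage
    case None =>
      // Register stages for any ancestor shuffle dependencies first,
      // then create a stage for this shuffle and cache it
      registerShuffleDependencies(shuffleDep, jobId)
      val stage = newOrUsedStage(
        shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
        shuffleDep.rdd.creationSite)
      shuffleToMapStage(shuffleDep.shuffleId) = stage
      stage
  }
}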
Starting from the finalStage, submitStage recurses towards the front of the DAG, and the earliest stage with no missing parents is handed to submitMissingTasks():

// TODO starting from the last stage, submit stages recursively
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // TODO get its parent stages
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      // If there are no missing parent stages, this is the front-most stage
      if (missing == Nil) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // TODO submit the front-most stage
        submitMissingTasks(stage, jobId.get)
      } else {
        // TODO there are parent stages, so submit them recursively
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id)
  }
}

// TODO the DAGScheduler hands the stage over to the TaskScheduler
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  // TODO clear the stage's pendingTasks set
  stage.pendingTasks.clear()

  // First figure out the indexes of partition ids to compute.
  // TODO compute the partitions that still need to be computed
  val partitionsToCompute: Seq[Int] = {
    if (stage.isShuffleMap) {
      (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
    } else {
      val job = stage.resultOfJob.get
      (0 until job.numPartitions).filter(id => !job.finished(id))
    }
  }

  // ...

  // TODO serialize the stage and broadcast it
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] =
      if (stage.isShuffleMap) {
        closureSerializer.serialize((stage.rdd, stage.shuffleDep.get): AnyRef).array()
      } else {
        closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
      }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // ... (error handling elided)
  }

  // TODO create the tasks
  val tasks: Seq[Task[_]] =
    if (stage.isShuffleMap) {
      partitionsToCompute.map { id =>
        val locs = getPreferredLocs(stage.rdd, id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, taskBinary, part, locs)
      }
    } else {
      val job = stage.resultOfJob.get
      partitionsToCompute.map { id =>
        val p: Int = job.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = getPreferredLocs(stage.rdd, p)
        new ResultTask(stage.id, taskBinary, part, locs, id)
      }
    }

  // ...
  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingTasks ++= tasks
    logDebug("New pending tasks: " + stage.pendingTasks)
    // TODO call taskScheduler.submitTasks to submit the TaskSet
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    // TODO if the stage has no tasks left, mark it as finished
    markStageAsFinished(stage, None)
    logDebug("Stage " + stage + " is actually done; %b %d %d".format(
      stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
  }

The overall flow:

1. Calling an action on an RDD invokes the RDD's runJob, which calls SparkContext.runJob, which calls DAGScheduler.runJob.
2. DAGScheduler.runJob calls submitJob, which returns a waiter (JobWaiter) used to wait for the job to finish.
3. submitJob wraps the job into a JobSubmitted event and posts it to eventProcessLoop's blocking queue.
4. EventLoop, the parent class of DAGSchedulerEventProcessLoop, takes the event off the queue and calls onReceive, which pattern-matches on the event type.
5. DAGSchedulerEventProcessLoop.onReceive matches JobSubmitted and calls handleJobSubmitted.
6. handleJobSubmitted calls newStage to obtain the finalStage.
7. newStage calls getParentStages to get the parent stages.
8. getParentStages walks the RDD lineage backwards; whenever it meets a ShuffleDependency it calls getShuffleMapStage and adds the resulting stage to parents (a List[Stage]).
9. Back in newStage, new Stage(id, finalRDD, numTasks, parents, ...) produces the finalStage.
10. Back in handleJobSubmitted, in cluster mode submitStage(finalStage) is called.
11. submitStage calls getMissingParentStages (very similar to getParentStages: a HashSet[Stage], a HashSet[RDD[_]] and a Stack[RDD[_]]) and sorts the missing parent stages by id. Only when missing is empty is the stage submitted; otherwise submitStage is called recursively on every missing parent and the current stage is put into waitingStages, so the front-most stage is the first to reach submitMissingTasks().
12. submitMissingTasks computes the partitions the stage still needs to compute, serializes the stage and broadcasts it, then creates one task per partition (a ShuffleMapTask or a ResultTask depending on the stage type), calling getPreferredLocs() beforehand to pick the best locations. If the task collection is non-empty, it is wrapped into a TaskSet and TaskScheduler.submitTasks is called to submit it.
13. Finally, back in handleJobSubmitted, submitWaitingStages is called to submit the child stages that are waiting on the current stage.
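The TaskSet handed to the TaskScheduler is just a thin container around the tasks of one stage attempt; roughly (a sketch, with field names as in Spark 1.x):

private[spark] class TaskSet(
    val tasks: Array[Task[_]],   // one ShuffleMapTask or ResultTask per partition to compute
    val stageId: Int,            // which stage these tasks belong to
    val attempt: Int,            // the stage attempt (stages can be resubmitted on failure)
    val priority: Int,           // the job id, used by the FIFO scheduling mode
    val properties: Properties) {
  val id: String = stageId + "." + attempt
}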
That wraps up how the DAGScheduler splits and submits stages: at this point the TaskSet has been handed to the TaskScheduler. How does the TaskScheduler then schedule, assign and run those tasks? The next post will reveal it.
If you liked this article, remember to follow — the official account has the same name.