Spark任务划分,调度,执行

    技术2024-07-19  72

    Spark 任务划分,调度,执行

    def main(args: Array[String]): Unit = { //使用IDEA开发工具完成WordCount //local 模式 //创建SparkConf对象 val conf = new SparkConf().setMaster("local[3]").setAppName("Word Count") val sc = new SparkContext(conf) //读取文件内容 val lines = sc.textFile("F:\\MyLearning\\SparkStudy\\A_Spark_Start\\src\\main\\resources\\word.txt") //扁平化操作 val word: RDD[String] = lines.flatMap(_.split(" ")) //将word做分组 val words: RDD[(String, Int)] = word.map((_, 1)) //聚合 val unit: RDD[(String, Int)] = words.reduceByKey(_ + _) //收集并且打印 val result: Array[(String, Int)] = unit.collect() result.foreach(println) }

    上述代码,一个world count 程序,下面来看一下每个原子的源码:

    flatMap 返回一个RDD def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) } map 返回一个RDD def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) } reducebykey 返回一个 ShuffledRDD } else { new ShuffledRDD[K, V, C](self, partitioner) .setSerializer(serializer) .setAggregator(aggregator) .setMapSideCombine(mapSideCombine) }

    注意: map 等算子返回的RDD 其实是继承OneToOneDependency,也就是窄依赖

    1. Stage 划分
    //org.apache.spark.rdd.RDD#collect def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } //org.apache.spark.SparkContext#runJob 方法重载 def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { runJob(rdd, func, 0 until rdd.partitions.length) } //org.apache.spark.SparkContext#runJob def runJob[T, U: ClassTag]( rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U] = { val cleanedFunc = clean(func) runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions) } //org.apache.spark.SparkContext#runJob def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) } //有向无环图调度器 dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) rdd.doCheckpoint() } //org.apache.spark.scheduler.DAGScheduler#runJob def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = { val start = System.nanoTime //提交任务 val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) //一直在等待 waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) throw exception } } //org.apache.spark.scheduler.DAGScheduler#submitJob // eventProcessLoop 会一直添加JobSubmitted 对象 eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) //其实eventProcessLoop 是一个DAGSchedulerEventProcessLoop 对象 //我们暂时不知道DAGSchedulerEventProcessLoop是干嘛的,只能是跟进源码 private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) //org.apache.spark.scheduler.DAGSchedulerEventProcessLoop //可以发现,DAGSchedulerEventProcessLoop 继承 EventLoop,继续跟进源码 private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler) extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging { //org.apache.spark.util.EventLoop //可以发现存在一个 BlockingQueue,该对象属于 java.util.concurrent 包 // jdk1.6后juc 阻塞式队列,双端式队列 // BlockingQueue 阻塞式队列,当队列满了会阻塞,当队列为空也会阻塞 private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]() //org.apache.spark.scheduler.DAGScheduler#submitJob //回到之前的源码发现,eventProcessLoop 就是往阻塞队列中发送对象 eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) //org.apache.spark.util.EventLoop // 既然走到了EventLoop,势必会执行run(),也就是执行 onReceive(event) override def run(): Unit = { try { while (!stopped.get) { val event = eventQueue.take() try { // 终端生命周期之一 onReceive(event) ...... } //因为EventLoop是一个抽象类,所以只能去其实现类找对应的方法 //org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#onReceive private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { //因为前面eventProcessLoop.post(JobSubmitted())正好对应上 case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => //所以下一步走 handleJobSubmitted() dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) } //org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted //创建最终的阶段 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) //org.apache.spark.scheduler.DAGScheduler#createResultStage //创建或者获取上一个阶段 val parents = getOrCreateParentStages(rdd, jobId) //org.apache.spark.scheduler.DAGScheduler#getOrCreateParentStages private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { //getShuffleDependencies()主要作用: //判断当前RDD的依赖关系是否存在shuffle dependency //如果存在shuffle dependency,将该RDD添加到此HashSet //如果不存在,去判断该RDD的上一个RDD是否存在shuffle dependency getShuffleDependencies(rdd).map { shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList } //org.apache.spark.scheduler.DAGScheduler#getOrCreateShuffleMapStage private def getOrCreateShuffleMapStage( shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int): ShuffleMapStage = { shuffleIdToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => // Create stages for all missing ancestor shuffle dependencies. getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => if (!shuffleIdToMapStage.contains(dep.shuffleId)) { // ShuffleMapStage createShuffleMapStage(dep, firstJobId) } } createShuffleMapStage(shuffleDep, firstJobId) } } //所以最终,一个简单的world count 程序是存在两个Stage //1. ResultStage //2. ShuffleMapStage // 由此可见,spark stage 的划分,是根据RDD 的转换中是否存在 Shuffle 操作来决定的 // 并且 Stage的划分是从 Action 算子开始,一步一步倒推过去,最终划分Stage
    2. 创建job,提交task,task执行
    //继续看源码 //org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted //最终,给ShuffleMapStage 放到了ResultStage,其实就是ResultStage包含给ShuffleMapStage finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) // 将finalStage 放入 ActiveJob,证明,job里面包含Stage // 所以有的时候说 一个job里面包含多个阶段,这句话是没问题的,但是真正意义上来说,一个job里面只包含了一个 //Stage,但是这一个Stage里面又包含多个Stage val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) ... ... //提交Stage submitStage(finalStage) //org.apache.spark.scheduler.DAGScheduler#submitStage // getMissingParentStages() 判断是否存在上一个Stage val missing = getMissingParentStages(stage).sortBy(_.id) //这里使用递归,一直取出上一级Stage,直到没有上一级Stage,执行submitMissingTasks() if (missing.isEmpty) { //这里提交的是shuffle Map 任务 submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { submitStage(parent) } waitingStages += stage } //org.apache.spark.scheduler.DAGScheduler#submitMissingTasks case stage: ShuffleMapStage => stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = partitions(id) stage.pendingPartitions += id //创建Task,Task 的个数根据分区个数决定 new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } } //提交下一个阶段的任务 // Spark 在划分Stage 的时候,是从后向前划分,但是在提交任务的时候,是从前向后划分 // 比如 : Stage collect --> reducebykey -->map -->flatmap // submitTask flatmap --> map --> reducebykey --> collect submitWaitingChildStages(stage) if (tasks.size > 0) { //提交任务 taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties)) } //org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks val manager = createTaskSetManager(taskSet, maxTaskFailures) schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) //org.apache.spark.scheduler.FIFOSchedulableBuilder#addTaskSetManager override def addTaskSetManager(manager: Schedulable, properties: Properties) { //先添加到任务池,当你机器没有资源的时候,任务可以在池里等待资源充足时在运行 rootPool.addSchedulable(manager) } //org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks backend.reviveOffers() //org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend case ReviveOffers => makeOffers() //org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers //从任务池获取任务,如果需要执行 if (!taskDescs.isEmpty) { launchTasks(taskDescs) } //org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks // 由于Task 是需要从driver 发送到executor端执行,所以必须序列化 val serializedTask = TaskDescription.encode(task) //driver 与 executor 进行通信 executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) //回过头来看executor终端 //org.apache.spark.executor.CoarseGrainedExecutorBackend#receive case LaunchTask(data) => if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") } else { // Task 反序列化 val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) // 执行任务 executor.launchTask(this, taskDesc) }
    Processed: 0.016, SQL: 9