Spark Task Division, Scheduling, and Execution
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("Word Count")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("F:\\MyLearning\\SparkStudy\\A_Spark_Start\\src\\main\\resources\\word.txt")
    val word: RDD[String] = lines.flatMap(_.split(" "))
    val words: RDD[(String, Int)] = word.map((_, 1))
    val unit: RDD[(String, Int)] = words.reduceByKey(_ + _)
    val result: Array[(String, Int)] = unit.collect()
    result.foreach(println)
  }
}
The code above is a simple WordCount program. Let's walk through the source of each operator it uses:
flatMap returns a new RDD (a MapPartitionsRDD):
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
map also returns a MapPartitionsRDD:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
reduceByKey returns a ShuffledRDD. The snippet below is from combineByKeyWithClassTag, which reduceByKey delegates to; the else branch is taken when the data actually has to be repartitioned:
} else {
  new ShuffledRDD[K, V, C](self, partitioner)
    .setSerializer(serializer)
    .setAggregator(aggregator)
    .setMapSideCombine(mapSideCombine)
}
Note: the RDDs returned by operators like map and flatMap depend on their parent through a OneToOneDependency, i.e. a narrow dependency, while the ShuffledRDD produced by reduceByKey depends on its parent through a ShuffleDependency, i.e. a wide dependency.
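A quick way to see this from the driver is to inspect rdd.dependencies on the WordCount RDDs defined above (a minimal sketch; the printed class names are what I would expect, not verbatim output):

// Sketch: inspect the dependency types of the WordCount RDDs.
// word/words are MapPartitionsRDDs -> narrow OneToOneDependency;
// unit comes from reduceByKey      -> wide ShuffleDependency.
println(words.dependencies.map(_.getClass.getSimpleName))  // expected: List(OneToOneDependency)
println(unit.dependencies.map(_.getClass.getSimpleName))   // expected: List(ShuffleDependency)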
1. Stage division

collect() is the action that triggers job submission:
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
The overloads funnel into this one, which hands the job off to the DAGScheduler:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
At this point control passes from SparkContext to the DAGScheduler, whose runJob submits the job and blocks on a JobWaiter until it finishes:

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
Inside submitJob, the DAGScheduler posts a JobSubmitted event to its event loop:

eventProcessLoop.post(JobSubmitted(
  jobId, rdd, func2, partitions.toArray, callSite, waiter,
  SerializationUtils.clone(properties)))
eventProcessLoop is a DAGSchedulerEventProcessLoop, which extends EventLoop; posted events land in a blocking queue:

private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

// declared in EventLoop:
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
EventLoop runs a dedicated thread that keeps taking events off that queue and passing them to onReceive:

override def run(): Unit = {
  try {
    while (!stopped.get) {
      val event = eventQueue.take()
      try {
        onReceive(event)
        ......
}
For the DAGScheduler, onReceive delegates to doOnReceive, which pattern-matches on the event type; a JobSubmitted event goes to handleJobSubmitted:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
}
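Stripped of the Spark specifics, the machinery above is just a single-threaded event loop draining a blocking queue. A minimal standalone sketch of the same pattern (illustrative only, not Spark code; all names here are made up):

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

// Minimal event-loop sketch: post() enqueues, a dedicated daemon thread
// dequeues and dispatches, mirroring EventLoop/DAGSchedulerEventProcessLoop.
class MiniEventLoop[E](name: String)(handle: E => Unit) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped.get) {
        val event = queue.take()   // blocks until an event arrives
        handle(event)              // dispatch, like onReceive/doOnReceive
      }
    }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit = stopped.set(true)
}

Posting from any caller thread and handling on the single loop thread keeps all scheduling decisions serialized, which is why the DAGScheduler routes everything through eventProcessLoop.post(...).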
handleJobSubmitted first builds the final ResultStage for the job:

finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

Inside createResultStage, the parent stages of the final RDD are resolved first:

val parents = getOrCreateParentStages(rdd, jobId)
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage
    case None =>
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
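So stage boundaries are drawn at shuffle dependencies: each ShuffleDependency gets its own ShuffleMapStage, and the final RDD gets the ResultStage. For the WordCount job that means two stages, which you can see from the lineage (a sketch; the exact toDebugString output will differ, but the indentation marks the shuffle boundary, and the Spark UI shows the same two stages):

// Sketch: print the lineage of the WordCount result RDD defined above.
// Everything before the shuffle (textFile -> flatMap -> map) becomes one
// ShuffleMapStage; reduceByKey + collect form the final ResultStage.
println(unit.toDebugString)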
2. Creating the job, submitting tasks, and executing tasks

Back in handleJobSubmitted: once the final stage exists, an ActiveJob is created and the final stage is submitted:
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
...
...
submitStage(finalStage)
submitStage walks up the stage graph: missing parent stages are submitted first, and the current stage waits until they complete:

val missing = getMissingParentStages(stage).sortBy(_.id)
if (missing.isEmpty) {
  submitMissingTasks(stage, jobId.get)
} else {
  for (parent <- missing) {
    submitStage(parent)
  }
  waitingStages += stage
}
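Applied to the WordCount job, the recursion plays out roughly like this (stage ids are assumed for illustration):

// submitStage(ResultStage 1)
//   getMissingParentStages -> List(ShuffleMapStage 0)  // the reduceByKey shuffle
//   submitStage(ShuffleMapStage 0)                     // no missing parents -> submitMissingTasks
//   waitingStages += ResultStage 1                     // resubmitted once stage 0 completes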
submitMissingTasks then creates one task per partition to compute; for a ShuffleMapStage these are ShuffleMapTasks:

case stage: ShuffleMapStage =>
  stage.pendingPartitions.clear()
  partitionsToCompute.map { id =>
    val locs = taskIdToLocations(id)
    val part = partitions(id)
    stage.pendingPartitions += id
    new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
      taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
      Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
  }
and for a ResultStage, ResultTasks:

case stage: ResultStage =>
  partitionsToCompute.map { id =>
    val p: Int = stage.partitions(id)
    val part = partitions(p)
    val locs = taskIdToLocations(id)
    new ResultTask(stage.id, stage.latestInfo.attemptNumber,
      taskBinary, part, locs, id, properties, serializedTaskMetrics,
      Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
      stage.rdd.isBarrier())
  }
}
Waiting child stages (for WordCount, the ResultStage) are later pushed through submitWaitingChildStages once their parents finish; the tasks of the current stage are wrapped in a TaskSet and handed to the TaskScheduler:

submitWaitingChildStages(stage)

if (tasks.size > 0) {
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
}
TaskSchedulerImpl.submitTasks creates a TaskSetManager and registers it with the scheduling pool:

val manager = createTaskSetManager(taskSet, maxTaskFailures)
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
With the default FIFO scheduling mode, that simply means adding the manager to the root pool:

override def addTaskSetManager(manager: Schedulable, properties: Properties) {
  rootPool.addSchedulable(manager)
}
submitTasks ends by asking the scheduler backend to revive offers; on the driver endpoint (CoarseGrainedSchedulerBackend), the ReviveOffers message triggers makeOffers():

backend.reviveOffers()

case ReviveOffers =>
  makeOffers()
makeOffers matches pending tasks against available executor resources and launches the resulting TaskDescriptions; each one is serialized and sent to its executor's endpoint as a LaunchTask message:

if (!taskDescs.isEmpty) {
  launchTasks(taskDescs)
}

val serializedTask = TaskDescription.encode(task)
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
On the executor side, CoarseGrainedExecutorBackend receives the LaunchTask message, decodes the TaskDescription, and hands it to the Executor:

case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = TaskDescription.decode(data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskDesc)
  }
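For completeness, Executor.launchTask (paraphrased from Executor.scala in the same Spark line; treat the exact shape as approximate) wraps the decoded TaskDescription in a TaskRunner and runs it on the executor's thread pool, which is where the task's partition is actually computed:

// Paraphrased sketch of Executor.launchTask: the TaskRunner (a Runnable)
// deserializes the task binary and executes it on a cached thread pool.
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}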