图解spark的任务构建和提交流程

    技术2022-07-11  69

    简述spark的任务运行流程

    先是在写spark代码的时候,背后做一些RDD的转换,写完后构建DAG,划分stage, 然后提交到资源管理器分配计算资源, 并在worker上执行。

    首先写spark代码时离不开对RDD的调用,那么:

    为什么需要RDD

    数据处理模型统一: RDD是1个数据结构, 能够获取数据的分区。 不区分流式还是批式,只理解为1个数学模型。

    依赖划分原则: RDD之间通过窄依赖(仅1个依赖)和宽依赖(多依赖)进行关联。 为什么要划分依赖? 依赖数量不同,决定是否能在1个stage和节点中执行。 同时也决定了容灾策略,是否需要保存所有父RDD

    数据处理效率: 1个RDD,同时可在多个节点并发执行。

    容错处理: RDD本身是不可变的数据集,这样可保证数据恢复

    wordCount代码的背后

    以wordCount代码为例

    textFile

    第一步是读文件数据。

    JavaSparkContext ctx = new JavaSparkContext(sparkConf); ctx.textFile(args[0],1)

    这一步会生成HadoopRDD 这里注意下, 里面有一个清理序列化的操作, 分布式传输数据时,序列化很重要,而序列化时有些成员是无法被序列化的,在java中的关键字是transien.

    MappedRDD是什么? 如下: 即我们的RDD,会被包装到一个单点依赖的对象里,并指明这是单点依赖。 并且textFile这个过程, 其实是生成了2个RDD, 1个是HadoopRDD,还有一个是读取数据转成字符串的mapRDD。 他们各自被塞进dependecy对象中,并通过依赖建立连接。

    注意,一般分布式计算设计DAG图时, 都是只有input指针(即用输入对象做连接), 利用input来确定DAG关系。 在spark里就是依赖dependency的概念

    第二步做FlatMap

    接着我们调用flatMap

    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>(){ @Override public Iterable<String> call(String s){ return Arrays.asList(SPACE.split(s)); })

    背后的RDD和依赖i情况变成如下:

    第三步mapToPair

    懒得敲了,这里省略java代码。 看一下dag。

    第四步聚合

    需要分组,然后进行合并相同的单词数量 此时dag如下:

    注意, 并不是shuffle这个RDD被包装进了shuffle依赖, 而是它的前置RDD被包进了shuffle依赖。即dependency确实是只包装依赖的, 你如果是属于某个shuffle过程的依赖,那么就会被包装成shuffleDepnecy。

    最后一步collect

    当写完后,执行collet,进行计算执行和提交。

    完整流程如下:

    Collect的时候发生了什么

    collect后, 会通过dagScheduler进行runJob, submitJob的时候会返回一个waiter,在client端主程序中就会进行等待。 即client端提交任务时其实是异步的,会返回一个waiter进行等待,

    看一下submitJob的时候发生了什么 前面看起来都是一些业务处理, 关键在handleJobSumitted的时候,会做一个newStage的操作,正好可以看一下spark里的stage是怎么确定和生成的。 父(依赖)stage列表是怎么获取的? 上图里的关键信息在于, 当遇到shuffle的时候,就会隔离出一个stage。

    可以看一下之前提到的RDD拼成的DAG图,如下:

    newStage后,有如下情况: 可以看到又给stage里有一个rdds的数组, 里面放了该stage的所有RDD, 并建立了依赖关系。 然后每个stage又通过parent去确定依赖关系。

    stage提交

    newStage之后,会进行stage的提交

    看一下submitStage的时候做了什么,注意此时是先从finalStage开始提交的。 这里可以看到, 虽然我们做了finalStage的提交, 但是会优先提交它所依赖的前置stage, 一直等待stage 完成了再真正提交自己这个stage。

    这里看一下 stage是怎么发送的 上图可以看到stae座位task时,也是区分shuffle类型和map类型。

    这句话重点: 有多少个未计算的分区,决定了有多少个task, 即stage中的分区和task一一对应

    完成的stage创建和提交流程图如下:

    Processed: 0.012, SQL: 9