Spark知识点整理

    技术2022-07-11  85

    Spark知识点整理

    版本:Spark-2.1.1

    Spark架构

    Spark架构主要包含如下角色:

    Driver: 主计算进程,Spark job的驱动器Executor: 执行器,Worker上的计算进程Cluster Master: 主节点,在standalone模式中为主节点,控制整个集群,监控Worker. 在Yarn模式中充当资源管理器(Resource Manager)Worker: 从节点,负责控制计算节点,启动Executor或Driver

    Driver

    Spark的驱动器是执行main方法的进程,负责创建SparkContext,创建RDD,以及进行RDD的转化操作和行动操作的执行。Driver具有以下职能:

    把用户程序转为Job跟踪Executor运行状况为执行器节点调度任务UI展示应用运行状况

    Executor

    Spark Executor是一个工作进程,负责在Spark作业中运行任务,任务间相互独立,Spark应用启动时,Executor节点同时启动,并且始终伴随着整个Spark应用的生命周期而存在。如果有Executor节点发生了故障或崩溃,Spark应用也可以继续执行,会将出错节点上的任务调度到其他Executor上继续执行。Executor具有以下职能:

    负责运行Spark Task, 并将结果返回给Driver进程通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储. RDD是直接存储在Executor进程内的,因此任务可以在运行时充分利用缓存加速运算

    Driver与Executor关系

    Executor: 接收任务并执行任务。RDD算子中的计算功能由Executor执行Driver: 创建Spark Context对象的应用程序。Spark程序除了计算RDD计算以外的逻辑由Driver执行,Executor代码中引用的Driver部分的对象必须是可序列化的,因为可能需要网络传输

    Spark运行流程

    运行模式

    Local模式

    本地模式,Master和Worker均为本机,可以断点调试

    Standalone模式

    使用Spark本身的资源管理和调度

    Yarn模式

    使用Yarn作为Spark的资源管理和调度器,又分为Yarn-client模式和Yarn-cluster模式两种,涉及ResourceManager/NodeManager/ApplicationMaster/Container等组件

    Yarn-client模式

    driver运行在client客户端client与Executor Container通信进行作业调度适用于调试,在客户端可以看到日志client与Yarn集群的连接断开或client关闭,任务就挂了

    Yarn-cluster模式

    driver运行在ApplicationMaster中ApplicationManager与Executor Container通信进行作业调度日志需要登录到Yarn集群的节点才能看到client关闭或断开,任务不受影响,继续运行适用于生产环境

    ResourceManager

    处理客户端请求监控NodeManager启动或监控ApplicationMaster资源的分配与调度

    NodeManager

    管理单个节点上的资源处理来自ResourceManager的命令处理来自ApplicationMaster的命令

    ApplicationMaster

    负责数据切分为应用程序申请资源并分配给内部的任务任务的监控与容错

    Container

    Yarn的资源抽象,封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络

    工作流程

    RDD

    什么是RDD

    RDD (Resilient Distributed Dataset,分布式弹性数据集),是Spark的基本数据结构,在代码中是一个抽象类,代表不可变、可分区、里面的元素可并行计算的集合。RDD源码的设计模式类似Java IO,使用了装饰器模式,是对各种数据操作的封装。

    RDD的属性

    一组分区(partition),即数据集的基本组成单位 protected def getPartitions: Array[Partition] 一个计算每个分区的函数 def compute(split: Partition, context: TaskContext): Iterator[T] RDD之间的依赖关系,也叫『血缘』 protected def getDependencies: Seq[Dependency[_]] = deps Partioner,RDD的分区函数 @transient val partitioner: Option[Partitioner] = None 一个列表,存储每个Partition的优先位置(preferred location) protected def getPreferredLocations(split: Partition): Seq[String] = Nil 理念:移动数据不如移动计算

    RDD的特点

    分区

    Spark的数据是分区的,分区使得并行计算成为可能只有K-V数据才有分区器Spark支持Hash分区器和Range分区器

    只读

    RDD不能修改,只能通过已有的RDD经过算子变换成新的RDD两类算子 Transformations: 构建RDD的血缘,不立即执行计算Actions: 触发RDD计算,将结果写入外部系统

    依赖

    窄依赖:上下游RDD之间的依赖是一一对应的宽依赖:一个下游RDD依赖多个上游RDD通过依赖关系可以将一个Spark任务描述为一个DAG(有向无环图)宽依赖是划分stage的依据通过rdd.dependencies查看依赖的类型(OneToOneDependency和SuffleDependency)通过rdd.toDebugString查看依赖血缘

    缓存

    可以将RDD缓存到内存,只有第一次根据RDD的血缘计算,之后的计算直接从内存中取缓存使用RDD的persist或cache方法进行缓存,persist默认会把数据以序列化的形式存储在JVM的堆空间中

    持久化

    对长时间的任务,将中间RDD持久化到checkpoint,加速容错恢复在SparkContext中设置检查点保存目录(sc.setCheckpointDir(dir),通常dir设置为HDFS路径),然后调用RDD的checkpoint方法checkpoint会打断血缘,checkpoint后的RDD的血缘会从checkpoint开始

    创建RDD

    从内存中创建RDD

    parallelize

    makeRDD,底层实现使用parallelize

    从外部存储创建RDD

    textFile

    Spark任务划分

    Application: 初始化一个SparkContext即生成一个Application

    Job: 一个Action算子即生成一个Job

    Stage: 根据RDD之间的依赖关系将Job划分成不同的Stage, 遇到一个宽依赖则划分一个新的Stage

    Task: 将Stage最后一个RDD的分区分发到不同的Executor,每个Executor上执行一个Task

    广播变量和累加器

    RDD: 分布式数据集广播变量: 分布式只读变量(需要注册) 使用广播变量可以减少网络传输和内存使用:从每个task存储一份变量到每个Executor存储一份变量 累加器: 分布式只写变量(多个task共同操作同一个变量)

    SparkSQL

    DataFrame

    since Spark 1.3

    类似传统数据库的二维表

    SparkSession是创建DataFrame和执行SQL的入口

    DataSet

    since Spark 1.6

    DataFrame的一个扩展,类似Hibernate对数据表的封装

    Spark Streaming

    Spark Streaming 架构

    Spark Streaming编程的核心对象是DStream,是对一个时间片(时间片长度可以设置)的数据的封装. DStream的常用方法与RDD类似

    有状态转化操作:保存了过去的时间片的状态, 例如updateStateByKey, mapWithStates

    Spark shuffle

    Spark的shuffle发生在宽依赖算子,是划分两个stage的依据。在Spark 1.6之前,Spark的shuffle主要采用HashShuffleWriter;从Spark 1.6之后,Spark的shuffle主要采用SortShuffleWriter,但是当输出分区数较少时会采用BypassMergeSortShuffleWriter.

    HashShuffleWriter

    基于Hash的shuffle在每个map task/executor会为每一个输出分区形成一个文件,然后reduce task去取对应的文件。这样做的缺点是shuffle过程中产生很多小文件读写,并且这些小文件还要进行网络传输,效率低,容易OOM,因此在Spark 1.6中放弃了基于Hash的shuffle作为主要shuffle方式的设计思路。

    SortShuffleWriter

    Spark 1.6之后主要的shuffle方式,非常类似Hadoop Mapreduce的shuffle. map task的计算结果写入一个内存数据结构(可以类比Mapreduce的环形缓冲区),内存数据结构满了就会发生溢写,map task执行完后把溢写的多个小文件合用MergeSort归并成一个index file和一个data file,reduce task根据index file去fetch自己对应的data.

    BypassMergeSortShuffleWriter

    在Spark 1.6之后,当不存在map side combine(即aggregator)且输出分区数量小于spark.shuffle.sort.bypassMergeThreshold指定的阈值(默认为200)时,使用BypassMergeSortShuffleWriter替代SortShuffleWriter. BypassMergeSortShuffleWriter的思路类似HashShuffleWriter,但是每个map task为所有reduce task创建各自的输出文件后,在map task执行完后会将各个reduce task对应的文件合并成一个文件,并建立索引文件。

    Processed: 0.014, SQL: 9