Spark是UC Berkeley AMP Lab开源的通用分布式并行计算框架,目前已成为Apache软件基金会的顶级开源项目。Spark支持多种编程语言,包括Java、Python、R和Scala,同时Spark也支持Hadoop的底层存储系统HDFS,但Spark不依赖Hadoop。
Spark基于Hadoop MapReduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点,并且具有更高的运算速度。Spark能够比Hadoop运算更快,主要原因是:Hadoop在一次MapReduce运算之后,会将数据的运算结果从内存写入到磁盘中,第二次 MapReduce运算时在从磁盘中读取数据,两次对磁盘的操作,增加了多余的IO消耗;而Spark则是将数据一直缓存在内存中,运算时直接从内存读取数据,只有在必要时,才将部分数据写入到磁盘中。除此之外,Spark使用最先进的DAG(Directed Acyclic Graph, 有向无环图)调度程序、查询优化器和物理执行引擎,在处理批量处理以及处理流数据时具有较高的性能。按照Spark官网的说法,Spark相对于Hadoop而言,Spark能够达到100倍以上的运行负载。
Spark除了Spark Core外,还有其它由多个组件组成,目前主要有四个组件:Spark SQL、Spark Streaming、MLlib、GraphX。这四个组件加上Spark Core组成了Spark的生态。通常,我们在编写一个Spark应用程序,需要用到Spark Core和其余4个组件中的至少一个。Spark的整体构架图如下图所示: Spark Core:是Spark的核心,主要负责任务调度等管理功能。Spark Core的实现依赖于RDDs(Resilient Distributed Datasets, 弹性分布式数据集)的程序抽象概念。 Spark SQL:是Spark处理结构化数据的模块,该模块旨在将熟悉的SQL数据库查询与更复杂的基于算法的分析相结合,Spark SQL支持开源Hive项目及其类似SQL的HiveQL查询语法。Spark SQL还支持JDBC和ODBC连接,能够直接连接现有的数据库。 Spark Streaming:这个模块主要是对流数据的处理,支持流数据的可伸缩和容错处理,可以与Flume(针对数据日志进行优化的一个系统)和Kafka(针对分布式消息传递进行优化的流处理平台)等已建立的数据源集成。Spark Streaming的实现,也使用RDD抽象的概念,使得在为流数据(如批量历史日志数据)编写应用程序时,能够更灵活,也更容易实现。 MLlib:主要用于机器学习领域,它实现了一系列常用的机器学习和统计算法,如分类、回归、聚类、主成分分析等算法。 GraphX:这个模块主要支持数据图的分析和计算,并支持图形处理的Pregel API版本。GraphX包含了许多被广泛理解的图形算法,如PageRank。
Spark有多种运行模式,由图2中,可以看到Spark支持本地运行模式(Local模式)、独立运行模式(Standalone模式)、Mesos、YARN(Yet Another Resource Negotiator)、Kubernetes模式等。 本地运行模式是Spark中最简单的一种模式,也可称作伪分布式模式。 独立运行模式为Spark自带的一种集群管理模式,Mesos及YARN两种模式也是比较常用的集群管理模式。相比较Mesos及YARN两种模式而言,独立运行模式是最简单,也最容易部署的一种集群运行模式。 Kubernetes是一个用于自动化部署、扩展和管理容器化应用程序的开源系统。 Spark底层还支持多种数据源,能够从其它文件系统读取数据,如HDFS、Amazon S3、Hypertable、HBase等。Spark对这些文件系统的支持,同时也丰富了整个Spark生态的运行环境。
在Spark中,有几个基本概念是需要先了解的,了解这些基本概念,对于后续在学习和使用Spark过程中,能更容易理解一些。 Application:基于Spark的用户程序,即由用户编写的调用Spark API的应用程序,它由集群上的一个驱动(Driver)程序和多个执行器(Executor)程序组成。其中应用程序的入口为用户所定义的main方法。 SparkContext:是Spark所有功能的主要入口点,它是用户逻辑与Spark集群主要的交互接口。通过SparkContext,可以连接到集群管理器(ClusterManager),能够直接与集群Master节点进行交互,并能够向Master节点申请计算资源,也能够将应用程序用到的JAR包或Python文件发送到多个执行器(Executor)节点上。 Cluster Manager:即集群管理器,它存在于Master进程中,主要用来对应用程序申请的资源进行管理。 Worker Node:任何能够在集群中能够运行Spark应用程序的节点。 Task:由SparkContext发送到Executor节点上执行的一个工作单元。 Driver:也即驱动器节点,它是一个运行Application中main()函数并创建SparkContext的进程。Driver节点也负责提交Job,并将Job转化为Task,在各个Executor进程间协调Task的调度。Driver节点可以不运行于集群节点机器上。 Executor:也即执行器节点,它是在一个在工作节点(WorkerNode)上为Application启动的进程,它能够运行Task并将数据保存在内存或磁盘存储中,也能够将结果数据返回给Driver。 根据以上术语的描述,通过下图可以大致看到Spark程序在运行时的内部协调过程:
即弹性分布式数据集(Resilient Distributed Datasets),是一种容错的、可以被并行操作的元素集合,它是Spark中最重要的一个概念,是Spark对所有数据处理的一种基本抽象。Spark中的计算过程可以简单抽象为对RDD的创建、转换和返回操作结果的过程: 对于Spark的RDD计算抽象过程描述如下: makeRDD:可以通过访问外部物理存储(如HDFS),通过调用SparkContext.textFile()方法来读取文件并创建一个RDD,也可以对输入数据集合通过调用SparkContext.parallelize()方法来创建一个RDD。RDD被创建后不可被改变,只可以对RDD执行Transformation及Action操作。 Transformation(转换):对已有的RDD中的数据执行计算进行转换,并产生新的RDD,在这个过程中有时会产生中间RDD。Spark对于Transformation采用惰性计算机制,即在Transformation过程并不会立即计算结果,而是在Action才会执行计算过程。如map、filter、groupByKey、cache等方法,只执行Transformation操作,而不计算结果。 Action(执行):对已有的RDD中的数据执行计算产生结果,将结果返回Driver程序或写入到外部物理存储(如HDFS)。如reduce、collect、count、saveAsTextFile等方法,会对RDD中的数据执行计算。
Spark中RDD的每一次Transformation都会生成一个新的RDD,这样RDD之间就会形成类似于流水线(Pipeline)一样的前后依赖关系,在Spark中,依赖关系被定义为两种类型,分别是窄依赖和宽依赖: 窄依赖(NarrowDependency):每个父RDD的一个分区最多被子RDD的一个分区所使用,即RDD之间是一对一的关系。窄依赖的情况下,如果下一个RDD执行时,某个分区执行失败(数据丢失),只需要重新执行父RDD的对应分区即可进行数恢复。例如map、filter、union等算子都会产生窄依赖。 宽依赖(WideDependency,或ShuffleDependency):是指一个父RDD的分区会被子RDD的多个分区所使用,即RDD之间是一对多的关系。当遇到宽依赖操作时,数据会产生Shuffle,所以也称之为ShuffleDependency。宽依赖情况下,如果下一个RDD执行时,某个分区执行失败(数据丢失),则需要将父RDD的所有分区全部重新执行才能进行数据恢复。例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖。 RDD依赖关系如下图所示:
partition(分区)是Spark中另一个重要的概念,它是RDD的最小单元,RDD是由分布在各个节点上的partition 组成的。partition的数量决定了task的数量,每个task对应着一个partition。 例如,使用Spark来读取本地文本文件内容,读取完后,这些内容将会被分成多个partition,这些partition就组成了一个RDD,同时这些partition可以分散到不同的机器上执行。RDD的partition描述如下图所示: partition的数量可以在创建RDD时指定,如果未指定RDD的partition大小,则在创建RDD时,Spark将使用默认值,默认值为spark.default.parallelism配置的参数。
Partition数量的影响: 如果partition数量太少,则直接影响是计算资源不能被充分利用。例如分配8个核,但partition数量为4,则将有一半的核没有利用到。 如果partition数量太多,计算资源能够充分利用,但会导致task数量过多,而task数量过多会影响执行效率,主要是task在序列化和网络传输过程带来较大的时间开销。 根据Spark RDD Programming Guide上的建议,集群节点的每个核分配2-4个partitions比较合理。以下内容为Spark RDD Programming Guide上的截图: Partition调整: Spark中主要有两种调整partition的方法:coalesce、repartition 参考pyspark中的函数定义:
def coalesce(self, numPartitions, shuffle=False): """ Return a new RDD that is reduced into `numPartitions` partitions. """ def repartition(self, numPartitions): """ Return a new RDD that has exactly numPartitions partitions. Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using `coalesce`, which can avoid performing a shuffle. """ return self.coalesce(numPartitions, shuffle=True)从函数接口可以看到,reparation是直接调用coalesce(numPartitions, shuffle=True),不同的是,reparation函数可以增加或减少partition数量,调用repartition函数时,还会产生shuffle操作。而coalesce函数可以控制是否shuffle,但当shuffle为False时,只能减小partition数,而无法增大。
前面提到,RDD支持两种类型的算子操作:Transformation和Action。Spark采用惰性机制,Transformation算子的代码不会被立即执行,只有当遇到第一个Action算子时,会生成一个Job,并执行前面的一系列Transformation操作。一个Job包含N个Transformation和1个Action。 而每个Job会分解成一系列可并行处理的Task,然后将Task分发到不同的Executor上运行,这也是Spark分布式执行的简要流程。
Spark在对Job中的所有操作划分Stage时,一般会按照倒序进行,依据RDD之间的依赖关系(宽依赖或窄依赖)进行划分。即从Action开始,当遇到窄依赖类型的操作时,则划分到同一个执行阶段;遇到宽依赖操作,则划分一个新的执行阶段,且新的阶段为之前阶段的Parent,之前的阶段称作Child Stage,然后依次类推递归执行。Child Stage需要等待所有的Parent Stage执行完之后才可以执行,这时Stage之间根据依赖关系构成了一个大粒度的DAG。 如下图所示,为一个复杂的DAG Stage划分示意图: 上图为一个Job,该Job生成的DAG划分成了3个Stage。上图的Stage划分过程是这样的:从最后的Action开始,从后往前推,当遇到操作为NarrowDependency时,则将该操作划分为同一个Stage,当遇到操作为ShuffleDependency时,则将该操作划分为新的一个Stage。
Task为一个Stage中的一个执行单元,也是Spark中的最小执行单元,一般来说,一个RDD有多少个Partition,就会有多少个Task,因为每一个Task 只是处理一个Partition上的数据。在一个Stage内,所有的RDD操作以串行的Pipeline方式,由一组并发的Task完成计算,这些Task的执行逻辑完全相同,只是作用于不同的Partition。每个Stage里面Task的数目由该Stage最后一个RDD的Partition 个数决定。 Spark中Task分为两种类型,ShuffleMapTask和ResultTask,位于最后一个Stage的Task为ResultTask,其他阶段的属于ShuffleMapTask。ShuffleMapTask和ResultTask分别类似于Hadoop中的Map和Reduce。