RDD(Resulient Distributed Databases, 弹性分布式数据集)代表可并行操作元素的不可变分区集合。严格来讲,RDD的转换及DAG的构成并不属于调度系统的内容,但是RDD却是调度系统操作的主要对象,因此有必要对RDD进行详细的介绍。
RDD是一个容错的、并行的数据结构,可以控制将数据存储到磁盘或者内存,能够获取数据的分区。RDD提供了一组类似于Scala的操作,比如map、flatMap、filter、reduceByKey、join等,这些操作实际上是对RDD进行转换(transformation)。此外,RDD还提供了collect、foreach、count、reduce、countByKey等操作完成数据计算的动作(action)。这里的转换和动作是一种惰性机制。
通常数据处理模型包括迭代计算、关系查询、MapReduce、流式处理等。Hadoop采用MapReduce模型,storm采用流式处理模型,而Spark借助RDD实现了以上的所有模型。
一个RDD包含一个或者多个分区,每个分区实际是一个数据集合的片段。在构建DAG的过程中,会将RDD用依赖关系串联起来。每个RDD都有其依赖(除了最顶级的RDD的依赖是空列表),这些依赖被划分为宽依赖和窄依赖。窄依赖会被划分到一个stage中,这样他们就能以管道(pipeline)的方式进行迭代(流水线优化)宽依赖由于所依赖的分区Task不止一个,所以往往需要跨结点传输数据。从容错角度讲,他们恢复计算结果的方式不同。
RDD的计算过程允许在多个结点并发执行。如果数据量很大,可以适当增加分区数量,这种根据硬件条件对并发数量的控制,能更好地利用各种资源,也能有效提高Spark的数据处理效率。
传统关系型数据库往往采用日志的方式来容错,数据以来往往依赖于重新执行日志。Hadoop为了避免单机故障概率较高的问题,通常讲数据备份到其他机器来容错。RDD本身是一个不可变的(immutable)数据集,当某个Worker结点上的Task失败时,可以利用DAG重新调度计算这些失败的Task(执行已成功的Task可以从CheckPoint中读取,而不用重新计算)。在流式计算的场景中,Spark需要记录日志和CheckPoint,以便利用日志和CheckPoint对数据进行恢复。
本文只对RDD中与调度系统相关的API进行分析。
抽象类RDD定义了所有RDD的规范,我们从RDD的属性开始,逐步了解RDD的实现。
sc:即SparkContext。_sc由@transient修饰,所以此属性不会被序列化。deps:构造器参数之一,是Dependency的序列,用于存储当前RDD的依赖。RDD的子类在实现时不一定会传递此参数。由于deps由@transient修饰,所以此属性不会被序列化。 partitioner:当前RDD的分区计算器。partitioner由@transient修饰,所以此属性不会被序列化。 id:当前RDD的唯一身份标识。此属性通过调用SparkContext的nextRddId属性生成。 name:RDD的名称。name由@transient修饰,所以此属性不会被序列化。dependencies_:与deps相同,但是可以被序列化。partitions_:存储当前RDD的所有分区的数组。partitions_由@transient修饰,所以此属性不会被序列化。storageLevel:当前RDD的存储级别。creationSite:创建当前RDD的用户代码。creationSite由@transient修饰,所以此属性不会被序列化。scope:当前RDD的操作作用域。scope由@transient修饰,所以此属性不会被序列化。checkpointData:当前RDD的检查点数据。checkpointAllMarkedAncestors:是否对所有标记了需要保存检查点的祖先保存检查点。doCheckpointCalled:是否已经调用了doCheckpoint方法设置检查点。此属性可以阻止对RDD多次设置检查点。RDD采用了模板方法的模式设计,抽象类RDD中定义了模板方法及一些未实现的接口,这些接口讲需要RDD的各个子类分别实现。
compute:对RDD的分区进行计算。getPartitions:获取当前RDD的所有分区。getDependencies:获取当前RDD的所有依赖。getPreferredLocations:获取某一分区的偏好位置。RDD中除了定义了以上接口外,还实现了一些模板方法。
partitions方法用于获取RDD的分区数组。
final def partitions: Array[Partition] = { checkpointRDD.map(_.partitions).getOrElse { // 从CheckPoint中查找 if (partitions_ == null) { stateLock.synchronized { if (partitions_ == null) { partitions_ = getPartitions // 调用getPartitions方法获取 partitions_.zipWithIndex.foreach { case (partition, index) => require(partition.index == index, s"partitions($index).partition == ${partition.index}, but it should equal $index") } } } } partitions_ } }preferredLocations方法优先调用CheckPoint中保存的RDD的getPreferredLocations方法获取指定分区的偏好位置,当没有保存CheckPoint时调用自身的getPreferredLocations方法获取指定分区的偏好位置。
final def preferredLocations(split: Partition): Seq[String] = { // 优先调用CheckPoint中保存的RDD的getPreferredLocations方法获取指定分区的偏好位置 checkpointRDD.map(_.getPreferredLocations(split)).getOrElse { getPreferredLocations(split) // 调用自身的getPreferredLocations方法获取指定分区的偏好位置 } }dependencies方法用于获取当前RDD所有依赖序列。
final def dependencies: Seq[Dependency[_]] = { checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse { if (dependencies_ == null) { stateLock.synchronized { if (dependencies_ == null) { dependencies_ = getDependencies } } } dependencies_ } } 从CheckPoint中获取RDD并将这些RDD封装为OneToOneDependency列表。如果从CheckPoint中获取到RDD的依赖,则返回RDD的依赖,否则进入下一步。如果dependencies_等于null,那么调用子类实现的getDependencies方法获取当前RDD的依赖后赋予dependencies,最后返回dependencies_。除了以上的模板方法,RDD还实现了以下的一些方法。
1)context:返回_sc。2)getStorageLevel:返回当前RDD的StorageLevel。3)getNarrowAncestors:用于获取RDD的祖先依赖中属于窄依赖的RDD序列。DAG中的RDD之间存在着依赖关系。换言之,正是RDD之间的依赖关系构建了由RDD所组成的DAG。Spark使用Dependency来表示RDD之间的依赖关系。Dependency的定义如下(抽象类)。
@DeveloperApi abstract class Dependency[T] extends Serializable { def rdd: RDD[T] }抽象类Dependency只定义了一个名叫rdd的方法,此方法返回当前依赖的RDD。
Dependency分为NarrowDependency和ShuffleDependency两种依赖,下面对它们分别介绍。
如果RDD与上游RDD的分区是一对一的关系,那么RDD和其上游RDD之间的依赖关系属于窄依赖。NarrowDependency继承了Dependency,以表示窄依赖。NarrowDependency的定义如下:
@DeveloperApi abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { def getParents(partitionId: Int): Seq[Int] override def rdd: RDD[T] = _rdd }NarrowDependency定义了一个类型为RDD的构造器参数_rdd,NarrowDependency重写了Dependency的rdd方法,让其返回_rdd。NarrowDependency还定义了一个获取某一分区的所有父级别分区序列的getParents方法。NarrowDependency一共有两个子类,它们的实现见代码如下。
@DeveloperApi class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = List(partitionId) } @DeveloperApi class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = { if (partitionId >= outStart && partitionId < outStart + length) { List(partitionId - outStart + inStart) } else { Nil } } }根据代码清单,OneToOneDependency重写的getParents方法告诉我们,子RDD的分区与依赖的父RDD分区相同。OneToOneDependency可以用图7.2更形象地说明。
图7-2:OneToOneDependency的依赖示意图根据代码清单,RangeDependency重写了Dependency的getParents方法,其实现告诉我们RangeDependency的分区是一对一的, 且索引为partitionId的子RDD分区与索引为partitionId - outStart + inStart的父RDD分区相对应(outStart代表子RDD的分区范围起始值,inStart代表父RDD的分区范围起始值)。RangeDependency可以用图7-3更形象的说明。
图7-3:RangeDependency的依赖示意图RDD与上游RDD的分区如果不是一对一的关系,或者RDD的分区依赖于上游RDD的多个分区,那么这种依赖关系就叫做Shuffle依赖(ShuffleDependency)。ShuffleDependency的实现代码如下:
<pre class="wp-block-syntaxhighlighter-code">@DeveloperApi class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @transient private val _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializer: Serializer = SparkEnv.get.serializer, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false, val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor) extends Dependency[Product2[K, V]] { if (mapSideCombine) { require(aggregator.isDefined, "Map-side combine without Aggregator specified!") } override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]] private[<a href="http://san1.gz01.bdysite.com/?tag=spark" title="查看与 spark 相关的文章" target="_blank">spark</a>] val keyClassName: String = reflect.classTag[K].runtimeClass.getName private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName // Note: It's possible that the combiner class tag is null, if the combineByKey // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag. private[spark] val combinerClassName: Option[String] = Option(reflect.classTag[C]).map(_.runtimeClass.getName) val shuffleId: Int = _rdd.context.newShuffleId() val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( shuffleId, this) _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId) }</pre> 1)_rdd:泛型要求必须是Product2[K, V]及其子类的RDD。2)partitioner:分区计算器Partitioner。Partitioner将在下一小节详细介绍。3)serializer:SparkEnv中创建的serializer,即org.apache.spark.serializer.JavaSerializer。4)keyOrdering:按照K进行排序的scala.math.Ordering/的实现类。5)aggregator:对map任务的输出数据进行聚合的聚合器6)mapSideCombine:是否在map端进行合并,默认为false。7)keyClassName:Key的类名。8)valueClassName:Value的类名。9)combinerClassName:结合器C的类名。10)shuffleId:当前ShuffleDependency的身份标识。11)shuffleHandle:当前ShuffleDependency的处理器。此外,ShuffleDependency还重写了父类Dependency的rdd方法,其实现将_rdd转换为RDD[Product2[K, V]]后返回。ShuffleDependency在构造的过程中还将自己注册到SparkContext的ContextCleaner中。
RDD之间的依赖关系如果是Shuffle依赖,那么上游RDD该如何确定每个分区的输出将交由下游RDD的哪些分区呢?或者下游RDD的各个分区将具体依赖于上游RDD的哪些分区呢?Spark提供了分区计算器来解决这个问题。ShuffleDependency的partitioner属性的类型是Partitioner,抽象类Partitioner定义了分区计算器的接口规范,ShuffleDependency的分区取决于Partitioner的具体实现。Partitioner的定义如下(抽象类):
abstract class Partitioner extends Serializable { def numPartitions: Int // 用于获取分区数量 def getPartition(key: Any): Int // 将输入的key映射到下游RDD的从0到numPartitions-1这一范围中的某一个分区 }Partitioner有很多具体的实现类,它们的继承体系如图1所示。
图7-4:Patitioner的继承体系Spark除图7-4中列出的Partitioner子类,还有很多Partitioner的匿名实现类,这里就不一一介绍了。本书以HashPartitioner(哈希分区计算器)为例,详细介绍Partitioner的实现。之所以选择对HashPartitioner的实现进行分析,一方面是由于其实现简洁明了,读者更容易理解;另一方面通过介绍HashPartitioner已经足够达到本书的目的。
HashPartitioner的实现代码如下:
class HashPartitioner(partitions: Int) extends Partitioner { // 增加了一个名为partitions的构造器参数作为分区数 require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") def numPartitions: Int = partitions // 返回分区数partitions def getPartition(key: Any): Int = key match { // 计算出下游RDD的各个分区将具体处理哪些key case null => 0 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) // 对key的hashCode和numPartitions进行取模运算,得到key对应的分区索引。 } override def equals(other: Any): Boolean = other match { case h: HashPartitioner => h.numPartitions == numPartitions case _ => false } override def hashCode: Int = numPartitions }由于上游RDD所处理的key的hash值在取模后可能产生数据倾斜,所以HashPartitioner并不是一个均衡的分区计算器。
根据HashPartitioner的实现,我们知道ShuffleDependency中的分区依赖关系不再是一对一的,而是取决于key,并且当前RDD的某个分区将可能依赖于ShuffleDependency的RDD的任何一个分区。经过以上分析,ShuffleDependency采用HashPartitioner后的分区依赖可以用图7-5来表示。
图7-5:ShuffleDependency的依赖示意图RDDInfo用于描述RDD的信息,RDDInfo提供的信息如下:
id:RDD的id。name:RDD的名称。numPartitions:RDD的分区数量。storageLevel:RDD的存储级别(即StorageLevel)。parentIds:RDD的父亲RDD的id序列。这说明一个RDD会有零到多个父RDD。callSite:RDD的用户调用栈信息。scope:RDD的作用域范围。scope的类型为RDDOperationScope,每一个RDD都有一个RDDOperationScope。RDDOperationScope与Stage或Job之间并无特殊关系,一个RDDOperationScope可以存在于一个Stage内,也可以跨越多个Job。numCachedPartitions:缓存的分区数量。memSize:使用的内存大小。diskSize:使用的磁盘大小。externalBlockStoreSize:Block存储在外部的大小(指不同结点)。RDD还提供了以下的方法:
是否已经缓存。
def isCached: Boolean = (memSize + diskSize > 0) && numCachedPartitions > 0由于RDDInfo继承了Ordered,所以重写了compare方法用于排序。compare的代码如下:
override def compare(that: RDDInfo): Int = { this.id - that.id // 根据id的大小进行排序 }用于从RDD构建出相应的RDDInfo,其实现代码如下:
private[spark] object RDDInfo { def fromRdd(rdd: RDD[_]): RDDInfo = { val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd)) // 获取rddName属性 val parentIds = rdd.dependencies.map(_.rdd.id) // 获取当前RDD依赖的所有父RDD的身份标识作为RDDInfo的parentIds属性 new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.creationSite.shortForm, rdd.scope) // 创建RDDInfo对象并返回 } }