Spark算子

    技术2022-07-10  176

    简介

    Spark 算子大致可以分为以下两类:

    Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。

    Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。

    Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。

    Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。

    TransformationActionsmap(f:T=>U) : RDD[T]=>RDD[U]count():RDD[T]=>Longfilter(f:T=>Bool) : RDD[T]=>RDD[T]collect():RDD[T]=>Seq[T]flatMap(f:T=>Seq[U]) : RDD[T]=>RDD[U]reduce(f:(T,T)=>T):RDD[T]=>Tsample(fraction:Float) : RDD[T]=>RDD[T]lookup(k:K):RDD[(K,V)]=>Seq[V]groupByKey() : RDD[(K,V)]=>RDD[(K,Seq[V])]save(path:String):Ouputs RDD to a storage system,e.g:HDFSreduceByKey(f:(V,V)=>V) : RDD[(K,V)]=>RDD[(K,V)]union() : (RDD[T],RDD[T])=>RDD[T]join():(RDD[K,V],RDD[K,W])=>RDD[(K,(V,W))]cogroup():(RDD[K,V],RDD[K,W])=>RDD[(K,(Seq[V],Seq[W]))]crossProduct(): (RDD[T],RDD[U])=>RDD[(T,U)]mapValues(f:V=>W) : RDD[(K,V)]=>RDD[(K,W)]sort(c:Comparator[K]) : RDD[(K,V)]=>RDD[(K,V)]partitionBy(p:Partitioner[K]):RDD[(K,V)]=>RDD[(K,V)]

    Transformations 转换算子

    Transformations类算子叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行

    filter

    过滤符合条件的记录数,true保留,false过滤掉。

    map

    将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。

    特点:输入一条,输出一条数据。

    flatMap

    先map后flat

    与map类似,每个输入项可以映射为0到多个输出项。

    sample 随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。

    reduceByKey

    将相同的Key根据相应的逻辑进行处理。

    sortByKey/sortBy 作用在K,V格式的RDD上,对key进行升序或者降序排序。

    Action行动算子

    Action类算子叫做行动算子,如foreach,collect,count等。Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行

    count

    返回数据集中的元素数。会在结果计算完成后回收到Driver端。

    take(n)

    返回一个包含数据集前n个元素的集合。

    first

    first=take(1),返回数据集中的第一个元素。

    foreach

    循环遍历数据集中的每个元素,运行相应的逻辑。

    collect

    将计算结果回收到Driver端。

    控制算子

    控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。

    cache和persist都是懒执行的。必须有一个action类算子触发执行。

    checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

    cache

    默认将RDD的数据持久化到内存中。cache是懒执行。

    注意:chche () = persist()=persist(StorageLevel.Memory_Only)

    persist:

    可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。

    持久化级别如下:

    cache和persist的注意事项:

    cache和persist都是懒执行,必须有一个action类算子触发执行。

    cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。

    cache和persist算子后不能立即紧跟action算子。

    错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。

    checkpoint

    checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。

    checkpoint 的执行原理:

    当RDD的job执行完毕后,会从finalRDD从后往前回溯。当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
    Processed: 0.009, SQL: 9