RDD行动算子

    技术2024-01-14  107

    reduce

    函数说明:聚集RDD中所有元素,先聚合分区内数据,然后聚合分区间数据

    val reduceResult: Int = rdd.reduce(_+_)

    collect

    函数说明:以数组Array的形式返回数据集的所有元素

    rdd.collect().foreach(println)

    count

    返回RDD元素的个数

    val countResult: Long = rdd.count()

    first

    函数说明:返回RDD中的第一个元素

    take

    函数说明:返回一个由RDD的前n个元素组成的数组

    takeOrdered

    函数说明:返回该RDD排序后的前n个元素组成的数组

    aggregate

    函数说明:分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

    object Test { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("zk") val sparkContext = new SparkContext(sparkConf) val rdd: RDD[Int] = sparkContext.makeRDD(List(11, 2, 3, 4,6,1),2) val ans: Int = rdd.aggregate(0)(Math.max(_, _), _ + _) println(ans) } }

    fold

    函数说明:aggregate的简化操作

    val foldResult: Int = rdd.fold(0)(_+_)

    countByKey

    函数说明:统计每种key的个数

    object Test { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("zk") val sparkContext = new SparkContext(sparkConf) val rdd: RDD[(Int, String)] = sparkContext.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c"))) val ans: collection.Map[Int, Long] = rdd.countByKey() for ((a,b)<-ans){ println(a + ":" +b ) } sparkContext.stop() } }

    sava相关算子

    // 保存成Text文件 rdd.saveAsTextFile("output") // 序列化成对象保存到文件 rdd.saveAsObjectFile("output1") // 保存成Sequencefile文件 rdd.map((_,1)).saveAsSequenceFile("output2")

    foreach

    函数说明:分布式遍历RDD中的每一个元素,调用指定函数

    Processed: 0.021, SQL: 9