reduceByKey, groupByKey, and countByKey: Usage and Differences


    All three are aggregation operations on (K, V) RDDs. reduceByKey and groupByKey are transformations, while countByKey is an action. The three differ in how they aggregate and in their intended use cases.

    1. reduceByKey

    Called on an RDD of (K, V) pairs, reduceByKey returns an RDD of (K, V) pairs in which the values for each key are aggregated using the given reduce function. The number of reduce tasks can be set through an optional second parameter. Before the shuffle, a pre-aggregation (combine) is performed within each partition, which reduces overhead. Requirement: create a pairRDD and compute the sum of the values for each key.

    (1) Create a pairRDD

    scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24

    (2) Compute the sum of the values for each key

    scala> val reduce = rdd.reduceByKey((x,y) => x+y)
    reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26

    (3) Print the result

    scala> reduce.collect()
    res29: Array[(String, Int)] = Array((female,6), (male,7))
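    The optional second parameter mentioned above sets the number of reduce tasks (output partitions). A minimal sketch, assuming the same rdd as above and an arbitrarily chosen partition count of 4:

    scala> val reduce4 = rdd.reduceByKey((x, y) => x + y, 4)  // second argument: number of reduce tasks
    scala> reduce4.getNumPartitions                           // should report 4
    scala> reduce4.collect()                                  // same sums as above, possibly in a different order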

    2. groupByKey

    groupByKey also operates per key, but it only collects the values for each key into a single Iterable; it performs no aggregation itself. Requirement: create a pairRDD, group the values with the same key into one seq, then compute the sum of the values for each key.

    (1) Create a pairRDD

    scala> val words = Array("one", "two", "two", "three", "three", "three")
    words: Array[String] = Array(one, two, two, three, three, three)
    scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
    wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26

    (2) Group the values for each key into a Seq

    scala> val group = wordPairsRDD.groupByKey()
    group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28

    (3) Print the result

    scala> group.collect()
    res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))

    (4) Compute the sum of the values for each key

    scala> group.map(t => (t._1, t._2.sum))
    res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31

    (5) Print the result

    scala> res2.collect()
    res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
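    As an aside, the sum in step (4) can also be written with mapValues, which leaves the keys untouched (and therefore preserves the partitioner). A minimal sketch, assuming the same group RDD as above:

    scala> val sums = group.mapValues(_.sum)  // sums each Iterable[Int] without rebuilding the tuple
    scala> sums.collect()                     // expected: Array((two,2), (one,1), (three,3)), order may vary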

    3. countByKey

    countByKey counts, for each key, how many values it has. reduceByKey only fits RDDs whose values can be combined (for example, summed numerically), whereas countByKey also works on RDDs whose values cannot be combined: it ignores the contents of the values and only counts them. It returns a collection.Map.

    val scoreList = Array(
      Tuple2("class1", 90),
      Tuple2("class1", 60),
      Tuple2("class2", 60),
      Tuple2("class2", 50)
    )
    val scoreRdd = sc.parallelize(scoreList)
    scala> val res = scoreRdd.countByKey
    res: scala.collection.Map[String,Long] = Map(class2 -> 2, class1 -> 2)  // class2 and class1 each have 2 values
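    To make the point that countByKey ignores value contents, here is a minimal sketch with string values (the nameRdd data is made up for illustration); these values could not be summed with reduceByKey, but counting still works:

    scala> val nameRdd = sc.parallelize(Array(("class1", "Tom"), ("class1", "Jerry"), ("class2", "Spike")))
    scala> nameRdd.countByKey  // expected: Map(class1 -> 2, class2 -> 1)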

    Summary:

    reduceByKey: aggregates by key, with a combine (pre-aggregation within each partition) before the shuffle; returns RDD[(K, V)].
    groupByKey: groups by key and shuffles directly, with no pre-aggregation.
    Development guidance: prefer reduceByKey, but check whether the pre-aggregation affects your business logic.
    countByKey: suited to cases where you do not care about the value contents and only need to know how many values each key has.
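    As a closing sketch of this guidance, both transformations can express the same word count, but reduceByKey combines within each partition before the shuffle, while groupByKey ships every raw (word, 1) pair across the network (example data made up for illustration):

    scala> val pairs = sc.parallelize(Array("a", "b", "a")).map((_, 1))
    scala> pairs.reduceByKey(_ + _).collect()             // partial sums are combined map-side, then shuffled
    scala> pairs.groupByKey().mapValues(_.sum).collect()  // all pairs are shuffled, then summed; same result: Array((a,2), (b,1)), order may vary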