原文链接:https://www.toutiao.com/i6845585556722680328/
在实际开发中经常需要对数据进行排序统计,Spark的sortBy以及SortByKEy算子并不能完全适用开发场景,需要我们自定义排序规则,例如如下数据:
Array("张三 16 98.3", "李四 14 98.3", "王五 34 100.0", "赵六 26 98.2", "田七 18 98.2")包含三个字段的学生数据,(姓名,年龄,成绩),我们需要按照成绩进行降序排序,成绩相同的按照年龄进行升序排序。
首先需要将数据转化成RDD格式的:
val conf: SparkConf = new SparkConf().setAppName("StudentSort").setMaster("local[2]") val context: SparkContext = new SparkContext(conf) val studentArr: Array[String] = Array("张三 16 98.3", "李四 14 98.3", "王五 34 100.0", "赵六 26 98.2", "田七 18 98.2") val linesRDD: RDD[String] = context.parallelize(studentArr)本文介绍六种实现自定义排序的方法,其中1-4是在自定义类的基础上进行的,5-6是利用用元祖的排序规则进行实现的。
第一种:
自定义一个Student类,该类继承Ordered和Serializable,并且重写排序方法。将RDD中的数据处理成Student格式的,然后调用sortBy排序时会按照自定义类的排序规则进行排序。
自定义Student类:
/** * 自定义类,重写排序方法,继承Ordered类,重写compare方法 * 还要继承Serializable类,序列化自定义类 * * 类的构造参数用val声明,会自动生成getter方法,否则无法直接使用this.score调用score * @param name * @param age * @param score */ class Student(val name:String, val age: Int, val score: Double) extends Ordered[Student] with Serializable { /** * 自定义排序标准: 按照score进行降序排序,若score一样比较age * @param that * @return */ override def compare(that: Student): Int = { if(this.score == that.score){ this.age - that.age } else if(this.score < that.score){ 1 } else { -1 } } override def toString: String = s"name: $name, age: $age, score: $score" }排序过程:
// 构造Student对象RDD val studentRDD: RDD[Student] = linesRDD.map(line => { val fields: Array[String] = line.split(" ") val name: String = fields(0) val age: Int = fields(1).toInt val score: Double = fields(2).toDouble new Student(name, age, score) // 数据为Student的对象 }) // 排序,排序规则是Student类中重写的compare方法 val sorted: RDD[Student] = studentRDD.sortBy(u => u) val collected: Array[Student] = sorted.collect()第二种:
同样自定义一个Student类,和第一种方法不同的是,该类只是用于说明排序规则,并且Student中只需保留排序用到的属性就可以,name可以不出现在Student中。
linesRDD.map处理得到的RDD中的数据类型是RDD[(String, Int, Double)]即元祖,而不是RDD[Student]。 这种方法在sortedBy中传入的是一个排序规则,不会改变数据的格式,只会改变顺序。
Student类:
/** * 自定义类,重写排序方法,继承Ordered类,重写compare方法 * 还要继承Serializable类,序列化自定义类 * *该类中是定义的是排序规则,与排序无关的属性可以不出现 * @param age * @param score */ class Student(val age: Int, val score: Double) extends Ordered[Student] with Serializable { /** * 自定义排序标准: 按照score进行降序排序,若score一样比较age * @param that * @return */ override def compare(that: Student): Int = { if(this.score == that.score){ this.age - that.age } else if(this.score < that.score){ 1 } else { -1 } } }排序过程:
// RDD中数据是元祖类型 val studentsRDD: RDD[(String, Int, Double)] = linesRDD.map(line => { val fields: Array[String] = line.split(" ") val name: String = fields(0) val age: Int = fields(1).toInt val score: Double = fields(2).toDouble (name, age, score) }) // 排序 (传入的是排序规则,并不改变数据格式,只改变排序顺序) // 这时候的Student可以不包含全部属性,仅包含需要排序的属性就可以,这里的类只是用来说明排序的规则 // Student也必须要实现序列化 val sorted: RDD[(String, Int, Double)] = studentsRDD.sortBy(stu => { new Student(stu._2, stu._3) }) val collected: Array[(String, Int, Double)] = sorted.collect()第三种:
在方案二上进行改进。
将Student定义为一个样例类case class,这样Student就不必继承Serializable来实现序列化。并且在sortBy算子中,也需要每次都要new一个对象。
Student类:
/** * 自定义类为样例类,继承Ordered类,重写compare排序方法 *不再需要继承Serializable序列化类 * *同样只需保留与排序相关的属性 * @param age * @param score */ case class Student(age: Int, score: Double) extends Ordered[Student]{ /** * 自定义排序标准: 按照score进行降序排序,若score一样比较age * @param that * @return */ override def compare(that: Student): Int = { if(this.score == that.score){ this.age - that.age } else if(this.score < that.score){ 1 } else { -1 } } }排序过程:
// RDD中数据是元祖类型 val studentsRDD: RDD[(String, Int, Double)] = linesRDD.map(line => { val fields: Array[String] = line.split(" ") val name: String = fields(0) val age: Int = fields(1).toInt val score: Double = fields(2).toDouble (name, age, score) }) // 排序 (传入的是排序规则,并不改变数据格式,只改变排序顺序) // 这时候的Student可以不是全部属性,仅包含需要排序的属性就可以,这里的类只是用来说明排序的规则 // 因为Student是样例类,这里不需要new了 val sorted: RDD[(String, Int, Double)] = studentsRDD.sortBy(stu => { Student(stu._2, stu._3) }) val collected: Array[(String, Int, Double)] = sorted.collect()第四种:
以上三种方案,自定义Student类中只能重写一次compare方法,也就是只能有一个排序规则。
为了实现多种排序方法,我们定义一个隐式转换,隐式转换中定义多个排序规则,在排序时只要import相应的排序规则就可以了。
为了不再在sortBy算子进行new操作,这里的Student自定义类同样为样例类,但是只定义类中包含的属性,不需要再继承Ordered类,重写compare排序方法了。
/** * 样例类,仅定义类,排序规则在隐式转化中实现 * @param age * @param score */ case class Student(age:Int, score: Double)而排序规则由专门的Object来实现,并且可以在Object中定义多种规则:
import XXX /** * 排序规则类,隐式转换实现排序规则,可以有多种排序规则 */ object StudentSortRules { // 按照分数进行排序 implicit object OrderingStudentScore extends Ordering[Student]{ override def compare(x: Student, y: Student): Int = { if(x.score == y.score){ x.age - y.age } else if(x.score < y.score){ 1 } else { -1 } } } // 按照年龄进行排序 implicit object OrderingStudentAge extends Ordering[Student]{ override def compare(x: Student, y: Student): Int = { if(x.age == y.age){ if(x.score > y.score){ 1 }else{ -1 } } else { x.age - y.age } } } }在排序时,说明使用哪种排序规则即可:
// RDD中数据是元祖类型 val studentsRDD: RDD[(String, Int, Double)] = linesRDD.map(line => { val fields: Array[String] = line.split(" ") val name: String = fields(0) val age: Int = fields(1).toInt val score: Double = fields(2).toDouble (name, age, score) }) // 排序 (传入的是排序规则,并不改变数据格式,只改变排序顺序) // 使用隐式转化,引入排序的规则 import StudentSortRules.OrderingStudentScore val sorted: RDD[(String, Int, Double)] = studentsRDD.sortBy(stu => { Student(stu._2, stu._3) }) val collected: Array[(String, Int, Double)] = sorted.collect()以上四种是通过自定义类的方式实现的。对于一些简单的业务逻辑来说,我们可以使用元组的比较规则来进行排序。 元组的比较规则是:先比第一个元素,如果第一个元素相等,再比第二个。
第五种:可以在sortBy时改变元祖形态,生成新的元祖,利用新元祖的排序规则进行排序:
// RDD中数据是元祖类型 val studentsRDD: RDD[(String, Int, Double)] = linesRDD.map(line => { val fields: Array[String] = line.split(" ") val name: String = fields(0) val age: Int = fields(1).toInt val score: Double = fields(2).toDouble (name, age, score) }) // 将需要排序的字段放到新的元祖中,利用元祖的比较规则: // 先比元祖的第一个,负号表示降序;若第一个相等,再比第二个 val sorted: RDD[(String, Int, Double)] = studentsRDD.sortBy(stu => (-stu._3, stu._2)) val collected: Array[(String, Int, Double)] = sorted.collect()第六种:不改变元祖的形态,使用Ordering中的on方法改变元祖比较的规则:
// RDD中数据是元祖类型 val studentsRDD: RDD[(String, Int, Double)] = linesRDD.map(line => { val fields: Array[String] = line.split(" ") val name: String = fields(0) val age: Int = fields(1).toInt val score: Double = fields(2).toDouble (name, age, score) }) // 以隐式转化的方式 通过指定元祖的排序规则来进行排序 // Ordering[(Double, Int)] 元祖比较的样式格式 // [(String, Int, Double)]原始元祖样式格式 // (t =>(-t._3, t._2)) 元祖内数据的数据比较规则,先比较第一个数据,负号表示降序,再比较第二个数据 implicit val rules = Ordering[(Double, Int)].on[(String, Int, Double)](t =>(-t._3, t._2)) val sorted: RDD[(String, Int, Double)] = studentsRDD.sortBy(stu => stu) val collected: Array[(String, Int, Double)] = sorted.collect()总结:以上六种排序方法,第五种最简单,也基本能满足日常开发所需。
