package day05

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Sorting demo 1: sorts (name, age, faceValue) tuples by mapping every tuple to a
 * separate [[MethodForSort]] key object that carries the comparison rule.
 */
object SortTest1 {

  /**
   * Creates a local SparkContext, parses four hard-coded "name age faceValue" records,
   * sorts them by face value descending (ties broken by age ascending) and prints the
   * collected result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SortTest1").setMaster("local[4]")
    val sc: SparkContext = new SparkContext(conf)
    try {
      val content = Array("laoduan 29 88", "laoyang 30 100", "laozhang 24 70", "laoli 34 88")

      // Build an RDD from the in-memory array.
      val contentRDD: RDD[String] = sc.parallelize(content)

      // Parse each space-separated line into a (name, age, faceValue) tuple.
      val mapRDD: RDD[(String, Int, Int)] = contentRDD.map { line =>
        val fields: Array[String] = line.split(" ")
        val name: String = fields(0)
        val age: Int = fields(1).toInt
        val faceValue: Int = fields(2).toInt
        (name, age, faceValue)
      }

      // Sort the tuples using a key object built from (faceValue, age).
      val sorted: RDD[(String, Int, Int)] =
        mapRDD.sortBy(tp => new MethodForSort(tp._3, tp._2))

      println(sorted.collect().toBuffer)
    } finally {
      // Always stop the context, even when the job above throws.
      sc.stop()
    }
  }
}

/**
 * Custom sort key: orders by face value descending, then by age ascending.
 *
 * Extends `Ordered` so that an `Ordering` for the key is available to `sortBy`, and is
 * marked `Serializable` because the key objects must be serializable for the sort.
 *
 * @param fv  face value (higher sorts first)
 * @param age age (lower sorts first when face values are equal)
 */
class MethodForSort(val fv: Int, val age: Int) extends Ordered[MethodForSort] with Serializable {

  /**
   * Compares this key with `that`.
   *
   * Uses `Integer.compare` rather than subtraction: a difference such as
   * `Int.MinValue - 1` overflows and would yield the wrong sign.
   *
   * @param that the key to compare against
   * @return a negative, zero or positive value when this key sorts before, equal to, or
   *         after `that`
   */
  override def compare(that: MethodForSort): Int =
    if (this.fv == that.fv) {
      // Equal face values: the younger one comes first.
      Integer.compare(this.age, that.age)
    } else {
      // Arguments swapped to get descending order on face value.
      Integer.compare(that.fv, this.fv)
    }
}
package day05

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Sorting demo 2: parses every record directly into a [[MethodForSort2]] instance, which
 * is itself `Ordered`, and sorts the RDD by the elements themselves.
 */
object SortTest2 {

  /**
   * Creates a local SparkContext, parses four hard-coded "name age faceValue" records
   * into [[MethodForSort2]] objects, sorts them by face value descending (ties broken by
   * age ascending) and prints the collected result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SortTest2").setMaster("local[4]")
    val sc: SparkContext = new SparkContext(conf)
    try {
      val content = Array("laoduan 29 88", "laoyang 30 100", "laozhang 24 70", "laoli 34 88")

      // Build an RDD from the in-memory array.
      val contentRDD: RDD[String] = sc.parallelize(content)

      // Parse each space-separated line straight into a self-ordering record.
      val mapRDD: RDD[MethodForSort2] = contentRDD.map { line =>
        val fields: Array[String] = line.split(" ")
        val name: String = fields(0)
        val age: Int = fields(1).toInt
        val faceValue: Int = fields(2).toInt
        new MethodForSort2(name, age, faceValue)
      }

      // The RDD holds MethodForSort2 values, so each element is its own sort key.
      val sorted: RDD[MethodForSort2] = mapRDD.sortBy(record => record)

      println(sorted.collect().toBuffer)
    } finally {
      // Always stop the context, even when the job above throws.
      sc.stop()
    }
  }
}

/**
 * A record that carries its own ordering: face value descending, then age ascending.
 *
 * Extends `Ordered` so that an `Ordering` for the record is available to `sortBy`, and is
 * marked `Serializable` because the objects must be serializable for the sort.
 *
 * @param name person name (not used for ordering)
 * @param age  age (lower sorts first when face values are equal)
 * @param fv   face value (higher sorts first)
 */
class MethodForSort2(val name: String, val age: Int, val fv: Int)
    extends Ordered[MethodForSort2]
    with Serializable {

  /**
   * Compares this record with `that`.
   *
   * Uses `Integer.compare` rather than subtraction: a difference such as
   * `Int.MinValue - 1` overflows and would yield the wrong sign.
   *
   * @param that the record to compare against
   * @return a negative, zero or positive value when this record sorts before, equal to,
   *         or after `that`
   */
  override def compare(that: MethodForSort2): Int =
    if (this.fv == that.fv) {
      // Equal face values: the younger one comes first.
      Integer.compare(this.age, that.age)
    } else {
      // Arguments swapped to get descending order on face value.
      Integer.compare(that.fv, this.fv)
    }

  /** Renders the record as `name:<name>,age:<age>,faceValue:<fv>`. */
  override def toString: String = s"name:$name,age:$age,faceValue:$fv"
}
package day05

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Sorting demo 3: same approach as a separate sort-key object, but the key is the case
 * class [[MethodForSort3]], so it is created without `new`.
 */
object SortTest3 {

  /**
   * Creates a local SparkContext, parses four hard-coded "name age faceValue" records,
   * sorts them by face value descending (ties broken by age ascending) and prints the
   * collected result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SortTest3").setMaster("local[4]")
    val sc: SparkContext = new SparkContext(conf)
    try {
      val content = Array("laoduan 29 88", "laoyang 30 100", "laozhang 24 70", "laoli 34 88")

      // Build an RDD from the in-memory array.
      val contentRDD: RDD[String] = sc.parallelize(content)

      // Parse each space-separated line into a (name, age, faceValue) tuple.
      val mapRDD: RDD[(String, Int, Int)] = contentRDD.map { line =>
        val fields: Array[String] = line.split(" ")
        val name: String = fields(0)
        val age: Int = fields(1).toInt
        val faceValue: Int = fields(2).toInt
        (name, age, faceValue)
      }

      // Sort the tuples using a case-class key built from (faceValue, age).
      val sorted: RDD[(String, Int, Int)] =
        mapRDD.sortBy(tp => MethodForSort3(tp._3, tp._2))

      println(sorted.collect().toBuffer)
    } finally {
      // Always stop the context, even when the job above throws.
      sc.stop()
    }
  }
}

/**
 * Custom sort key as a case class: orders by face value descending, then age ascending.
 *
 * Extends `Ordered` so that an `Ordering` for the key is available to `sortBy`. A case
 * class is not a singleton: every `MethodForSort3(...)` call creates a new instance.
 *
 * @param fv  face value (higher sorts first)
 * @param age age (lower sorts first when face values are equal)
 */
case class MethodForSort3(fv: Int, age: Int) extends Ordered[MethodForSort3] with Serializable {

  /**
   * Compares this key with `that`.
   *
   * Uses `Integer.compare` rather than subtraction: a difference such as
   * `Int.MinValue - 1` overflows and would yield the wrong sign.
   *
   * @param that the key to compare against
   * @return a negative, zero or positive value when this key sorts before, equal to, or
   *         after `that`
   */
  override def compare(that: MethodForSort3): Int =
    if (this.fv == that.fv) {
      // Equal face values: the younger one comes first.
      Integer.compare(this.age, that.age)
    } else {
      // Arguments swapped to get descending order on face value.
      Integer.compare(that.fv, this.fv)
    }
}
package day05

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Sorting demo 4: sorts (name, age, faceValue) tuples with a plain tuple key, relying on
 * tuple comparison (first element first, second element on ties).
 */
object SortTest4 {

  /**
   * Creates a local SparkContext, parses four hard-coded "name age faceValue" records,
   * sorts them by face value descending (ties broken by age ascending) and prints the
   * collected result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SortTest4").setMaster("local[4]")
    val sc: SparkContext = new SparkContext(conf)
    try {
      val content = Array("laoduan 29 88", "laoyang 30 100", "laozhang 24 70", "laoli 34 88")

      // Build an RDD from the in-memory array.
      val contentRDD: RDD[String] = sc.parallelize(content)

      // Parse each space-separated line into a (name, age, faceValue) tuple.
      val mapRDD: RDD[(String, Int, Int)] = contentRDD.map { line =>
        val fields: Array[String] = line.split(" ")
        val name: String = fields(0)
        val age: Int = fields(1).toInt
        val faceValue: Int = fields(2).toInt
        (name, age, faceValue)
      }

      // Tuple comparison: the first element decides, the second breaks ties.
      // The first component uses a reversed Int ordering instead of negating the value,
      // because -Int.MinValue overflows back to Int.MinValue and would sort incorrectly.
      val faceValueDescThenAgeAsc: Ordering[(Int, Int)] =
        Ordering.Tuple2(Ordering.Int.reverse, Ordering.Int)

      val sorted: RDD[(String, Int, Int)] =
        mapRDD.sortBy(tp => (tp._3, tp._2))(
          faceValueDescThenAgeAsc,
          scala.reflect.classTag[(Int, Int)]
        )

      println(sorted.collect().toBuffer)
    } finally {
      // Always stop the context, even when the job above throws.
      sc.stop()
    }
  }
}
package day05

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Sorting demo 5: sorts (name, age, faceValue) tuples by themselves, with the comparison
 * rule supplied through a local implicit `Ordering`.
 */
object SortTest5 {

  /**
   * Creates a local SparkContext, parses four hard-coded "name age faceValue" records,
   * sorts them by face value descending (ties broken by age ascending) and prints the
   * collected result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SortTest5").setMaster("local[4]")
    val sc: SparkContext = new SparkContext(conf)
    try {
      val content = Array("laoduan 29 88", "laoyang 30 100", "laozhang 24 70", "laoli 34 88")

      // Build an RDD from the in-memory array.
      val contentRDD: RDD[String] = sc.parallelize(content)

      // Parse each space-separated line into a (name, age, faceValue) tuple.
      val mapRDD: RDD[(String, Int, Int)] = contentRDD.map { line =>
        val fields: Array[String] = line.split(" ")
        val name: String = fields(0)
        val age: Int = fields(1).toInt
        val faceValue: Int = fields(2).toInt
        (name, age, faceValue)
      }

      // Tuple comparison: the first element decides, the second breaks ties.
      //   - the Ordering[(Int, Int)] built below is the shape actually compared;
      //   - on[(String, Int, Int)] names the shape of the data before conversion;
      //   - the function passed to `on` converts a record into the compared shape.
      // The face value uses a reversed Int ordering instead of negation, because
      // -Int.MinValue overflows back to Int.MinValue and would sort incorrectly.
      // The explicit type keeps implicit resolution predictable.
      implicit val rules: Ordering[(String, Int, Int)] =
        Ordering
          .Tuple2(Ordering.Int.reverse, Ordering.Int)
          .on[(String, Int, Int)](t => (t._3, t._2))

      // Each element is its own key; the local implicit ordering above is picked up.
      val sorted: RDD[(String, Int, Int)] = mapRDD.sortBy(tp => tp)

      println(sorted.collect().toBuffer)
    } finally {
      // Always stop the context, even when the job above throws.
      sc.stop()
    }
  }
}
原文:https://www.cnblogs.com/crazyforever/p/9898676.html