
Spark: custom partitioning and in-partition secondary sort demo

Posted: 2019-08-08 17:41:35

The demo below first partitions a (key, value) RDD with a custom Partitioner, then sorts each partition by key ascending and value descending. The commented-out variants show a global secondary sort using a custom Ordered key instead.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner

object Demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("app")
    val sc = new SparkContext(conf)
    val data = sc.textFile("F:\\test\\test\\ssort.txt")
    
    // Step 1: partition by key with the custom partitioner; step 2: sort inside each partition.
    data.map{x=>
      val arr = x.split(" ")
      (arr(0), arr(1).toInt)
    }.partitionBy(new MySparkPartition(2)).mapPartitions{iter=>
      // Note: this sortBy is the Scala List method, not the RDD sortBy -- don't confuse the two!
      // Ascending by key, descending by value (hence the -v).
      iter.toList.sortBy{case(k, v)=>
        (k, -v)
      }.toIterator
    }.saveAsTextFile("F:\\test\\test\\output")
    
    // Aside: word count, printing the top 3 words by frequency.
    //data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).top(3)(Ordering.by(_._2)).foreach(println)
    
    // Alternative 1: global secondary sort -- wrap each line in a SecondarySortKey,
    // sort the whole RDD, then unwrap back into (first, second) pairs.
    /*data.map{x=>
      (new SecondarySortKey(x.split(" ")(0), x.split(" ")(1).toInt))
    }.sortBy(x=>x, true).map{x=>(x.first, x.second)}.foreach(println)*/

    // Alternative 2: same idea, passing the SecondarySortKey as the RDD.sortBy
    // key function and printing the raw lines.
    /*data.sortBy({x=>
      (new SecondarySortKey(x.split(" ")(0), x.split(" ")(1).toInt))
    }, true).foreach(println)*/
    
    // Alternative 3: global secondary sort, then repartition before saving.
    // Caveat: partitionBy triggers another shuffle, so the sorted order is not
    // guaranteed to survive inside the final partitions.
    /*data.sortBy({x=>
      (new SecondarySortKey(x.split(" ")(0), x.split(" ")(1).toInt))
    }, true).map{x=>
      val arr = x.split(" ")
      (arr(0), arr(1))
    }.partitionBy(new MySparkPartition(2)).saveAsTextFile("F:\\test\\test\\output")*/
    
    // Plain Scala List version of the same composite sort, for comparison.
    /*val l1 = List[(String,Int)](("a",1),("b",2),("d",4),("c",3),("a",2))
    // Equivalent with explicit Orderings instead of the -y trick:
    //l1.sortBy(x=>(x._1,x._2))(Ordering.Tuple2(Ordering.String,Ordering.Int.reverse))
    l1.sortBy{case(x,y) =>
      (x, -y)
    }.foreach(println)*/
  }
}

class MySparkPartition(numsPartitions:Int) extends Partitioner{
  // Total number of partitions produced by this partitioner.
  override def numPartitions: Int = numsPartitions

  // Route the key "aa" to partition 1 and every other key to partition 0.
  override def getPartition(key: Any): Int =
    if (key == "aa") 1 else 0
}
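
// MySparkPartition above is intentionally minimal: only the key "aa" lands in
// partition 1, everything else in partition 0. A more general sketch (added
// here for illustration, not part of the original post) hashes the key much
// like Spark's built-in HashPartitioner does:
class HashLikePartition(numsPartitions: Int) extends Partitioner {
  override def numPartitions: Int = numsPartitions

  override def getPartition(key: Any): Int = key match {
    case null => 0
    // The double modulo keeps the result non-negative for negative hash codes.
    case _    => ((key.hashCode % numPartitions) + numPartitions) % numPartitions
  }
}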

class SecondarySortKey(val first: String, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  // Ascending on `first`; ties broken by `second` in descending order
  // (note the reversed operands in the second comparison).
  def compare(other: SecondarySortKey): Int = {
    val comp = this.first.compareTo(other.first)
    if (comp == 0) other.second.compareTo(this.second)
    else comp
  }
}
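
The original post does not include ssort.txt, but the parsing code implies one space-separated "key value" pair per line. With a hypothetical input such as:

aa 3
bb 1
aa 1
cc 2
aa 2
bb 4

the job writes two part files: part-00001 holds the key "aa" and part-00000 holds everything else, each sorted ascending by key and descending by value:

part-00000 (all keys except "aa"):
(bb,4)
(bb,1)
(cc,2)

part-00001 (key "aa"):
(aa,3)
(aa,2)
(aa,1)

Collecting each partition into a List (x.toList) materializes the whole partition in memory at once. For larger data, Spark's built-in repartitionAndSortWithinPartitions sorts during the shuffle instead. Below is a sketch under the assumption that the whole (key, value) pair is promoted to the RDD key; the anonymous partitioner, the ordering, and the output path are illustrative additions, and `data` is the same RDD as in main:

// Sketch (not from the original post): shuffle-time secondary sort.
val pairs = data.map { line =>
  val arr = line.split(" ")
  ((arr(0), arr(1).toInt), null)   // promote the whole pair to the RDD key
}

// Ascending by key, descending by value; picked up implicitly below.
implicit val ord: Ordering[(String, Int)] =
  Ordering.Tuple2(Ordering.String, Ordering.Int.reverse)

// Same routing rule as MySparkPartition, applied to the composite key.
val byKey = new Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key match {
    case (k: String, _) => if (k == "aa") 1 else 0
    case _              => 0
  }
}

pairs.repartitionAndSortWithinPartitions(byKey)
  .map { case ((k, v), _) => (k, v) }
  .saveAsTextFile("F:\\test\\test\\output2")   // hypothetical path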

  


Original: https://www.cnblogs.com/shuzhiwei/p/11322463.html
