Given an input text file, run a word count and sort the words by how often they occur.
The code is as follows:
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
 * Word count program with sorted output
 * @author Administrator
 *
 */
public class SortWordCount {
    public static void main(String[] args) {
        SparkConf conf=new SparkConf().setAppName("SortWordCount").setMaster("local");
        JavaSparkContext sparkContext=new JavaSparkContext(conf);
        sparkContext.textFile("D://Bigdata//18.spark//wc.txt")
                // Split each line into words
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterator<String> call(String s) throws Exception {
                        return Arrays.asList(s.split(" ")).iterator();
                    }
                })
                // Map each word to (word, 1)
                .mapToPair(new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        return new Tuple2<String,Integer>(s,1);
                    }
                })
                // Sum the counts of each word
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer, Integer integer2) throws Exception {
                        return integer+integer2;
                    }
                })
                // Swap to (count, word) so that the count becomes the sort key
                .mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                    @Override
                    public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception {
                        return new Tuple2<Integer, String>(s._2,s._1);
                    }
                })
                // Sort by count in descending order
                .sortByKey(false)
                // Swap back to (word, count)
                .mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception {
                        return new Tuple2<String, Integer>(s._2,s._1);
                    }
                })
                .foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    @Override
                    public void call(Tuple2<String, Integer> s) throws Exception {
                        System.out.println(s._1+":"+s._2);
                    }
                });
        sparkContext.close();
    }
}
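When run, the program prints each word with its count in the form word:count, from the most frequent word to the least frequent.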
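To make the count-then-sort steps easier to follow without a Spark installation, here is a minimal plain-Java sketch of the same idea. The class name SortedWordCountSketch, the method sortedCounts, and the sample lines are made up for illustration and are not part of the original program. Unlike the Spark code, the sketch skips empty tokens and breaks ties alphabetically so that its output is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Spark): count words, then order them by count, descending.
class SortedWordCountSketch {

    // Returns "word:count" strings, most frequent first.
    // Ties are broken alphabetically (an assumption of this sketch).
    static List<String> sortedCounts(List<String> lines) {
        // Plays the role of flatMap + mapToPair + reduceByKey
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        // Plays the role of swapping to (count, word), sortByKey(false), and swapping back
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> e : entries) {
            result.add(e.getKey() + ":" + e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample input, not the contents of wc.txt
        List<String> lines = Arrays.asList("hello spark", "hello java", "hello spark");
        for (String s : sortedCounts(lines)) {
            System.out.println(s); // prints hello:3, spark:2, java:1 on separate lines
        }
    }
}
```

In the Spark version the swap to (count, word) is needed because sortByKey only sorts on the key; in plain Java a comparator on the map entry's value does the same job.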

Given a file with the following contents:
1 5
2 4
3 6
1 3
2 1
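suppose we want the lines sorted by the first column, and by the second column when the first columns are equal (a secondary sort). Because the code below calls sortByKey(false), the order is descending.
This can be done with the following code: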
import scala.math.Ordered;
import java.io.Serializable;
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {
    private static final long serialVersionUID = -2366006422945129991L;
    // First, define the columns to sort on inside the custom key
    private int first;
    private int second;
    public SecondarySortKey(int first, int second) {
        this.first = first;
        this.second = second;
    }
    public int getFirst() {
        return first;
    }
    public void setFirst(int first) {
        this.first = first;
    }
    public int getSecond() {
        return second;
    }
    public void setSecond(int second) {
        this.second = second;
    }
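    @Override
    public int compare(SecondarySortKey that) {
        // Delegate to compareTo so that both methods agree on the ordering
        return this.compareTo(that);
    }
    @Override
    public boolean $less(SecondarySortKey that) {
        if (this.first<that.getFirst()){
            return  true;
        }else if (this.first==that.getFirst()&&this.second<that.getSecond()){
            // Equal first columns: fall back to the second column
            return  true;
        }
        return false;
    }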
    @Override
    public boolean $greater(SecondarySortKey other) {
       if (this.first>other.getFirst()){
           return true;
       }else if (this.first==other.getFirst()&&this.second>other.getSecond()){
           return true;
       }
       return  false;
    }
    @Override
    public boolean $less$eq(SecondarySortKey that) {
        if(this.$less(that)) {
            return true;
        } else if(this.first == that.getFirst() &&
                this.second == that.getSecond()) {
            return true;
        }
        return false;
    }
    @Override
    public boolean $greater$eq(SecondarySortKey that) {
        if (this.$greater(that)){
            return  true;
        }else  if (this.first==that.getFirst()&&this.second== that.getSecond()){
            return true;
        }
        return  false;
    }
    @Override
    public int compareTo(SecondarySortKey that) {
        if (this.first-that.getFirst()!=0){
            return  this.first-that.getFirst();
        }else {
            return  this.second-that.getSecond();
        }
    }
The driver program that uses this key:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
 * Secondary sort
 * 1. Implement a custom key that implements the Ordered and Serializable interfaces, and put the multi-column ordering logic in the key
 * 2. Map the RDD of text lines to a JavaPairRDD whose key is the custom key and whose value is the text line
 * 3. Sort by the custom key with the sortByKey operator
 * 4. Map again to drop the custom key and keep only the text line
 * @author Administrator
 *
 */
public class SecondarySort {
    public static void main(String[] args) {
        SparkConf conf=new SparkConf().setAppName("SecondarySort").setMaster("local");
        JavaSparkContext sparkContext=new JavaSparkContext(conf);
        JavaRDD<String> lines=sparkContext.textFile("D:\\BaiduYunDownload\\Spark课件\\第39讲-Spark核心编程:高级编程之二次排序\\文档\\sort.txt");
        // Build (custom key, original line) pairs
        JavaPairRDD<SecondarySortKey,String> pairRDD=lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() {
            @Override
            public Tuple2<SecondarySortKey, String> call(String s) throws Exception {
                String[] strings=s.split(" ");
                SecondarySortKey secondarySortKey=new SecondarySortKey(Integer.valueOf(strings[0]),Integer.valueOf(strings[1]));
                return new Tuple2<>(secondarySortKey,s);
            }
        });
        // Sort by the custom key in descending order
        JavaPairRDD<SecondarySortKey,String> sortPairRDD=pairRDD.sortByKey(false);
        // Drop the key and keep only the original line
        JavaRDD<String> resultRDD=sortPairRDD.map(new Function<Tuple2<SecondarySortKey, String>, String>() {
            @Override
            public String call(Tuple2<SecondarySortKey, String> s) throws Exception {
                return s._2;
            }
        });
        resultRDD.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });
        sparkContext.close();
    }
}
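The ordering that the custom key is meant to produce can be checked without Spark. The following is a minimal sketch, assuming the five sample lines shown earlier and a descending sort on both columns; the class name SecondarySortSketch and the method sortDescending are hypothetical, and a java.util.Comparator stands in for the Ordered key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch (no Spark) of a two-column, descending secondary sort.
class SecondarySortSketch {

    // Sorts "first second" lines by the first column, then by the second, both descending.
    static List<String> sortDescending(List<String> lines) {
        Comparator<String> byFirstThenSecond = Comparator
                .comparingInt((String line) -> Integer.parseInt(line.split(" ")[0]))
                .thenComparingInt(line -> Integer.parseInt(line.split(" ")[1]));
        List<String> sorted = new ArrayList<>(lines);
        // reversed() flips the whole composite ordering, like sortByKey(false)
        sorted.sort(byFirstThenSecond.reversed());
        return sorted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("1 5", "2 4", "3 6", "1 3", "2 1");
        for (String line : sortDescending(lines)) {
            System.out.println(line); // prints 3 6, 2 4, 2 1, 1 5, 1 3 on separate lines
        }
    }
}
```

The comparator compares the first column and only consults the second column on a tie, which is the same rule compareTo in SecondarySortKey is meant to encode.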
1. Take the 3 largest numbers from a text file.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Iterator;
import java.util.List;
public class Top3 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Top3")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("D:\\BaiduYunDownload\\top.txt");
        JavaPairRDD<Integer,String> pairs=lines.mapToPair(new PairFunction<String, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(String s) throws Exception {
                return new Tuple2<Integer, String>(Integer.valueOf(s),s);
            }
        });
        JavaPairRDD<Integer,String> sortPairs=pairs.sortByKey(false);
        JavaRDD<String> sortList=sortPairs.map(new Function<Tuple2<Integer, String>, String>() {
            @Override
            public String call(Tuple2<Integer, String> s) throws Exception {
                return s._2;
            }
        });
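        // take(3) keeps the descending order produced by sortByKey(false);
        // top(3) on an RDD of strings would compare them lexicographically instead
        List<String> list=sortList.take(3);
        Iterator<String> it=list.iterator();
        while (it.hasNext()){
            System.out.println(it.next());
        }
        sc.close();
    }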
}
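Sorting the whole data set is not required to find the largest few values. As an alternative to the sort-then-take approach above (not the method used in the original program), the sketch below keeps only n candidates in a min-heap. The class name TopNSketch, the method topN, and the sample numbers are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java sketch (no Spark): top N numbers with a bounded min-heap.
class TopNSketch {

    // Parses each line as an integer and returns the n largest values, largest first.
    static List<Integer> topN(List<String> lines, int n) {
        PriorityQueue<Integer> heap = new PriorityQueue<>(); // smallest element on top
        for (String line : lines) {
            heap.offer(Integer.parseInt(line.trim()));
            if (heap.size() > n) {
                heap.poll(); // drop the smallest so at most n candidates remain
            }
        }
        List<Integer> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder());
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample input, not the contents of top.txt
        List<String> lines = Arrays.asList("9", "10", "7", "100", "8");
        System.out.println(topN(lines, 3)); // prints [100, 10, 9]
    }
}
```

The sample also shows why the values must be compared as numbers: compared as strings, "9", "8" and "7" would rank above "10" and "100".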
2. For each class, take the top 3 student scores (grouped top N).
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
 * Grouped top 3
 * @author Administrator
 *
 */
public class GroupTop3 {
	
	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Top3")
				.setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		JavaRDD<String> lines = sc.textFile("D:\\BaiduYunDownload\\score.txt");
		
		JavaPairRDD<String, Integer> pairs = lines.mapToPair(
				
				new PairFunction<String, String, Integer>() {
					private static final long serialVersionUID = 1L;
					@Override
					public Tuple2<String, Integer> call(String line) throws Exception {
						String[] lineSplited = line.split(" ");  
						return new Tuple2<String, Integer>(lineSplited[0], 
								Integer.valueOf(lineSplited[1]));
					}
					
				});
		
		JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();
		
		JavaPairRDD<String, Iterable<Integer>> top3Score = groupedPairs.mapToPair(
				
				new PairFunction<Tuple2<String,Iterable<Integer>>, String, Iterable<Integer>>() {
					private static final long serialVersionUID = 1L;
					@Override
					public Tuple2<String, Iterable<Integer>> call(
							Tuple2<String, Iterable<Integer>> classScores)
							throws Exception {
						Integer[] top3 = new Integer[3];
						
						String className = classScores._1;
						Iterator<Integer> scores = classScores._2.iterator();
						
						while(scores.hasNext()) {
							Integer score = scores.next();
							
							for(int i = 0; i < 3; i++) {
								if(top3[i] == null) {
									top3[i] = score;
									break;
								} else if(score > top3[i]) {
									for(int j = 2; j > i; j--) {
										top3[j] = top3[j - 1];  
									}
									
									top3[i] = score;
									
									break;
								} 
							}
						}
						
						return new Tuple2<String, 
								Iterable<Integer>>(className, Arrays.asList(top3));    
					}
					
				});
		
		top3Score.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
			
			private static final long serialVersionUID = 1L;
			@Override
			public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
				System.out.println("class: " + t._1);  
				Iterator<Integer> scoreIterator = t._2.iterator();
				while(scoreIterator.hasNext()) {
					Integer score = scoreIterator.next();
					System.out.println(score);  
				}
				System.out.println("=======================================");   
			}
			
		});
		
		sc.close();
	}
	
}
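The group-then-select logic can also be tried on a small in-memory list. This is a minimal plain-Java sketch, assuming lines of the form "className score" as in the program above; the class name GroupTopNSketch, the method topNPerGroup, and the sample scores are hypothetical. It sorts each group instead of using the fixed-size array insertion from the Spark code, so a class with fewer than n scores yields a shorter list rather than null padding.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Spark): top N scores per class.
class GroupTopNSketch {

    // Groups "className score" lines by class and keeps the n highest scores of each class.
    static Map<String, List<Integer>> topNPerGroup(List<String> lines, int n) {
        // Plays the role of mapToPair + groupByKey
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(" ");
            grouped.computeIfAbsent(parts[0], k -> new ArrayList<>())
                    .add(Integer.valueOf(parts[1]));
        }
        // Per group: sort descending and keep the first n scores
        Map<String, List<Integer>> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            List<Integer> scores = new ArrayList<>(e.getValue());
            scores.sort(Collections.reverseOrder());
            int keep = Math.min(n, scores.size());
            result.put(e.getKey(), new ArrayList<>(scores.subList(0, keep)));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample input, not the contents of score.txt
        List<String> lines = Arrays.asList(
                "class1 90", "class2 56", "class1 87", "class1 76",
                "class2 88", "class1 95", "class2 74");
        System.out.println(topNPerGroup(lines, 3));
        // prints {class1=[95, 90, 87], class2=[88, 74, 56]}
    }
}
```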
Spark RDD advanced programming: sort-based wordcount program + secondary sort + top N
Original: https://www.cnblogs.com/Transkai/p/11349465.html