对以下文件
进行wordcount,并按照单词出现次数从多到少排序
代码如下:
/** * 排序的wordcount程序 * @author Administrator * */ public class SortWordCount { public static void main(String[] args) { SparkConf conf=new SparkConf().setAppName("SortWordCount").setMaster("local"); JavaSparkContext sparkContext=new JavaSparkContext(conf); sparkContext.textFile("D://Bigdata//18.spark//wc.txt") .flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return new Arrays.Iterator<>(s.split(" ")); } }).mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String,Integer>(s,1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception { return new Tuple2<Integer, String>(s._2,s._1); } }).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception { return new Tuple2<String, Integer>(s._2,s._1); } }).foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> s) throws Exception { System.out.println(s._1+":"+s._2); } }); } }
结果如下:
对于以下文件内容
1 5
2 4
3 6
1 3
2 1
要想得到按第一列降序、第一列相同时再按第二列降序排列的结果(即 3 6、2 4、2 1、1 5、1 3)
可以通过以下代码
import scala.math.Ordered;
import java.io.Serializable;
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {
private static final long serialVersionUID = -2366006422945129991L;
// 首先在自定义key里面,定义需要进行排序的列
private int first;
private int second;
public SecondarySortKey(int first, int second) {
this.first = first;
this.second = second;
}
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
@Override
public int compare(SecondarySortKey that) {
return 0;
}
@Override
public boolean $less(SecondarySortKey that) {
if (this.first<that.getFirst()){
return true;
}else if (this.first==that.getFirst()){
return true;
}
return false;
}
@Override
public boolean $greater(SecondarySortKey other) {
if (this.first>other.getFirst()){
return true;
}else if (this.first==other.getFirst()&&this.second>other.getSecond()){
return true;
}
return false;
}
@Override
public boolean $less$eq(SecondarySortKey that) {
if(this.$less(that)) {
return true;
} else if(this.first == that.getFirst() &&
this.second == that.getSecond()) {
return true;
}
return false;
}
@Override
public boolean $greater$eq(SecondarySortKey that) {
if (this.$greater(that)){
return true;
}else if (this.first==that.getFirst()&&this.second== that.getSecond()){
return true;
}
return false;
}
@Override
public int compareTo(SecondarySortKey that) {
if (this.first-that.getFirst()!=0){
return this.first-that.getFirst();
}else {
return this.second-that.getSecond();
}
}
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 二次排序 * 1、实现自定义的key,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法 * 2、将包含文本的RDD,映射成key为自定义key,value为文本的JavaPairRDD * 3、使用sortByKey算子按照自定义的key进行排序 * 4、再次映射,剔除自定义的key,只保留文本行 * @author Administrator * */ public class SecondarySort { public static void main(String[] args) { SparkConf conf=new SparkConf().setAppName("SortWordCount").setMaster("local"); JavaSparkContext sparkContext=new JavaSparkContext(conf); JavaRDD<String> lines=sparkContext.textFile("D:\\BaiduYunDownload\\Spark课件\\第39讲-Spark核心编程:高级编程之二次排序\\文档\\sort.txt"); JavaPairRDD<SecondarySortKey,String> pairRDD=lines.mapToPair(new PairFunction<String, SecondarySortKey, String>() { @Override public Tuple2<SecondarySortKey, String> call(String s) throws Exception { String[] strings=s.split(" "); SecondarySortKey secondarySortKey=new SecondarySortKey(Integer.valueOf(strings[0]),Integer.valueOf(strings[1])); return new Tuple2<>(secondarySortKey,s); } }); JavaPairRDD<SecondarySortKey,String> sortPairRDD=pairRDD.sortByKey(false); JavaRDD<String> resultRDD=sortPairRDD.map(new Function<Tuple2<SecondarySortKey, String>, String>() { @Override public String call(Tuple2<SecondarySortKey, String> s) throws Exception { return s._2; } }); resultRDD.foreach(new VoidFunction<String>() { @Override public void call(String s) throws Exception { System.out.println(s); } }); } }
1、对文本文件内的数字,取最大的前3个。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Iterator;
import java.util.List;
/**
 * Prints the three largest numbers found in a text file that holds one
 * integer per line.
 */
public class Top3 {

    /**
     * Sorts the lines by their integer value in descending order and prints
     * the first three.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Top3")
                .setMaster("local");
        // try-with-resources stops the Spark context even when the job throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> lines = sc.textFile("D:\\BaiduYunDownload\\top.txt");

            // line -> (numeric value, line) so that sortByKey orders numerically.
            JavaPairRDD<Integer, String> pairs = lines.mapToPair(new PairFunction<String, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(String s) throws Exception {
                    return new Tuple2<Integer, String>(Integer.valueOf(s), s);
                }
            });

            // false = descending order.
            JavaPairRDD<Integer, String> sortPairs = pairs.sortByKey(false);

            JavaRDD<String> sortList = sortPairs.map(new Function<Tuple2<Integer, String>, String>() {
                @Override
                public String call(Tuple2<Integer, String> s) throws Exception {
                    return s._2;
                }
            });

            // take(3) keeps the numeric order established above. top(3) would re-rank
            // the strings by their natural (lexicographic) order, e.g. "9" before "10".
            List<String> list = sortList.take(3);
            for (String value : list) {
                System.out.println(value);
            }
        }
    }
}
2、对每个班级内的学生成绩,取出前3名。(分组取topn)
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
 * Grouped top 3: prints the three highest scores of every class.
 *
 * @author Administrator
 */
public class GroupTop3 {

    /** Number of scores kept per class. */
    private static final int TOP_N = 3;

    /**
     * Reads lines of the form {@code "<class> <score>"}, groups the scores by
     * class and prints the highest {@value #TOP_N} scores of each class.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // The app name now matches this class instead of the copy-pasted "Top3".
        SparkConf conf = new SparkConf()
                .setAppName("GroupTop3")
                .setMaster("local");
        // try-with-resources stops the Spark context even when the job throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> lines = sc.textFile("D:\\BaiduYunDownload\\score.txt");

            // line -> (class name, score)
            JavaPairRDD<String, Integer> pairs = lines.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<String, Integer> call(String line) throws Exception {
                            String[] lineSplited = line.split(" ");
                            return new Tuple2<String, Integer>(lineSplited[0],
                                    Integer.valueOf(lineSplited[1]));
                        }
                    });

            JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();

            // (class name, all scores) -> (class name, best scores in descending order)
            JavaPairRDD<String, Iterable<Integer>> top3Score = groupedPairs.mapToPair(
                    new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<String, Iterable<Integer>> call(
                                Tuple2<String, Iterable<Integer>> classScores)
                                throws Exception {
                            return new Tuple2<String, Iterable<Integer>>(
                                    classScores._1, topScores(classScores._2, TOP_N));
                        }
                    });

            top3Score.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                    System.out.println("class: " + t._1);
                    for (Integer score : t._2) {
                        System.out.println(score);
                    }
                    System.out.println("=======================================");
                }
            });
        }
    }

    /**
     * Selects the largest {@code limit} scores, in descending order.
     *
     * @param scores the scores of one class
     * @param limit  maximum number of scores to return
     * @return at most {@code limit} scores, largest first; shorter when fewer
     *         scores are available, so the result never holds {@code null} padding
     */
    private static Iterable<Integer> topScores(Iterable<Integer> scores, int limit) {
        Integer[] best = new Integer[limit];
        int filled = 0;
        for (Integer score : scores) {
            // Find the insertion point; equal scores go after the ones already kept.
            int pos = 0;
            while (pos < filled && score <= best[pos]) {
                pos++;
            }
            if (pos == limit) {
                continue; // not better than any kept score
            }
            // Shift the lower-ranked scores down by one, dropping the last when full.
            for (int j = Math.min(filled, limit - 1); j > pos; j--) {
                best[j] = best[j - 1];
            }
            best[pos] = score;
            if (filled < limit) {
                filled++;
            }
        }
        // Trim unused slots so callers never see null entries.
        return Arrays.asList(Arrays.copyOf(best, filled));
    }
}
Spark RDD高级编程:基于排序机制的wordcount程序+二次排序+topn
原文:https://www.cnblogs.com/Transkai/p/11349465.html