The requirements and code are as follows.
Development requirement: implement the following features using Spark RDDs.
The data fields are (one record per line, space-separated):
classNo name age gender subject score
12 张三 25 男 chinese 50
(Note: the data lives in the file data.txt.)
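For illustration only, a few invented rows in the same layout (the queries below also assume female students, a class 13, and an english subject):
12 李四 18 女 chinese 80
13 王五 19 女 english 70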
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Test1 {
System.setProperty("hadoop.home.dir","E:/x3/hadoop-2.9.2") // Windows-only: tells Hadoop where to find winutils.exe
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("test1").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
val dataRdd: RDD[String] = sc.textFile("C:\\Users\\s1378\\Desktop\\week3data2/data.txt")
// Skip the header line, then drop the index that zipWithIndex attached
val linesRdd: RDD[String] = dataRdd.zipWithIndex().filter(line => line._2>=1).map(line => line._1)
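// Alternative sketch, not from the original post: skip the header with
// mapPartitionsWithIndex instead of indexing every line via zipWithIndex.
// It assumes the header lives in partition 0, which holds for a single
// text file read with sc.textFile; linesRdd2 is a hypothetical name.
//val linesRdd2: RDD[String] = dataRdd.mapPartitionsWithIndex((idx, it) => if (idx == 0) it.drop(1) else it)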
// Split each line into (name, age), keep ages under 20, dedupe, and count
//val count1: Long = linesRdd.map(line => (line.split(" ")(1), line.split(" ")(2).toInt)).filter(line => line._2 < 20).distinct().count()
//println(s"小于20岁参加考试的有 $count1 人")
// Same pattern: (name, gender) pairs, keep males, dedupe, and count
//val count2: Long = linesRdd.map(line => (line.split(" ")(1),line.split(" ")(3))).filter(line => line._2.equals("男")).distinct().count()
//println(s"$count2 boys took the exam in total")
// (classNo, name) pairs for class 12, grouped by name to count distinct students
//val count3: Long = linesRdd.map(line => (line.split(" ")(0).toInt,line.split(" ")(1))).filter(line => line._1==12).groupBy(line => line._2).count()
//println(s"$count3 people in class 12 took the exam")
/*val peopleSum: Long = linesRdd.map(line => (line.split(" ")(4),line.split(" ")(5))).filter(line => line._1.equals("chinese")).count()
val tupleSum: RDD[(String, Double)] = linesRdd.map(line => (line.split(" ")(4),line.split(" ")(5).toDouble)).filter(line => line._1.equals("chinese")).reduceByKey((x, y)=> x+y).map(line => (line._1,line._2 / peopleSum))
tupleSum.foreach(x => println("chinese average score: "+x._2))*/
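/* One-pass alternative, sketched here rather than taken from the post: fold
each score into a (sum, count) pair with aggregateByKey, so the average needs
one job instead of a separate count() plus reduceByKey; chineseAvg is a
hypothetical name.
val chineseAvg: RDD[(String, Double)] = linesRdd.map(_.split(" "))
  .filter(f => f(4).equals("chinese"))
  .map(f => (f(4), f(5).toDouble))
  .aggregateByKey((0.0, 0L))(
    (acc, v) => (acc._1 + v, acc._2 + 1), // fold one score into (sum, count)
    (a, b) => (a._1 + b._1, a._2 + b._2)) // merge partial (sum, count) pairs
  .mapValues { case (s, c) => s / c }
chineseAvg.foreach(x => println("chinese average (one pass): " + x._2)) */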
/*val sum12: Long = linesRdd.map(line => (line.split(" ")(0).toInt,line.split(" ")(5))).filter(line => line._1 == 12).count()
val class12: RDD[(Int, Double)] = linesRdd.map(line => (line.split(" ")(0).toInt,line.split(" ")(5).toDouble)).filter(line => line._1 == 12).reduceByKey((x, y) => x+y).map(line => (line._1,line._2/sum12))
class12.foreach(x=>println("class 12 average score: "+x._2))*/
/* val sum12: Long = linesRdd.map(line => (line.split(" ")(0).toInt,line.split(" ")(3),line.split(" ")(5))).filter(line => line._1 == 12).filter(line => line._2.equals("女")).count()
// Widen the score sum to Double before dividing, otherwise Int/Long division truncates the average
val avg12: RDD[(Int,Double)] = linesRdd.map(line => (line.split(" ")(0).toInt,line.split(" ")(3),line.split(" ")(5))).filter(line => line._1 == 12).filter(line => line._2.equals("女")).map(line => (line._1,line._3.toInt)).reduceByKey(_+_).map(line=>(line._1,line._2.toDouble/sum12))
avg12.foreach(x=>println("class 12 girls' average score: "+x._2))*/
/*val sum: Long = linesRdd.map(line => (line.split(" ")(4),line.split(" ")(5))).filter(line => line._1.equals("chinese")).count()
val avg: RDD[(String, Double)] = linesRdd.map(line => (line.split(" ")(4),line.split(" ")(5).toDouble)).filter(line => line._1.equals("chinese")).reduceByKey((x, y) => x+y).map(line => (line._1,line._2/sum))
avg.foreach(x=> println("school-wide chinese average score: "+x._2))*/
/* val sum12: Long = linesRdd.map(line => (line.split(" ")(0).toInt,line.split(" ")(4),line.split(" ")(5))).filter(line => line._1 == 13).filter(line => line._2.equals("english")).count()
// The data has no math scores, so English stands in; widen to Double to avoid truncating division
val avg12: RDD[(Int,Double)] = linesRdd.map(line => (line.split(" ")(0).toInt,line.split(" ")(4),line.split(" ")(5))).filter(line => line._1 == 13).filter(line => line._2.equals("english")).map(line => (line._1,line._3.toInt)).reduceByKey(_+_).map(line=>(line._1,line._2.toDouble/sum12))
avg12.foreach(x=>println("class 13 english average score: "+x._2))*/
// Count class-12 girls whose total score across subjects exceeds 150.
// Key by student name so each girl's scores are summed separately;
// keying by class number would collapse all of class 12 into one sum.
val sum: Long = linesRdd.map(_.split(" "))
  .filter(f => f(0).toInt == 12 && f(3).equals("女"))
  .map(f => (f(1), f(5).toInt))
  .reduceByKey(_ + _)
  .filter(_._2 > 150)
  .count()
println(s"Class 12 has $sum girls with a total score above 150")
}
}
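A possible cleanup, sketched here rather than taken from the original post: parse each line once into a case class so every query reads named fields instead of re-splitting the string for each column. Record and its field names are invented for illustration.

case class Record(classNo: Int, name: String, age: Int, gender: String, subject: String, score: Int)

val records: RDD[Record] = linesRdd.map(_.split(" ")).map(f =>
  Record(f(0).toInt, f(1), f(2).toInt, f(3), f(4), f(5).toInt))

// The final query then reads naturally: keyed per student, totals are per girl
val girlsOver150: Long = records
  .filter(r => r.classNo == 12 && r.gender == "女")
  .map(r => (r.name, r.score))
  .reduceByKey(_ + _)
  .filter(_._2 > 150)
  .count()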
Source: https://www.cnblogs.com/whyuan/p/13034238.html