按照厦大给出的大数据学习路线,开始了spark的学习,这是spark基础的实验。我们常说的Spark在Spark生态中应该指的是Spark-Core负责批处理的部分,而Spark中还有Spark SQL,Spark Streaming、Spark MLlib和GraphX的组件。
请到教材官网的“下载专区”的“数据集”中下载chapter4-data1.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示:
Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,8
……
请根据给定的实验数据,在pyspark中通过编程来计算以下内容:
(1)该系总共有多少学生;
data.map(lambda x:x.split(‘,‘)).map(lambda x:(x[0],x[1:])).groupByKey().count() # 265
(2)该系共开设了多少门课程;
data.map(lambda x:x.split(‘,‘)).map(lambda x:(x[1],‘‘)).groupByKey().count() # 8
(3)Tom同学的总成绩平均分是多少;
data.map(lambda x:x.split(‘,‘)).map(lambda x:(x[0],int(x[2]))).filter(lambda x:x[0]==‘Tom‘).mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).mapValues(lambda x:x[0]/x[1]*1.0).foreach(print) # (Tom,30.8)
(4)求每名同学的选修的课程门数;
data.map(lambda x:x.split(‘,‘)).map(lambda x:(x[0],1)).groupByKey().mapValues(lambda x:sum(x)).foreach(print)
(5)该系DataBase课程共有多少人选修;
data.map(lambda x:x.split(‘,‘)).map(lambda x:(x[1],1)).groupByKey().mapValues(lambda x:sum(x)).filter(lambda x:x[0]==‘DataBase‘).foreach(print) # (‘DataBase‘,126)
(6)各门课程的平均分是多少;
data.map(lambda x:x.split(‘,‘)).map(lambda x:(x[1],(float(x[2]),1))).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).mapValues(lambda x:x[0]/x[1]).foreach(lambda x:print("({},{:.2f})".format(x[0],x[1])))
(7)使用累加器计算共有多少人选了DataBase这门课。
data.map(lambda x:x.split(‘,‘)).map(lambda x:(x[1],1)).filter(lambda x:x[0]==‘DataBase‘).map(lambda x:x[1]).reduce(lambda a,b:a+b) # 126
对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z
输入文件B的样例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根据输入的文件A和B合并得到的输出文件C的样例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z
from pyspark import SparkConf,SparkContext
data = sc.textFile(‘file:///home/hadoop/Desktop/SparkPractice/practice/dataset2‘)
data.map(lambda x:(x,‘‘)). groupByKey(). map(lambda x:x[0]). repartition(1). saveAsTextFile(‘file:///home/hadoop/Desktop/SparkPractice/practice/dataset2/C‘)
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster(‘local‘).setAppName(‘Get Average‘)
sc = SparkContext(conf = conf)
data = sc.textFile(‘file:///home/hadoop/Desktop/SparkPractice/practice/dataset3‘)
data.map(lambda x:x.split(‘ ‘)). map(lambda x:(x[0],(float(x[1].strip()),1))). reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])). mapValues(lambda x:x[0]/x[1]). repartition(1). saveAsTextFile(‘file:///home/hadoop/Desktop/SparkPractice/practice/dataset3/output‘)
原文:https://www.cnblogs.com/ginkgo-/p/13292484.html