1.创建RDD
val lines = sc.parallelize(List("pandas","i like pandas"))
2.加载本地文件到RDD
val linesRDD = sc.textFile("yangsy.txt")
3.过滤 filter 需要注意的是 filter并不会在原有RDD上过滤,而是根据filter的内容重新创建了一个RDD
val spark = linesRDD.filter(line => line.contains("damowang"))
4.count() 也是aciton操作 由于spark为懒加载 之前的语句不管对错其实都没执行 只有到调用action 如count() first() foreach()等操作的时候 才会真正去执行
spark.count()
5.foreach(println) 输出查看数据 (使用take可获取少量数据,如果工程项目中为DataFrame,可以调用show(1)) 这里提到一个东西,就是调用collect()函数 这个函数会将所有数据加载到driver端,一般数据量巨大的时候还是不要调用collect函数()否则会撑爆dirver服务器 虽然我们项目中暂时的确是用collect()把4000多万数据加载到dirver上了- =)
spark.take(1).foreach(println)
6.常见的转化操作和行动操作 常见的转化操作如map()和filter()
比如计算RDD中各值的平方:
val input = sc.parallelize(List(1,2,3,4)) val result = input.map(x => x*x) println(result.collect().mkString(","))
7.flatMap() 与map类似,不过返回的是一个返回值序列的迭代器。得到的是一个包含各种迭代器可访问的所有元素的RDD。简单的用途比如把一个字符串切分成单词
val lines = sc.parallelize(List("xiaojingjing is my love","damowang","kings_landing")) val words = lines.flatMap(line => line.split(",")) //调用first()返回第一个值 words.first()
归类总结RDD的transformation操作:
对一个数据集(1,2,3,3)的RDD进行基本的RDD转化操作
map: 将函数应用于RDD中的每个元素,将返回值构成一个新的RDD eg: rdd.map(x => x+1) result: {2,3,4,4)
flatmap:将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD,通常用来拆分 eg:rdd.flatMap(x => x.split(",")) .take(1).foreach(println) result: 1
flter:返回一个由通过传给filter的函数的元素组成的RDD eg:rdd.filter(x => x != 1) result: {2,3,3}
distinct:用来去重 eg:rdd.distinct() {1,2,3}
对数据分别为{1,2,3}和{3,4,5}的RDD进行针对两个RDD的转换操作
union: 生成一个包含所有两个RDD中所有元素的RDD eg: rdd.union(other) result:{1,2,3,3,4,5}
intersection:求两个元素中的共同的元素 eg:rdd.intersection(ohter) result:{3}
substract() 移除RDD中的内容 eg:rdd.substract(other) result:{1,2}
cartesian() 与另一个RDD的笛卡尔积 eg:rdd.cartesian(other) result:{(1,3),(1,4),(1,5)....(3,5)}
以上皆为transformation操作,下来action操作
9.reduce 并行整合RDD中所有数据
val lines1 = sc.parallelize(List(1,2,3,3))
lines1.reduce((x,y) => x + y)
10.reducebykey 最简单的就是实现wordcount的 统计出现的数目,原理在于map函数将rdd转化为一个二元组,再通过reduceByKey进行元祖的归约。
val linesRDD = sc.textFile("yangsy.txt")
val count = linesRDD.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_).collect()
11.aggregate函数 与reduce相似,不过返回的是不同类型的函数
val result = input.aggregate((0,0))(acc.value) => (acc._1+value,acc._2+1),(acc1,acc2) =>(acc1._1 + acc2._1 , acc1._2 + acc2._2))
还有很多比如count(),take(num)等就不一一练习了
12.collect函数还有foreach函数 其实刚才已经用到了,这里也不多说了~
归纳总结RDD的action操作:
对一个数据为{1,2,3,3}的RDD的操作
collect: 返回RDD中的所有元素 rdd.collect()
count: RDD中的元素的个数
countByValue: 返回各元素在RDD中出现的次数 : eg:rdd.countByValue() [(1,1),(2,1),(3,2)....]
take(num): 从RDD中返回num个元素
top(num) : 从RDD中返回最前面的num个元素
takeSample(withReplacement,num,[seed]) : 从RDD中返回任意一些元素 eg: rdd.takeSample(false,1)
reduce(func): 并行整合RDD中所有的数据 rdd.reduce(x,y) => x + y)
foreach(func):对RDD中的每个元素使用给定的函数
在调用persist()函数将数据缓存如内存 想删除的话可以调用unpersist()函数
spark RDD transformation与action函数巩固 (未完)
原文:http://www.cnblogs.com/yangsy0915/p/5002559.html