RDD操作分为转换操作和行动操作。
对于RDD而言,每一次的转化操作都会产生不同的RDD,供一个操作使用。
我们每次转换得到的RDD是惰性求值的
也就是说,整个转换过程并不是会真正的去计算,而是只记录了转换的轨迹。
当遇到行动操作的时候,才会发生真正的计算,从DAG图的源头开始进行“从头到尾”的计算。
常见的操作
| 
 操作类型  | 
 函数名  | 
 作用  | 
| 
 转化操作  | 
 map()  | 
 参数是函数,函数应用于RDD每一个元素,返回值是新的RDD  | 
| 
 flatMap()  | 
 参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD  | 
|
| 
 filter()  | 
 参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD  | 
|
| 
 distinct()  | 
 没有参数,将RDD里的元素进行去重操作  | 
|
| 
 union()  | 
 参数是RDD,生成包含两个RDD所有元素的新RDD  | 
|
| 
 intersection()  | 
 参数是RDD,求出两个RDD的共同元素  | 
|
| 
 subtract()  | 
 参数是RDD,将原RDD里和参数RDD里相同的元素去掉  | 
|
| 
 cartesian()  | 
 参数是RDD,求两个RDD的笛卡儿积  | 
|
| 
 行动操作  | 
 collect()  | 
 返回RDD所有元素  | 
| 
 count()  | 
 RDD里元素个数  | 
|
| 
 countByValue()  | 
 各元素在RDD中出现次数  | 
|
| 
 reduce()  | 
 并行整合所有RDD数据,例如求和操作  | 
|
| 
 fold(0)(func)  | 
 和reduce功能一样,不过fold带有初始值  | 
|
| 
 aggregate(0)(seqOp,combop)  | 
 和reduce功能一样,但是返回的RDD数据类型和原RDD不一样  | 
|
| 
 foreach(func)  | 
 对RDD每个元素都是使用特定函数  | 
除此之外我们还用到过的转换操作还有
1.groupByKey():应用于(K,V)键值对的数据集,返回一个新的(K,Iterable)形式的数据集
2.reduceByKey(func):应用于(K,V)键值对的数据集,返回一个新的(K,V)形式的数据集
其中每个值是将每个Key传入到func中进行聚合。
除此之外我们还用到过的行动操作还有
1.first():返回数据集的第一个元素
2.take(n):以数组形式返回数据集的前n个元素。
示例
转化操作
val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1)
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
val rddFile:RDD[String] = sc.textFile(path, 1)
val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))
/* map操作 */
    println("======map操作======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("======map操作======")
/* filter操作 */
    println("======filter操作======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("======filter操作======")
        
/* flatMap操作 */
    println("======flatMap操作======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("======flatMap操作======")
/* distinct去重操作 */
    println("======distinct去重======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("======distinct去重======")
/* union操作 */
    println("======union操作======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("======union操作======")
/* intersection操作 */
    println("======intersection操作======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("======intersection操作======")
/* subtract操作 */
    println("======subtract操作======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("======subtract操作======")
/* cartesian操作 */
    println("======cartesian操作======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("======cartesian操作======")
行动操作
val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
/* count操作 */
    println("======count操作======")
    println(rddInt.count())
    println("======count操作======")
    
/* countByValue操作 */
    println("======countByValue操作======")
    println(rddInt.countByValue())
    println("======countByValue操作======")
    
/* reduce操作 */
    println("======countByValue操作======")
    println(rddInt.reduce((x, y) => x + y))
    println("======countByValue操作======")
/* fold操作 */
    println("======fold操作======")
    println(rddInt.fold(0)((x, y) => x + y))
    println("======fold操作======")
/* aggregate操作 */
    println("======aggregate操作======")
    val res: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => (x._1 + x._2, y),
                                                               (x, y) => (x._1 + x._2, y._1 + y._2))
    println(res._1 + "," + res._2)
    println("======aggregate操作======")
/* foreach操作 */
    println("======foeach操作======")
    println(rddStr.foreach { x => println(x) })
    println("======foeach操作======")        
原文:https://www.cnblogs.com/zzhangyuhang/p/8989894.html