| 
 rdd api  | 
 备注  | 
| 
 persist/cache  | 
 
  | 
| 
 map(f: T => U)  | 
 
  | 
| 
 keyBy(f: T => K)  | 
 特殊的map,提key  | 
| 
 flatMap(f: T => Iterable[U])  | 
 map的一种,类似UDTF  | 
| 
 filter(f: T => Boolean)  | 
 map的一种  | 
| 
 distinct(numPartitions)  | 
 rdd的实现为 map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)reduceByKey是特殊的combineByKey,其mergeValue函数和mergeCombiners函数一致,都是(x, y) => x  | 
| 
 repartition(numPartitions)/coalesce(numPartitions)  | 
 repartition用于增减rdd分区。coalesce特指减少分区,可以通过一次窄依赖的映射避免shuffle  | 
| 
 sample()/randomSplit()/takeSample()  | 
 采样  | 
| 
 union(RDD[T])  | 
 不去重。使用distinct()去重  | 
| 
 sortBy[K](f: (T) => K)  | 
 传入的f是提key函数,rdd的实现为 keyBy(f).sortByKey().values() 这次操作为RDD设置了一个RangePartitioner  | 
| 
 intersection(RDD[T])  | 
 两个集合取交集,并去重。RDD的实现为map(v => (v, null)).cogroup(other.map(v => (v, null))).filter(两边都空).keys() cogroup是生成K, List[V], List[V]的形态,这个过程可能内含一次shuffle操作,为了两边RDD的分区对齐。  | 
| 
 glom():RDD[Array[T]]  | 
 把每个分区的数据合并成一个Array。原本每个分区是T的迭代器。  | 
| 
 cartesian(RDD[U]): RDD[(T, U)]  | 
 求两个集合的笛卡尔积。RDD的做法是两个RDD内循环、外循环yield出每对(x, y)  | 
| 
 groupBy[K](f: T => K): RDD[(K, Iterable[T])]  | 
 RDD建议如果后续跟agg的话,直接使用aggregateByKey或reduceByKey更省时,这两个操作本质上就是combineByKey  | 
| 
 pipe(command: String)  | 
 把RDD数据通过ProcessBuilder创建额外的进程输出走  | 
| 
 mapPartitions(f: Iterator[T] => Iterator[U])/mapPartitionsWithIndex(f: (Int, Iterator[T]) => Iterator[U])  | 
 RDD的每个分区做map变换  | 
| 
 zip(RDD[U]): RDD[(T, U)]  | 
 两个RDD分区数目一致,且每个分区数据条数一致  | 
| 
 rdd api  | 
 备注  | 
| 
 foreach(f: T => Unit)  | 
 rdd实现为调用sc.runJob(),把f作用于每个分区的每条记录  | 
| 
 foreachPartition(f: Iterator[T] => Unit)  | 
 rdd实现为调用sc.runJob(),把f作用于每个分区  | 
| 
 collect(): Array[T]  | 
 rdd实现为调用sc.runJob(),得到results,把多个result的array合并成一个array  | 
| 
 toLocalIterator()  | 
 把所有数据以迭代器返回,rdd实现是调用sc.runJob(),每个分区迭代器转array,收集到driver端再flatMap一次打散成大迭代器。理解为一种比较特殊的driver端cache  | 
| 
 collect[U](f: PartailFunction[T, U]): RDD[U]  | 
 rdd实现为filter(f.isDefinedAt).map(f) 先做一次filter找出满足的数据,然后一次map操作执行这个偏函数。  | 
| 
 subtract(RDD[T])  | 
 rdd实现为map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys 与求交类似  | 
| 
 reduce(f: (T, T) => T)  | 
 rdd实现为调用sc.runJob(),让f在rdd每个分区计算一次,最后汇总merge的时候再计算一次。  | 
| 
 treeReduce(f: (T, T) => T, depth = 2)  | 
 见treeAggregate  | 
| 
 fold(zeroValue: T)(op: (T, T) => T)  | 
 特殊的reduce,带初始值,函数式语义的fold  | 
| 
 aggregate(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)  | 
 带初始值、reduce聚合、merge聚合三个完整条件的聚合方法。rdd的做法是把函数传入分区里去做计算,最后汇总各分区的结果再一次combOp计算。  | 
| 
 treeAggregate(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)(depth = 2)  | 
 在分区处,做两次及以上的merge聚合,即每个分区的merge计算可能也会带shuffle。其余部分同aggregate。理解为更复杂的多阶aggregate  | 
| 
 count()  | 
 rdd实现为调用sc.runJob(),把每个分区的size汇总在driver端再sum一次  | 
| 
 countApprox(timeout, confidence)  | 
 提交个体DAGScheduler特殊的任务,生成特殊的任务监听者,在timeout时间内返回,没计算完的话返回一个大致结果,返回值的计算逻辑可见ApproximateEvaluator的子类  | 
| 
 countByValue(): Map[T, Long]  | 
 rdd实现为map(value => (value, null)).countByKey() 本质上是一次简单的combineByKey,返回Map,会全load进driver的内存里,需要数据集规模较小  | 
| 
 countByValueApprox()  | 
 同countApprox()  | 
| 
 countApproxDistinct()  | 
 实验性方法,用streamlib库实现的HyperLogLog做  | 
| 
 zipWithIndex(): RDD[(T, Long)]/zipWithUniqueId(): RDD[(T, Long)]  | 
 与生成的index做zip操作  | 
| 
 take(num): Array[T]  | 
 扫某个分区  | 
| 
 first()  | 
 即take(1)  | 
| 
 top(n)(ordering)  | 
 每个分区内传入top的处理函数,得到分区的堆,使用rdd.reduce(),把每个分区的堆合起来,排序,取前n个  | 
| 
 max()/min()  | 
 特殊的reduce,传入max/min比较函数  | 
| 
 saveAsXXXXX  | 
 输出存储介质  | 
| 
 checkpoint  | 
 显示cp声明  | 
| 
 rdd api  | 
 备注  | 
| 
 combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]  | 
 传统MR定义拆分,重要基础api  | 
| 
 aggregateByKey[U](zeroValue: U, seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]  | 
 rdd里,把zeroValue转成了一个createCombiner方法,然后调用了combineByKey()。本质上两者是一样的。  | 
| 
 foldByKey(zeroValue: V, func: (V, V) => V): RDD[(K, V)]  | 
 func即被当作mergeValue,又被当作mergeCombiners,调用了combineByKey()  | 
| 
 sampleByKey()  | 
 生成一个与key相关的sampleFunc,调用rdd.mapPartitionsWithIndex(sampleFunc)  | 
| 
 reduceByKey()  | 
 调用combineByKey  | 
| 
 reduceByKeyLocally(func: (V, V) => V): Map[K, V]  | 
 rdd实现为self.mapPartitions(reducePartition).reduce(mergeMaps)reducePartition是在每个分区生成一个HashMap,mergeMaps是合并多个HashMap  | 
| 
 countByKey()  | 
 rdd实现为mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap  | 
| 
 countByKeyApprox()  | 
 rdd实现为map(_._1).countByValueApprox  | 
| 
 countApproxDistinctByKey()  | 
 类似rdd的countApproxDistinct方法,区别是把方法作用在了combineByKey里面  | 
| 
 groupByKey()  | 
 简单的combineByKey实现  | 
| 
 partitionBy(partitioner)  | 
 为rdd设置新的分区结构  | 
| 
 join(RDD[(K, W)]): RDD[(K, (V, W))]  | 
 rdd实现为cogroup(other, partitioner).flatMapValues(...)  | 
| 
 leftOuterJoin(…)  | 
 实现同上,只是flatMapValues里面遍历两个rdd,yield出结果的判断逻辑变了下  | 
| 
 rightOuterJoin(…)  | 
 同上  | 
| 
 fullOuterJoin(…)  | 
 同上  | 
| 
 collectAsMap()  | 
 rdd实现为collect().foreach(pairToMap)  | 
| 
 mapValues(f: V => U)  | 
 一种简单的map()操作  | 
| 
 flatMapValues(f: V => Iterable[U])  | 
 一种简单的map()操作  | 
| 
 cogroup(RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]  | 
 做集合性操作的基础api,包括各种join、求交等  | 
| 
 subtractByKey(RDD[(K, W)]): RDD[(K, V)]  | 
 从原来的rdd里排除右侧有的keys  | 
| 
 lookup(key: K): Seq[V]  | 
 rdd实现的时候,然后分区是基于key的,那比较高效可以直接遍历对应分区,否则全部遍历。全部遍历的实现为filter(_._1 == key).map(_._2).collect()  | 
| 
 saveAsXXX  | 
 写外部存储  | 
| 
 keys()  | 
 一种简单的map()操作  | 
| 
 values()  | 
 一种简单的map()操作  | 
countAsync, collectAsync, takeAsync, foreachAsync, foreachPartitionAsync
针对RDD[K: Ordering, V]
| 
 rdd api  | 
 备注  | 
| 
 sortByKey()  | 
 见rdd.sortBy()里的解释  | 
| 
 filterByRange(lower: K, upper: K)  | 
 当rdd分区是RangePartition的时候可以做这样的filter  | 
针对RDD[Double]
| 
 rdd api  | 
 备注  | 
| 
 sum()  | 
 rdd实现是reduce(_ + _)  | 
| 
 stats()  | 
 rdd实现是mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b)) StatCounter在一次遍历里统计出中位数、方差、count三个值,merge()是他内部的方法  | 
| 
 mean()  | 
 rdd实现是stats().mean  | 
| 
 variance()/sampleVariance()  | 
 rdd实现是stats().variance  | 
| 
 stdev()/sampleStdev()  | 
 rdd实现是stats().stdev 求标准差  | 
| 
 meanApprox()/sumApprox()  | 
 调用runApproximateJob  | 
| 
 histogram()  | 
 比较复杂的计算,rdd实现是先mapPartitions再reduce,包含几次递归  | 
原文:https://www.cnblogs.com/shan13936/p/13840676.html