reduce(func)
通过func函数聚集RDD中的所有元素并得到最终的结果,先聚合分区内数据,再聚合分区间数据。Func函数决定了聚合的方式。
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf(). setMaster("local[*]").setAppName("spark")) val value: RDD[Int] = sc.makeRDD(Array(1,2,3)) val result: Int = value.reduce(_ + _) println(result) val value1: RDD[(String, Int)] = sc.makeRDD(Array(("a", 1), ("b", 2), ("c", 3))) val tuple: (String, Int) = value1.reduce((x: (String, Int), y: (String, Int)) => (x._1 + y._1, x._2 + y._2)) println(tuple) }
打印结果为:
6
cba 6
collect()案例
在驱动程序中,以数组的形式返回数据集的所有元素。数组的元素类型为RDD的元素类型;
返回RDD中元素的个数
val value: RDD[Int] = sc.makeRDD(Array(1,2,3))
println(value.count())
first()、take()
first返回RDD中的第一个元素
take返回一个由RDD的前n个元素组成的数组
从first的原码来看,它底层是由take实现的;
def first(): T = withScope { take(1) match { case Array(t) => t case _ => throw new UnsupportedOperationException("empty collection") } }
查看take的原码及其注释,原码目前看不大懂,应该是去取出相应个数的分区,将分区中的元素装配成数组,再返回该数组。所以可以这么理解:first取出的是第一个分区的第一个元素,take取出的是前x个分区的n个元素;
/** * Take the first num elements of the RDD. It works by first scanning one partition, and use the * results from that partition to estimate the number of additional partitions needed to satisfy * the limit. * * @note This method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver‘s memory. * * @note Due to complications in the internal implementation, this method will raise * an exception if called on an RDD of `Nothing` or `Null`. */ def take(num: Int): Array[T] = withScope { ... }
示例代码如下:
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf(). setMaster("local[*]").setAppName("spark")) val value: RDD[Int] = sc.makeRDD(Array(3,6,4,5,8,1,2,0),4); value.saveAsTextFile("E:/idea/spark2/out/take") println(value.first()) value.take(3).foreach(println) }
数据在分区的分布如下:
36 45 81 20
first取出的元素是3;
take取出的元素是364
印证了上述的函数执行逻辑;
takeOrdered(n)
返回该RDD排序(升序)后的前n个元素组成的数组
aggregate
参数:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。(分区内要与初始值进行聚合操作,分区间也要与初始值进行聚合操作)与reduce算子相比,aggregate能分别控制分区间和分区内的运算。
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf(). setMaster("local[*]").setAppName("spark")) val value: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4) val i: Int = value.aggregate(0)(_ + _, _ + _)//它这里的函数类型是可以推断出来的 println(i) }
打印结果为10;
fold(num)(func)
折叠操作,aggregate的简化操作,seqop和combop一样。
countByKey()
针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf(). setMaster("local[*]").setAppName("spark")) val value: RDD[(String, Int)] = sc.parallelize(Array(("a", 3), ("a", 2),("b",1))) val result: collection.Map[String, Long] = value.countByKey() result.foreach(println) }
打印结果为:
(a,2)
(b,1)
foreach(func)
在数据集的每一个元素上,运行函数func进行更新。注意:它是在driver端执行,不需要把数据提交到executor端执行。但是无法进行并行计算;
def main(args: Array[String]): Unit = { val sc: SparkContext = new SparkContext(new SparkConf(). setMaster("local[*]").setAppName("spark")) val raw: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4);//driver端执行 val ints: Array[Int] = raw.collect() ints.foreach(x => {println(x+1)})//在driver端执行 raw.foreach(x => {println(x+1)})//也是在driver端执行 } }
foreachPartition()
在数据集的每一个分区上,运行函数func()进行更新。它与foreach的源码如下:
/** * Applies a function f to all elements of this RDD. */ def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) } /** * Applies a function f to each partition of this RDD. */ def foreachPartition(f: Iterator[T] => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => cleanF(iter)) }
他们之间的性能区别类似于map与mapPartition。更详细的例子见博客《JdbcRDD连接MySQL》
原文:https://www.cnblogs.com/chxyshaodiao/p/12359203.html