首页 > 其他 > 详细

RDD(五)——action

时间:2020-02-27 13:26:03      阅读:72      评论:0      收藏:0      [点我收藏+]

reduce(func)

通过func函数聚集RDD中的所有元素并得到最终的结果,先聚合分区内数据,再聚合分区间数据。Func函数决定了聚合的方式。

def main(args: Array[String]): Unit = {

    val sc: SparkContext = new SparkContext(new SparkConf().
      setMaster("local[*]").setAppName("spark"))

    val value: RDD[Int] = sc.makeRDD(Array(1,2,3))
    val result: Int = value.reduce(_ + _)
    println(result)

    val value1: RDD[(String, Int)] = sc.makeRDD(Array(("a", 1), ("b", 2), ("c", 3)))
    val tuple: (String, Int) = value1.reduce((x: (String, Int), y: (String, Int)) => (x._1 + y._1, x._2 + y._2))
    println(tuple)
  }

打印结果为:

6

cba 6

 

collect()案例

在驱动程序中,以数组的形式返回数据集的所有元素。数组的元素类型为RDD的元素类型;

 

count()案例

返回RDD中元素的个数

    val value: RDD[Int] = sc.makeRDD(Array(1,2,3))
    println(value.count())

 

first()、take()

first返回RDD中的第一个元素

 

take返回一个由RDD的前n个元素组成的数组

 

从first的原码来看,它底层是由take实现的;

 def first(): T = withScope {
    take(1) match {
      case Array(t) => t
      case _ => throw new UnsupportedOperationException("empty collection")
    }
  }

查看take的原码及其注释,原码目前看不大懂,应该是去取出相应个数的分区,将分区中的元素装配成数组,再返回该数组。所以可以这么理解:first取出的是第一个分区的第一个元素,take取出的是前x个分区的n个元素;

/**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   *
   * @note This method should only be used if the resulting array is expected to be small, as
   * all the data is loaded into the driver‘s memory.
   *
   * @note Due to complications in the internal implementation, this method will raise
   * an exception if called on an RDD of `Nothing` or `Null`.
   */
  def take(num: Int): Array[T] = withScope {
    ...
    }

示例代码如下:

def main(args: Array[String]): Unit = {

    val sc: SparkContext = new SparkContext(new SparkConf().
      setMaster("local[*]").setAppName("spark"))

    val value: RDD[Int] = sc.makeRDD(Array(3,6,4,5,8,1,2,0),4);
    value.saveAsTextFile("E:/idea/spark2/out/take")

    println(value.first())
    value.take(3).foreach(println)
  }

数据在分区的分布如下:

36 45 81 20

first取出的元素是3;

take取出的元素是364

印证了上述的函数执行逻辑;

 

 takeOrdered(n)

返回该RDD排序(升序)后的前n个元素组成的数组

 

aggregate

参数:(zeroValue: U)(seqOp: (U, T) U, combOp: (U, U) U)

作用:aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。(分区内要与初始值进行聚合操作,分区间也要与初始值进行聚合操作)与reduce算子相比,aggregate能分别控制分区间和分区内的运算。

def main(args: Array[String]): Unit = {

    val sc: SparkContext = new SparkContext(new SparkConf().
      setMaster("local[*]").setAppName("spark"))

    val value: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)
    val i: Int = value.aggregate(0)(_ + _, _ + _)//它这里的函数类型是可以推断出来的
    println(i)
  }

打印结果为10;

 

fold(num)(func)

折叠操作,aggregate的简化操作,seqop和combop一样。

 

 countByKey()

针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

 def main(args: Array[String]): Unit = {
    val sc: SparkContext = new SparkContext(new SparkConf().
      setMaster("local[*]").setAppName("spark"))

    val value: RDD[(String, Int)] = sc.parallelize(Array(("a", 3), ("a", 2),("b",1)))
    val result: collection.Map[String, Long] = value.countByKey()
    result.foreach(println)
  }

打印结果为:

(a,2)

(b,1)

 

foreach(func)

在数据集的每一个元素上,运行函数func进行更新。注意:它是在driver端执行,不需要把数据提交到executor端执行。但是无法进行并行计算;

  def main(args: Array[String]): Unit = {

    val sc: SparkContext = new SparkContext(new SparkConf().
      setMaster("local[*]").setAppName("spark"))

    val raw: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4);//driver端执行

    val ints: Array[Int] = raw.collect()
    ints.foreach(x => {println(x+1)})//在driver端执行
    
    raw.foreach(x => {println(x+1)})//也是在driver端执行
  }
}

 

foreachPartition()

在数据集的每一个分区上,运行函数func()进行更新。它与foreach的源码如下:

/**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

  /**
   * Applies a function f to each partition of this RDD.
   */
  def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
  }

他们之间的性能区别类似于map与mapPartition。更详细的例子见博客《JdbcRDD连接MySQL》 

RDD(五)——action

原文:https://www.cnblogs.com/chxyshaodiao/p/12359203.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!