Transformation: creates a new dataset from an existing one; the computation returns a new RDD. For example, applying map to an RDD produces a new RDD.
Action: runs a computation on the RDD and returns a value to the driver program, or writes the result to an external storage system.
For example, the collect operator gathers all the elements of the dataset and returns them to the driver program.
All transformations on an RDD are lazy; they do not compute their results right away. Instead, they simply remember the transformations applied to the base dataset (e.g., a file). The transformations are only actually executed when an action requires a result to be returned to the driver or written to external storage. This design lets Spark run more efficiently.
Action operators such as count, collect, and save either return a result to the driver or write it to storage; an action is what actually triggers execution of a Spark application.
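To see this laziness concretely, here is a minimal sketch (assuming a live SparkContext sc, as in the shell sessions below); the side effect inside map does not run until collect() is called. Note that on a real cluster the println output appears in executor logs rather than on the driver.

val nums = sc.parallelize(1 to 5)
val doubled = nums.map { x =>
  println(s"processing $x")  // nothing is printed yet: map only records the transformation
  x * 2
}
doubled.collect()  // the action triggers the computation: Array(2, 4, 6, 8, 10)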
map(func): returns a new RDD formed by passing each input element through the function func.
scala> var source = sc.parallelize(1 to 10)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> source.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> var source2 = source.map(_+1)
source2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:25

scala> source2.collect()
res5: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
mapPartitions(func): similar to map, but runs separately on each partition of the RDD, so when running on an RDD of type T, func must be of type Iterator[T] => Iterator[U].
If there are N elements and M partitions, the function passed to map is called N times, while the function passed to mapPartitions is called M times: each call processes all the elements of one partition at once.
scala> val rdd = sc.parallelize(List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female")))
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> :paste
// Entering paste mode (ctrl-D to finish)

def partitionsFun(iter : Iterator[(String,String)]) : Iterator[String] = {
  var woman = List[String]()
  while (iter.hasNext){
    val next = iter.next()
    next match {
      case (_,"female") => woman = next._1 :: woman
      case _ =>
    }
  }
  woman.iterator
}

// Exiting paste mode, now interpreting.

partitionsFun: (iter: Iterator[(String, String)])Iterator[String]

scala> val result = rdd.mapPartitions(partitionsFun)
result: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at mapPartitions at <console>:28

scala> result.collect()
res13: Array[String] = Array(kpop, lucy)
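The N-versus-M call counts described above can be checked with accumulators; a minimal sketch, assuming a live SparkContext sc (the variable names are illustrative):

val data = sc.parallelize(1 to 8, 2)           // 8 elements in 2 partitions
val mapCalls  = sc.longAccumulator("mapCalls")
val partCalls = sc.longAccumulator("partCalls")
data.map { x => mapCalls.add(1); x }.count()               // mapCalls.value is 8: once per element
data.mapPartitions { it => partCalls.add(1); it }.count()  // partCalls.value is 2: once per partition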
glom(): gathers each partition into an array, producing a new RDD of type RDD[Array[T]].
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
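A common use of glom() is per-partition aggregation. Building on the rdd above, this sketch computes the maximum of each partition and then the global maximum:

val maxPerPartition = rdd.glom().map(_.max)  // one value per partition
maxPerPartition.collect()                    // Array(4, 8, 12, 16)
maxPerPartition.max()                        // global maximum: 16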
flatMap(func): similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element).
scala> val sourceFlat = sc.parallelize(1 to 5)
sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> sourceFlat.collect()
res11: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val flatMap = sourceFlat.flatMap(1 to _)
flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26

scala> flatMap.collect()
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
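Another typical flatMap use is splitting lines of text into words; a minimal sketch with hypothetical input data:

val lines = sc.parallelize(Seq("hello spark", "hello rdd"))
lines.flatMap(_.split(" ")).collect()  // each line maps to several words: Array(hello, spark, hello, rdd)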
filter(func): returns a new RDD consisting of the input elements for which func returns true.
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26

scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)

scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)
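Since transformations are lazy, filter composes freely with map and the whole pipeline only runs when an action is called; a minimal sketch with hypothetical data:

val nums2 = sc.parallelize(1 to 10)
val evensSquared = nums2.filter(_ % 2 == 0).map(n => n * n)  // both steps are lazy
evensSquared.collect()  // the action runs the pipeline: Array(4, 16, 36, 64, 100)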
Source: https://www.cnblogs.com/phy2020/p/12747593.html