首页 > 其他 > 详细

SparkRDD函数详解

时间:2020-04-26 11:26:27      阅读:58      评论:0      收藏:0      [点我收藏+]
技术分享图片
   1 1、RDD操作详解
   2 启动spark-shell
   3 spark-shell --master spark://hdp-node-01:7077 
   4 Spark core 核心数据抽象是RDD
   5 
   6 1.1 基本转换
   7 1) map
   8 map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
   9 举例:
  10 //设置spark的配置文件信息
  11 val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local")
  12 //构建sparkcontext上下文对象,它是程序的入口,所有计算的源头
  13 val sc: SparkContext = new SparkContext(sparkConf)
  14 //定义一个列表
  15 val list: List[Int] = List(1, 2, 3, 4, 5, 6, 7)
  16 //生成一个rdd
  17 val rdd: RDD[Int] = sc.parallelize(list)
  18 //map算子
  19 val map: RDD[Int] = rdd.map(x => x * 2)
  20 //foreach 算子打印
  21 map.foreach(x => println(x))
  22 sc.stop()
  23 上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。
  24 2) filter
  25 filter 是对RDD中的每个元素都执行一个指定的函数来过滤产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
  26 //设置spark的配置文件信息
  27 val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local")
  28 //构建sparkcontext上下文对象,它是程序的入口,所有计算的源头
  29 val sc: SparkContext = new SparkContext(sparkConf)
  30 sc.setLogLevel("WARN")
  31 //定义一个列表
  32 val list: List[Int] = List(1, 2, 3, 4, 5, 6, 7)
  33 //生成一个rdd
  34 val rdd: RDD[Int] = sc.parallelize(list)
  35 //filter算子
  36 val map: RDD[Int] = rdd.filter(_ > 5)
  37 //foreach 算子打印
  38 map.foreach(x => println(x))
  39 sc.stop()
  40  
  41 3) flatMap
  42 与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)
  43 scala> val a = sc.parallelize(1 to 4, 2)
  44 scala> val b = a.flatMap(x => 1 to x)
  45 scala> b.collect
  46 res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)
  47 
  48 4) mapPartitions
  49 mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 它的函数定义为:
  50 def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  51 f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。
  52 举例:
  53 scala> val a = sc.parallelize(1 to 9, 3)
  54 scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
  55   var res = List[(T, T)]() 
  56   var pre = iter.next 
  57 while (iter.hasNext) {
  58     val cur = iter.next
  59     res.::=(pre, cur)
  60       pre = cur  } 
  61   res.iterator
  62 }
  63 scala> a.mapPartitions(myfunc).collect
  64 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
  65 上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。 mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。
  66 5) mapPartitionsWithIndex
  67 def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
  68 函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。
  69 
  70 var rdd1 = sc.makeRDD(1 to 5,2)
  71 //rdd1有两个分区
  72 var rdd2 = rdd1.mapPartitionsWithIndex{
  73         (x,iter) => {
  74           var result = List[String]()
  75             var i = 0
  76             while(iter.hasNext){
  77               i += iter.next()
  78             }
  79             result.::(x + "|" + i).iterator
  80            
  81         }
  82       }
  83 //rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引
  84 scala> rdd2.collect
  85 res13: Array[String] = Array(0|3, 1|12)
  86  
  87  
  88 6) coalesce
  89 
  90 def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
  91 该函数用于将RDD进行重分区,使用HashPartitioner。
  92 第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;
  93 以下面的例子来看:
  94 scala> var data = sc.parallelize(1 to 12, 3) 
  95 scala> data.collect 
  96 scala> data.partitions.size 
  97 scala> var rdd1 = data.coalesce(1) 
  98 scala> rdd1.partitions.size 
  99 scala> var rdd1 = data.coalesce(4) 
 100 scala> rdd1.partitions.size
 101 res2: Int = 1   //如果重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,//否则,分区数不便
 102 scala> var rdd1 = data.coalesce(4,true) 
 103 scala> rdd1.partitions.size
 104 res3: Int = 4
 105  
 106 7) repartition
 107 def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
 108 该函数其实就是coalesce函数第二个参数为true的实现
 109 scala> var data = sc.parallelize(1 to 12, 3) 
 110 scala> data.collect 
 111 scala> data.partitions.size 
 112 scala> var rdd1 = data. repartition(1) 
 113 scala> rdd1.partitions.size 
 114 scala> var rdd1 = data. repartition(4) 
 115 scala> rdd1.partitions.size
 116 res3: Int = 4
 117 8) randomSplit
 118 
 119 def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
 120 该函数根据weights权重,将一个RDD切分成多个RDD。
 121 该权重参数为一个Double数组
 122 第二个参数为random的种子,基本可忽略。
 123 scala> var rdd = sc.makeRDD(1 to 12,12)
 124 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21
 125 
 126 scala> rdd.collect
 127 res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)  
 128  
 129 scala> var splitRDD = rdd.randomSplit(Array(0.5, 0.1, 0.2, 0.2))
 130 splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] at randomSplit at :23, 
 131 MapPartitionsRDD[18] at randomSplit at :23, 
 132 MapPartitionsRDD[19] at randomSplit at :23, 
 133 MapPartitionsRDD[20] at randomSplit at :23)
 134  
 135 //这里注意:randomSplit的结果是一个RDD数组
 136 scala> splitRDD.size
 137 res8: Int = 4
 138 //由于randomSplit的第一个参数weights中传入的值有4个,因此,就会切分成4个RDD,
 139 //把原来的rdd按照权重0.5, 0.1, 0.2, 0.2,随机划分到这4个RDD中,权重高的RDD,划分到//的几率就大一些。
 140 //注意,权重的总和加起来为1,否则会不正常 
 141 scala> splitRDD(0).collect
 142 res10: Array[Int] = Array(1, 4)
 143  
 144 scala> splitRDD(1).collect
 145 res11: Array[Int] = Array(3)                                                    
 146  
 147 scala> splitRDD(2).collect
 148 res12: Array[Int] = Array(5, 9)
 149  
 150 scala> splitRDD(3).collect
 151 res13: Array[Int] = Array(2, 6, 7, 8, 10)
 152  
 153 9) glom
 154 
 155 def glom(): RDD[Array[T]]
 156 该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素。
 157 
 158 scala> var rdd = sc.makeRDD(1 to 10,3)
 159 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at makeRDD at :21
 160 scala> rdd.partitions.size
 161 res33: Int = 3  //该RDD有3个分区
 162 scala> rdd.glom().collect
 163 res35: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
 164 //glom将每个分区中的元素放到一个数组中,这样,结果就变成了3个数组
 165 
 166 10) union并集
 167 
 168 val rdd1 = sc.parallelize(List(5, 6, 4, 3))
 169 val rdd2 = sc.parallelize(List(1, 2, 3, 4))
 170 //求并集
 171 val rdd3 = rdd1.union(rdd2)
 172 rdd3.collect
 173 11) distinct
 174 去重
 175 val rdd1 = sc.parallelize(List(5, 6, 4, 3))
 176 val rdd2 = sc.parallelize(List(1, 2, 3, 4))
 177 //求并集
 178 val rdd3 = rdd1.union(rdd2)
 179 //去重输出
 180 rdd3.distinct.collect
 181 
 182 12) intersection交集
 183 val rdd1 = sc.parallelize(List(5, 6, 4, 3))
 184 val rdd2 = sc.parallelize(List(1, 2, 3, 4))
 185 //求交集
 186 val rdd4 = rdd1.intersection(rdd2) 
 187 rdd4.collect
 188 
 189 13) subtract
 190 def subtract(other: RDD[T]): RDD[T]
 191 def subtract(other: RDD[T], numPartitions: Int): RDD[T]
 192 def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
 193 该函数返回在RDD中出现,并且不在otherRDD中出现的元素,不去重。
 194 
 195 val rdd1 = sc.parallelize(List(5, 6, 6, 4, 3))
 196     val rdd2 = sc.parallelize(List(1, 2, 3, 4))
 197     //求差集
 198     val rdd4 = rdd1.subtract(rdd2)
 199 rdd4.collect
 200 14) subtractByKey
 201 
 202 def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]
 203 def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]
 204 def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]
 205 
 206 subtractByKey和基本转换操作中的subtract类似,只不过这里是针对K的,返回在主RDD中出现,并且不在otherRDD中出现的元素。
 207 参数numPartitions用于指定结果的分区数
 208 参数partitioner用于指定分区函数
 209 
 210 var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
 211 var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) 
 212 scala> rdd1.subtractByKey(rdd2).collect
 213 res13: Array[(String, String)] = Array((B,2))
 214 
 215 15) groupbyKey
 216 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
 217     val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
 218     //求并集
 219     val rdd4 = rdd1 union rdd2
 220     //按key进行分组
 221     val rdd5 = rdd4.groupByKey
 222 rdd5.collect
 223 16) reduceByKey
 224 顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。
 225 举例:
 226 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
 227     val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
 228     //求并集
 229     val rdd4 = rdd1 union rdd2
 230     //按key进行分组
 231     val rdd6 = rdd4.reduceByKey(_ + _)
 232     rdd6.collect()
 233 
 234 17) sortByKey
 235 将List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1))和List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))做wordcount,并按名称排序
 236 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
 237     val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
 238     val rdd3 = rdd1.union(rdd2)
 239     //按key进行聚合
 240     val rdd4 = rdd3.reduceByKey(_ + _)
 241     //false降序
 242     val rdd5 = rdd4.sortByKey(false)
 243 rdd5.collect
 244 18) sortBy
 245 将List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1))和List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))做wordcount,并按数值排序
 246 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
 247     val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
 248     val rdd3 = rdd1.union(rdd2)
 249     //按key进行聚合
 250     val rdd4 = rdd3.reduceByKey(_ + _)
 251     //false降序
 252     val rdd5 = rdd4.sortBy(_._2, false)
 253     rdd5.collect
 254 
 255 19) zip
 256 def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
 257 
 258 zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
 259 
 260 scala> var rdd1 = sc.makeRDD(1 to 5,2)
 261 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21
 262  
 263 scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
 264 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21
 265  
 266 scala> rdd1.zip(rdd2).collect
 267 res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))           
 268  
 269 scala> rdd2.zip(rdd1).collect
 270 res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))
 271  
 272 scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)
 273 rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21
 274 scala> rdd1.zip(rdd3).collect
 275 java.lang.IllegalArgumentException: Can‘t zip RDDs with unequal numbers of partitions
 276 //如果两个RDD分区数不同,则抛出异常
 277  
 278 20) zipPartitions
 279 
 280 zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。
 281 该函数有好几种实现,可分为三类:
 282 
 283 参数是一个RDD
 284 def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
 285 
 286 def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
 287 
 288 这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息
 289 
 290 映射方法f参数为两个RDD的迭代器。
 291 
 292 scala> var rdd1 = sc.makeRDD(1 to 5,2)
 293 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21
 294  
 295 scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
 296 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21
 297  
 298 //rdd1两个分区中元素分布:
 299 scala> rdd1.mapPartitionsWithIndex{
 300      |         (x,iter) => {
 301      |           var result = List[String]()
 302      |             while(iter.hasNext){
 303      |               result ::= ("part_" + x + "|" + iter.next())
 304      |             }
 305      |             result.iterator
 306      |            
 307      |         }
 308      |       }.collect
 309 res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)
 310  
 311 //rdd2两个分区中元素分布
 312 scala> rdd2.mapPartitionsWithIndex{
 313      |         (x,iter) => {
 314      |           var result = List[String]()
 315      |             while(iter.hasNext){
 316      |               result ::= ("part_" + x + "|" + iter.next())
 317      |             }
 318      |             result.iterator
 319      |            
 320      |         }
 321      |       }.collect
 322 res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)
 323  
 324 //rdd1和rdd2做zipPartition
 325 scala> rdd1.zipPartitions(rdd2){
 326      |       (rdd1Iter,rdd2Iter) => {
 327      |         var result = List[String]()
 328      |         while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
 329      |           result::=(rdd1Iter.next() + "_" + rdd2Iter.next())
 330      |         }
 331      |         result.iterator
 332      |       }
 333      |     }.collect
 334 res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)
 335  
 336  
 337 参数是两个RDD
 338 def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
 339 
 340 def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
 341 
 342 用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。
 343 
 344 scala> var rdd1 = sc.makeRDD(1 to 5,2)
 345 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21
 346  
 347 scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
 348 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21
 349  
 350 scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2)
 351 rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21
 352  
 353 //rdd3中个分区元素分布
 354 scala> rdd3.mapPartitionsWithIndex{
 355      |         (x,iter) => {
 356      |           var result = List[String]()
 357      |             while(iter.hasNext){
 358      |               result ::= ("part_" + x + "|" + iter.next())
 359      |             }
 360      |             result.iterator
 361      |            
 362      |         }
 363      |       }.collect
 364 res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)
 365  
 366 //三个RDD做zipPartitions
 367 scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){
 368      |       (rdd1Iter,rdd2Iter,rdd3Iter) => {
 369      |         var result = List[String]()
 370      |         while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {
 371      |           result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())
 372      |         }
 373      |         result.iterator
 374      |       }
 375      |     }
 376 rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27
 377  
 378 scala> rdd4.collect
 379 res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)
 380  
 381 参数是三个RDD
 382 def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
 383 
 384 def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
 385 
 386 用法同上面,只不过这里又多了个一个RDD而已。
 387 
 388 
 389 21) zipWithIndex
 390 
 391 def zipWithIndex(): RDD[(T, Long)]
 392 该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。
 393 scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
 394 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21
 395 scala> rdd2.zipWithIndex().collect
 396 res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))
 397  
 398 22) zipWithUniqueId
 399 
 400 def zipWithUniqueId(): RDD[(T, Long)]
 401 该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:
 402 每个分区中第一个元素的唯一ID值为:该分区索引号,
 403 每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)
 404 看下面的例子:
 405 scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
 406 rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21
 407 //rdd1有两个分区,
 408 scala> rdd1.zipWithUniqueId().collect
 409 res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
 410 //总分区数为2
 411 //第一个分区第一个元素ID为0,第二个分区第一个元素ID为1
 412 //第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4
 413 //第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5
 414 
 415 键值转换
 416 23) partitionBy
 417 
 418 def partitionBy(partitioner: Partitioner): RDD[(K, V)]
 419 该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。
 420 scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
 421 rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at :21
 422 scala> rdd1.partitions.size
 423 res20: Int = 2
 424  
 425 //查看rdd1中每个分区的元素
 426 scala> rdd1.mapPartitionsWithIndex{
 427      |         (partIdx,iter) => {
 428      |           var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
 429      |             while(iter.hasNext){
 430      |               var part_name = "part_" + partIdx;
 431      |               var elem = iter.next()
 432      |               if(part_map.contains(part_name)) {
 433      |                 var elems = part_map(part_name)
 434      |                 elems ::= elem
 435      |                 part_map(part_name) = elems
 436      |               } else {
 437      |                 part_map(part_name) = List[(Int,String)]{elem}
 438      |               }
 439      |             }
 440      |             part_map.iterator
 441      |            
 442      |         }
 443      |       }.collect
 444 res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))
 445 //(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中
 446  
 447 //使用partitionBy重分区
 448 scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
 449 rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at :23
 450  
 451 scala> rdd2.partitions.size
 452 res23: Int = 2
 453  
 454 //查看rdd2中每个分区的元素
 455 scala> rdd2.mapPartitionsWithIndex{
 456      |         (partIdx,iter) => {
 457      |           var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
 458      |             while(iter.hasNext){
 459      |               var part_name = "part_" + partIdx;
 460      |               var elem = iter.next()
 461      |               if(part_map.contains(part_name)) {
 462      |                 var elems = part_map(part_name)
 463      |                 elems ::= elem
 464      |                 part_map(part_name) = elems
 465      |               } else {
 466      |                 part_map(part_name) = List[(Int,String)]{elem}
 467      |               }
 468      |             }
 469      |             part_map.iterator
 470      |         }
 471      |       }.collect
 472 res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))
 473 //(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中
 474 24) mapValues
 475 mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。
 476 举例:
 477 scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
 478 scala> val b = a.map(x => (x.length, x))
 479 scala> b.mapValues("x" + _ + "x").collect
 480 res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))
 481 
 482 25) flatMapValues
 483 flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。
 484 举例
 485 val a = sc.parallelize(List((1, 2), (3, 4), (5, 6)))
 486     val b = a.flatMapValues(x => 1.to(x))
 487     b.collect.foreach(println)
 488 26) combineByKey
 489 
 490 def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
 491 def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
 492 def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
 493 该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。
 494 其中的参数:
 495 createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C ,分区内相同的key做一次
 496 mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C,分区内相同的key循环做
 497 mergeCombiners:分区合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C,分区之间循环做
 498 numPartitions:结果RDD分区数,默认保持原有的分区数
 499 partitioner:分区函数,默认为HashPartitioner
 500 mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true
 501 
 502 看下面例子:
 503 scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
 504 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21 
 505 scala> rdd1.combineByKey(
 506      |       (v : Int) => v + "_",   
 507      |       (c : String, v : Int) => c + "@" + v,  
 508      |       (c1 : String, c2 : String) => c1 + "$" + c2
 509      |     ).collect
 510 res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))
 511 
 512 其中三个映射函数分别为:
 513 createCombiner: (V) => C
 514 (v : Int) => v + “_” //在每一个V值后面加上字符_,返回C类型(String)
 515 mergeValue: (C, V) => C
 516 (c : String, v : Int) => c + “@” + v //合并C类型和V类型,中间加字符@,返回C(String)
 517 mergeCombiners: (C, C) => C
 518 (c1 : String, c2 : String) => c1 + “$” + c2 //合并C类型和C类型,中间加$,返回C(String)
 519 其他参数为默认值。
 520 最终,将RDD[String,Int]转换为RDD[String,String]。
 521 
 522 再看例子:
 523 
 524 rdd1.combineByKey(
 525       (v : Int) => List(v),
 526       (c : List[Int], v : Int) => v :: c,
 527       (c1 : List[Int], c2 : List[Int]) => c1 ::: c2
 528 ).collect
 529 res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))
 530 最终将RDD[String,Int]转换为RDD[String,List[Int]]。
 531 
 532 27) foldByKey
 533 
 534 def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
 535 
 536 def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
 537 
 538 def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
 539 
 540  
 541 
 542 该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V.
 543 
 544 例子:
 545 
 546 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
 547 scala> rdd1.foldByKey(0)(_+_).collect
 548 res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1)) 
 549 //将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操
 550 //作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即:
 551 //("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2)
 552  
 553 再看:
 554 
 555 scala> rdd1.foldByKey(2)(_+_).collect
 556 res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
 557 //先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函
 558 //数应用于初始化后的V,最后得到:(A,2+4),即:(A,6)
 559  
 560 再看乘法操作:
 561 
 562 scala> rdd1.foldByKey(0)(_*_).collect
 563 res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
 564 //先将zeroValue=0应用于每个V,注意,这次映射函数为乘法,得到:("A",0*0), ("A",2*0),
 565 //即:("A",0), ("A",0),再将映射函//数应用于初始化后的V,最后得到:(A,0*0),即:(A,0)
 566 //其他K也一样,最终都得到了V=0
 567  
 568 scala> rdd1.foldByKey(1)(_*_).collect
 569 res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
 570 //映射函数为乘法时,需要将zeroValue设为1,才能得到我们想要的结果。
 571  
 572 在使用foldByKey算子时候,要特别注意映射函数及zeroValue的取值。
 573 28) reduceByKeyLocally
 574 
 575 def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
 576 
 577 该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。
 578 
 579 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
 580 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[91] at makeRDD at :21
 581  
 582 scala> rdd1.reduceByKeyLocally((x,y) => x + y)
 583 res90: scala.collection.Map[String,Int] = Map(B -> 3, A -> 2, C -> 1)
 584 
 585 29) cogroup和groupByKey的区别
 586 
 587     val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
 588     val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
 589     //cogroup
 590     val rdd3 = rdd1.cogroup(rdd2)
 591     //groupbykey
 592     val rdd4 = rdd1.union(rdd2).groupByKey
 593     //注意cogroup与groupByKey的区别
 594     rdd3.foreach(println)
 595     rdd4.foreach(println)
 596 30) join
 597 
 598 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
 599 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
 600 //求jion
 601 val rdd3 = rdd1.join(rdd2)
 602 rdd3.collect
 603 
 604 31) leftOuterJoin
 605 
 606 def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
 607 def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
 608 def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
 609 
 610 leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
 611 参数numPartitions用于指定结果的分区数
 612 参数partitioner用于指定分区函数
 613 
 614 var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
 615 var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
 616  
 617 scala> rdd1.leftOuterJoin(rdd2).collect
 618 res11: Array[(String, (String, Option[String]))] = Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c))))
 619  
 620 32) rightOuterJoin
 621 
 622 def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
 623 def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
 624 def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] 
 625 
 626 rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
 627 参数numPartitions用于指定结果的分区数
 628 参数partitioner用于指定分区函数
 629 
 630 var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
 631 var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
 632 scala> rdd1.rightOuterJoin(rdd2).collect
 633 res12: Array[(String, (Option[String], String))] = Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))
 634  
 635 
 636 Action操作
 637 33) first
 638 
 639 def first(): T
 640 
 641 first返回RDD中的第一个元素,不排序。
 642 
 643 scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
 644 rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21
 645  
 646 scala> rdd1.first
 647 res14: (String, String) = (A,1)
 648  
 649 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
 650 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21
 651  
 652 scala> rdd1.first
 653 res8: Int = 10
 654  
 655 34) count
 656 
 657 def count(): Long
 658 
 659 count返回RDD中的元素数量。
 660 
 661 scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
 662 rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21
 663  
 664 scala> rdd1.count
 665 res15: Long = 3
 666  
 667 35) reduce
 668 
 669 def reduce(f: (T, T) ⇒ T): T
 670 
 671 根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。
 672 
 673 scala> var rdd1 = sc.makeRDD(1 to 10,2)
 674 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
 675  
 676 scala> rdd1.reduce(_ + _)
 677 res18: Int = 55
 678  
 679 scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
 680 rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21
 681  
 682 scala> rdd2.reduce((x,y) => {
 683      |       (x._1 + y._1,x._2 + y._2)
 684      |     })
 685 res21: (String, Int) = (CBBAA,6)
 686  
 687 collect
 688 
 689 def collect(): Array[T]
 690 
 691 collect用于将一个RDD转换成数组。
 692 
 693 scala> var rdd1 = sc.makeRDD(1 to 10,2)
 694 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
 695  
 696 scala> rdd1.collect
 697 res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
 698 36) take
 699 
 700 def take(num: Int): Array[T]
 701 
 702 take用于获取RDD中从0到num-1下标的元素,不排序。
 703 
 704 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
 705 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
 706  
 707 scala> rdd1.take(1)
 708 res0: Array[Int] = Array(10)                                                    
 709  
 710 scala> rdd1.take(2)
 711 res1: Array[Int] = Array(10, 4)
 712  
 713 37) top
 714 
 715 def top(num: Int)(implicit ord: Ordering[T]): Array[T]
 716 
 717 top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。
 718 
 719 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
 720 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
 721  
 722 scala> rdd1.top(1)
 723 res2: Array[Int] = Array(12)
 724  
 725 scala> rdd1.top(2)
 726 res3: Array[Int] = Array(12, 10)
 727  
 728 //指定排序规则
 729 scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
 730 myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef
 731  
 732 scala> rdd1.top(1)
 733 res4: Array[Int] = Array(2)
 734  
 735 scala> rdd1.top(2)
 736 res5: Array[Int] = Array(2, 3)
 737  
 738 38) takeOrdered
 739 
 740 def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
 741 
 742 takeOrdered和top类似,只不过以和top相反的顺序返回元素。
 743 
 744 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
 745 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
 746  
 747 scala> rdd1.top(1)
 748 res4: Array[Int] = Array(12)
 749  
 750 scala> rdd1.top(2)
 751 res5: Array[Int] = Array(12, 10)
 752  
 753 scala> rdd1.takeOrdered(1)
 754 res6: Array[Int] = Array(2)
 755  
 756 scala> rdd1.takeOrdered(2)
 757 res7: Array[Int] = Array(2, 3)
 758 
 759 39) aggregate
 760 
 761 def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
 762 
 763 aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。
 764 
 765 var rdd1 = sc.makeRDD(1 to 10,2)
 766 rdd1.mapPartitionsWithIndex{
 767         (partIdx,iter) => {
 768           var part_map = scala.collection.mutable.Map[String,List[Int]]()
 769             while(iter.hasNext){
 770               var part_name = "part_" + partIdx;
 771               var elem = iter.next()
 772               if(part_map.contains(part_name)) {
 773                 var elems = part_map(part_name)
 774                 elems ::= elem
 775                 part_map(part_name) = elems
 776               } else {
 777                 part_map(part_name) = List[Int]{elem}
 778               }
 779             }
 780             part_map.iterator
 781            
 782         }
 783       }.collect
 784 res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))
 785  
 786 ##第一个分区中包含5,4,3,2,1
 787 
 788 ##第二个分区中包含10,9,8,7,6
 789 
 790 scala> rdd1.aggregate(1)(
 791      |           {(x : Int,y : Int) => x + y}, 
 792      |           {(a : Int,b : Int) => a + b}
 793      |     )
 794 res17: Int = 58
 795  
 796 结果为什么是58,看下面的计算过程:
 797 
 798 ##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1
 799 
 800 ##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16
 801 
 802 ## part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41
 803 
 804 ##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1
 805 
 806 ##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58
 807 
 808 再比如:
 809 
 810 scala> rdd1.aggregate(2)(
 811      |           {(x : Int,y : Int) => x + y}, 
 812      |           {(a : Int,b : Int) => a * b}
 813      |     )
 814 res18: Int = 1428
 815  
 816 ##这次zeroValue=2
 817 
 818 ##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
 819 
 820 ##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
 821 
 822 ##最后:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428
 823 
 824 因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。
 825 
 826  
 827 
 828 40) fold
 829 
 830 def fold(zeroValue: T)(op: (T, T) ⇒ T): T
 831 
 832 fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。
 833 var rdd1 = sc.makeRDD(1 to 10, 2)
 834 scala> rdd1.fold(1)(
 835      |       (x,y) => x + y    
 836      |     )
 837 res19: Int = 58
 838  
 839 ##结果同上面使用aggregate的第一个例子一样,即:
 840 scala> rdd1.aggregate(1)(
 841      |           {(x,y) => x + y}, 
 842      |           {(a,b) => a + b}
 843      |     )
 844 res20: Int = 58
 845  
 846 41) lookup
 847 
 848 def lookup(key: K): Seq[V]
 849 
 850 lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。
 851 
 852  
 853 
 854 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
 855 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21
 856  
 857 scala> rdd1.lookup("A")
 858 res0: Seq[Int] = WrappedArray(0, 2)
 859  
 860 scala> rdd1.lookup("B")
 861 res1: Seq[Int] = WrappedArray(1, 2)
 862 42) countByKey
 863 
 864 def countByKey(): Map[K, Long]
 865 
 866 countByKey用于统计RDD[K,V]中每个K的数量。
 867 
 868 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
 869 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21
 870  
 871 scala> rdd1.countByKey
 872 res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)
 873  
 874 43) foreach
 875 
 876 def foreach(f: (T) ⇒ Unit): Unit
 877 
 878 foreach用于遍历RDD,将函数f应用于每一个元素。
 879 但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。
 880 比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。
 881 我在Spark1.4中是这样,不知道是否真如此。
 882 这时候,使用accumulator共享变量与foreach结合,倒是个不错的选择。
 883 
 884 scala> var cnt = sc.accumulator(0)
 885 cnt: org.apache.spark.Accumulator[Int] = 0
 886  
 887 scala> var rdd1 = sc.makeRDD(1 to 10,2)
 888 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
 889  
 890 scala> rdd1.foreach(x => cnt += x)
 891  
 892 scala> cnt.value
 893 res51: Int = 55
 894  
 895 scala> rdd1.collect.foreach(println) 
 896 44) foreachPartition
 897 
 898 def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
 899 
 900 foreachPartition和foreach类似,只不过是对每一个分区使用f。
 901 
 902 scala> var rdd1 = sc.makeRDD(1 to 10,2)
 903 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
 904  
 905 scala> var allsize = sc.accumulator(0)
 906 size: org.apache.spark.Accumulator[Int] = 0
 907  
 908 
 909 scala>     rdd1.foreachPartition { x => {
 910      |       allsize += x.size
 911      |     }}
 912  
 913 scala> println(allsize.value)
 914 10
 915  
 916 45) sortBy
 917 
 918 def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
 919 
 920 sortBy根据给定的排序k函数将RDD中的元素进行排序。
 921 
 922 scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)
 923  
 924 scala> rdd1.sortBy(x => x).collect
 925 res1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //默认升序
 926  
 927 scala> rdd1.sortBy(x => x,false).collect
 928 res2: Array[Int] = Array(7, 6, 3, 2, 1, 0)  //降序
 929  
 930 //RDD[K,V]类型
 931 scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
 932  
 933 scala> rdd1.sortBy(x => x).collect
 934 res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))
 935  
 936 //按照V进行降序排序
 937 scala> rdd1.sortBy(x => x._2,false).collect
 938 res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))
 939 46) saveAsTextFile
 940 
 941 def saveAsTextFile(path: String): Unit
 942 
 943 def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
 944 
 945 saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。
 946 
 947 codec参数可以指定压缩的类名。
 948 
 949 var rdd1 = sc.makeRDD(1 to 10,2)
 950 scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFS
 951 hadoop fs -ls /tmp/lxw1234.com
 952 Found 2 items
 953 -rw-r--r--   2 lxw1234 supergroup        0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS
 954 -rw-r--r--   2 lxw1234 supergroup        21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000
 955  
 956 hadoop fs -cat /tmp/lxw1234.com/part-00000
 957 
 958 注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)将文件保存到本地文件系统,那么只会保存在Executor所在机器的本地目录。
 959 
 960 //指定压缩格式保存
 961 
 962 rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/",classOf[com.hadoop.compression.lzo.LzopCodec])
 963  
 964 hadoop fs -ls /tmp/lxw1234.com
 965 -rw-r--r--   2 lxw1234 supergroup    0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS
 966 -rw-r--r--   2 lxw1234 supergroup    71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzo
 967  
 968 hadoop fs -text /tmp/lxw1234.com/part-00000.lzo
 969 
 970  
 971 47) saveAsSequenceFile
 972 
 973 saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。
 974 用法同saveAsTextFile。
 975 
 976 48) saveAsObjectFile
 977 
 978 def saveAsObjectFile(path: String): Unit
 979 
 980 saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。
 981 
 982 对于HDFS,默认采用SequenceFile保存。
 983 
 984 var rdd1 = sc.makeRDD(1 to 10,2)
 985 rdd1.saveAsObjectFile("hdfs://cdh5/tmp/lxw1234.com/")
 986  
 987 hadoop fs -cat /tmp/lxw1234.com/part-00000
 988 SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT
 989  
 990 49) saveAsHadoopFile
 991 
 992 def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit
 993 def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = …, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
 994 
 995 saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。
 996 
 997 可以指定outputKeyClass、outputValueClass以及压缩格式。
 998 每个分区输出一个文件。
 999 var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
1000 import org.apache.hadoop.mapred.TextOutputFormat
1001 import org.apache.hadoop.io.Text
1002 import org.apache.hadoop.io.IntWritable
1003  
1004 rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
1005  
1006 rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],
1007                       classOf[com.hadoop.compression.lzo.LzopCodec])
1008  
1009 50) saveAsHadoopDataset
1010 
1011 def saveAsHadoopDataset(conf: JobConf): Unit
1012 saveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中,比如HBase。
1013 在JobConf中,通常需要关注或者设置五个参数:
1014 文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。
1015  
1016 ##使用saveAsHadoopDataset将RDD保存到HDFS中
1017 
1018 import org.apache.spark.SparkConf
1019 import org.apache.spark.SparkContext
1020 import SparkContext._
1021 import org.apache.hadoop.mapred.TextOutputFormat
1022 import org.apache.hadoop.io.Text
1023 import org.apache.hadoop.io.IntWritable
1024 import org.apache.hadoop.mapred.JobConf
1025  
1026  
1027  
1028 var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
1029 var jobConf = new JobConf()
1030 jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
1031 jobConf.setOutputKeyClass(classOf[Text])
1032 jobConf.setOutputValueClass(classOf[IntWritable])
1033 jobConf.set("mapred.output.dir","/tmp/lxw1234/")
1034 rdd1.saveAsHadoopDataset(jobConf)
1035  
1036 结果:
1037 hadoop fs -cat /tmp/lxw1234/part-00000
1038 A       2
1039 A       1
1040 hadoop fs -cat /tmp/lxw1234/part-00001
1041 B       6
1042 B       3
1043 B       7
1044  
1045 ##保存数据到HBASE
1046 
1047 HBase建表:
1048 
1049 create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
1050 
1051 import org.apache.spark.SparkConf
1052 import org.apache.spark.SparkContext
1053 import SparkContext._
1054 import org.apache.hadoop.mapred.TextOutputFormat
1055 import org.apache.hadoop.io.Text
1056 import org.apache.hadoop.io.IntWritable
1057 import org.apache.hadoop.mapred.JobConf
1058 import org.apache.hadoop.hbase.HBaseConfiguration
1059 import org.apache.hadoop.hbase.mapred.TableOutputFormat
1060 import org.apache.hadoop.hbase.client.Put
1061 import org.apache.hadoop.hbase.util.Bytes
1062 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
1063  
1064 var conf = HBaseConfiguration.create()
1065     var jobConf = new JobConf(conf)
1066     jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")
1067     jobConf.set("zookeeper.znode.parent","/hbase")
1068     jobConf.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")
1069     jobConf.setOutputFormat(classOf[TableOutputFormat])
1070     
1071     var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
1072     rdd1.map(x => 
1073       {
1074         var put = new Put(Bytes.toBytes(x._1))
1075         put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
1076         (new ImmutableBytesWritable,put)
1077       }
1078     ).saveAsHadoopDataset(jobConf)
1079  
1080 ##结果:
1081 hbase(main):005:0> scan ‘lxw1234‘
1082 ROW     COLUMN+CELL                                                                                                
1083  A       column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02                                              
1084  B       column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06                                              
1085  C       column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07                                              
1086 3 row(s) in 0.0550 seconds
1087  
1088 注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。
1089 
1090 51) saveAsNewAPIHadoopFile
1091 
1092 def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
1093 
1094 def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit
1095 
1096  
1097 
1098 saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。
1099 
1100 用法基本同saveAsHadoopFile。
1101 
1102 import org.apache.spark.SparkConf
1103 import org.apache.spark.SparkContext
1104 import SparkContext._
1105 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
1106 import org.apache.hadoop.io.Text
1107 import org.apache.hadoop.io.IntWritable
1108  
1109 var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
1110 rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
1111  
1112 52) saveAsNewAPIHadoopDataset
1113 
1114 def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
1115 
1116 作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。
1117 
1118 以写入HBase为例:
1119 
1120  
1121 
1122 HBase建表:
1123 
1124 create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
1125 
1126  
1127 
1128 完整的Spark应用程序:
1129 
1130 package com.lxw1234.test
1131  
1132 import org.apache.spark.SparkConf
1133 import org.apache.spark.SparkContext
1134 import SparkContext._
1135 import org.apache.hadoop.hbase.HBaseConfiguration
1136 import org.apache.hadoop.mapreduce.Job
1137 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
1138 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
1139 import org.apache.hadoop.hbase.client.Result
1140 import org.apache.hadoop.hbase.util.Bytes
1141 import org.apache.hadoop.hbase.client.Put
1142  
1143 object Test {
1144   def main(args : Array[String]) {
1145    val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")
1146    val sc = new SparkContext(sparkConf);
1147    var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
1148    
1149     sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3")
1150     sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")
1151     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")
1152     var job = new Job(sc.hadoopConfiguration)
1153     job.setOutputKeyClass(classOf[ImmutableBytesWritable])
1154     job.setOutputValueClass(classOf[Result])
1155     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
1156     
1157     rdd1.map(
1158       x => {
1159         var put = new Put(Bytes.toBytes(x._1))
1160         put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
1161         (new ImmutableBytesWritable,put)
1162       }    
1163     ).saveAsNewAPIHadoopDataset(job.getConfiguration)
1164     
1165     sc.stop()   
1166   }
1167 }
1168  
1169 注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。
View Code
   1 1、RDD操作详解
   2 启动spark-shell
   3 spark-shell --master spark://hdp-node-01:7077 
   4 Spark core 核心数据抽象是RDD
   5 
   6 1.1 基本转换
   7 1) map
   8 map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
   9 举例:
  10 //设置spark的配置文件信息
  11 val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local")
  12 //构建sparkcontext上下文对象,它是程序的入口,所有计算的源头
  13 val sc: SparkContext = new SparkContext(sparkConf)
  14 //定义一个列表
  15 val list: List[Int] = List(1, 2, 3, 4, 5, 6, 7)
  16 //生成一个rdd
  17 val rdd: RDD[Int] = sc.parallelize(list)
  18 //map算子
  19 val map: RDD[Int] = rdd.map(x => x * 2)
  20 //foreach 算子打印
  21 map.foreach(x => println(x))
  22 sc.stop()
  23 上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。
  24 2) filter
  25 filter 是对RDD中的每个元素都执行一个指定的函数来过滤产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
  26 //设置spark的配置文件信息
  27 val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local")
  28 //构建sparkcontext上下文对象,它是程序的入口,所有计算的源头
  29 val sc: SparkContext = new SparkContext(sparkConf)
  30 sc.setLogLevel("WARN")
  31 //定义一个列表
  32 val list: List[Int] = List(1, 2, 3, 4, 5, 6, 7)
  33 //生成一个rdd
  34 val rdd: RDD[Int] = sc.parallelize(list)
  35 //filter算子
  36 val map: RDD[Int] = rdd.filter(_ > 5)
  37 //foreach 算子打印
  38 map.foreach(x => println(x))
  39 sc.stop()
  40  
  41 3) flatMap
  42 与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)
  43 scala> val a = sc.parallelize(1 to 4, 2)
  44 scala> val b = a.flatMap(x => 1 to x)
  45 scala> b.collect
  46 res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)
  47 
  48 4) mapPartitions
  49 mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 它的函数定义为:
  50 def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
  51 f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。
  52 举例:
  53 scala> val a = sc.parallelize(1 to 9, 3)
  54 scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
  55   var res = List[(T, T)]() 
  56   var pre = iter.next 
  57 while (iter.hasNext) {
  58     val cur = iter.next
  59     res.::=(pre, cur)
  60       pre = cur  } 
  61   res.iterator
  62 }
  63 scala> a.mapPartitions(myfunc).collect
  64 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
  65 上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。 mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。
  66 5) mapPartitionsWithIndex
  67 def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
  68 函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。
  69 
  70 var rdd1 = sc.makeRDD(1 to 5,2)
  71 //rdd1有两个分区
  72 var rdd2 = rdd1.mapPartitionsWithIndex{
  73         (x,iter) => {
  74           var result = List[String]()
  75             var i = 0
  76             while(iter.hasNext){
  77               i += iter.next()
  78             }
  79             result.::(x + "|" + i).iterator
  80            
  81         }
  82       }
  83 //rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引
  84 scala> rdd2.collect
  85 res13: Array[String] = Array(0|3, 1|12)
  86  
  87  
  88 6) coalesce
  89 
  90 def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
  91 该函数用于将RDD进行重分区,使用HashPartitioner。
  92 第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;
  93 以下面的例子来看:
  94 scala> var data = sc.parallelize(1 to 12, 3) 
  95 scala> data.collect 
  96 scala> data.partitions.size 
  97 scala> var rdd1 = data.coalesce(1) 
  98 scala> rdd1.partitions.size 
  99 scala> var rdd1 = data.coalesce(4) 
 100 scala> rdd1.partitions.size
 101 res2: Int = 1   //如果重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,//否则,分区数不便
 102 scala> var rdd1 = data.coalesce(4,true) 
 103 scala> rdd1.partitions.size
 104 res3: Int = 4
 105  
 106 7) repartition
 107 def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
 108 该函数其实就是coalesce函数第二个参数为true的实现
 109 scala> var data = sc.parallelize(1 to 12, 3) 
 110 scala> data.collect 
 111 scala> data.partitions.size 
 112 scala> var rdd1 = data. repartition(1) 
 113 scala> rdd1.partitions.size 
 114 scala> var rdd1 = data. repartition(4) 
 115 scala> rdd1.partitions.size
 116 res3: Int = 4
 117 8) randomSplit
 118 
 119 def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
 120 该函数根据weights权重,将一个RDD切分成多个RDD。
 121 该权重参数为一个Double数组
 122 第二个参数为random的种子,基本可忽略。
 123 scala> var rdd = sc.makeRDD(1 to 12,12)
 124 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21
 125 
 126 scala> rdd.collect
 127 res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)  
 128  
 129 scala> var splitRDD = rdd.randomSplit(Array(0.5, 0.1, 0.2, 0.2))
 130 splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] at randomSplit at :23, 
 131 MapPartitionsRDD[18] at randomSplit at :23, 
 132 MapPartitionsRDD[19] at randomSplit at :23, 
 133 MapPartitionsRDD[20] at randomSplit at :23)
 134  
 135 //这里注意:randomSplit的结果是一个RDD数组
 136 scala> splitRDD.size
 137 res8: Int = 4
 138 //由于randomSplit的第一个参数weights中传入的值有4个,因此,就会切分成4个RDD,
 139 //把原来的rdd按照权重0.5, 0.1, 0.2, 0.2,随机划分到这4个RDD中,权重高的RDD,划分到//的几率就大一些。
 140 //注意,权重的总和加起来为1,否则会不正常 
 141 scala> splitRDD(0).collect
 142 res10: Array[Int] = Array(1, 4)
 143  
 144 scala> splitRDD(1).collect
 145 res11: Array[Int] = Array(3)                                                    
 146  
 147 scala> splitRDD(2).collect
 148 res12: Array[Int] = Array(5, 9)
 149  
 150 scala> splitRDD(3).collect
 151 res13: Array[Int] = Array(2, 6, 7, 8, 10)
 152  
 153 9) glom
 154 
 155 def glom(): RDD[Array[T]]
 156 该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素。
 157 
 158 scala> var rdd = sc.makeRDD(1 to 10,3)
 159 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at makeRDD at :21
 160 scala> rdd.partitions.size
 161 res33: Int = 3  //该RDD有3个分区
 162 scala> rdd.glom().collect
 163 res35: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
 164 //glom将每个分区中的元素放到一个数组中,这样,结果就变成了3个数组
 165 
 166 10) union并集
 167 
 168 val rdd1 = sc.parallelize(List(5, 6, 4, 3))
 169 val rdd2 = sc.parallelize(List(1, 2, 3, 4))
 170 //求并集
 171 val rdd3 = rdd1.union(rdd2)
 172 rdd3.collect
 173 11) distinct
 174 去重
 175 val rdd1 = sc.parallelize(List(5, 6, 4, 3))
 176 val rdd2 = sc.parallelize(List(1, 2, 3, 4))
 177 //求并集
 178 val rdd3 = rdd1.union(rdd2)
 179 //去重输出
 180 rdd3.distinct.collect
 181 
 182 12) intersection交集
 183 val rdd1 = sc.parallelize(List(5, 6, 4, 3))
 184 val rdd2 = sc.parallelize(List(1, 2, 3, 4))
 185 //求交集
 186 val rdd4 = rdd1.intersection(rdd2) 
 187 rdd4.collect
 188 
 189 13) subtract
 190 def subtract(other: RDD[T]): RDD[T]
 191 def subtract(other: RDD[T], numPartitions: Int): RDD[T]
 192 def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
 193 该函数返回在RDD中出现,并且不在otherRDD中出现的元素,不去重。
 194 
 195 val rdd1 = sc.parallelize(List(5, 6, 6, 4, 3))
 196     val rdd2 = sc.parallelize(List(1, 2, 3, 4))
 197     //求差集
 198     val rdd4 = rdd1.subtract(rdd2)
 199 rdd4.collect
 200 14) subtractByKey
 201 
 202 def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]
 203 def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]
 204 def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]
 205 
 206 subtractByKey和基本转换操作中的subtract类似,只不过这里是针对K的,返回在主RDD中出现,并且不在otherRDD中出现的元素。
 207 参数numPartitions用于指定结果的分区数
 208 参数partitioner用于指定分区函数
 209 
 210 var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
 211 var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) 
 212 scala> rdd1.subtractByKey(rdd2).collect
 213 res13: Array[(String, String)] = Array((B,2))
 214 
 215 15) groupbyKey
 216 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
 217     val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
 218     //求并集
 219     val rdd4 = rdd1 union rdd2
 220     //按key进行分组
 221     val rdd5 = rdd4.groupByKey
 222 rdd5.collect
 223 16) reduceByKey
 224 顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。
 225 举例:
 226 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
 227     val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
 228     //求并集
 229     val rdd4 = rdd1 union rdd2
 230     //按key进行分组
 231     val rdd6 = rdd4.reduceByKey(_ + _)
 232     rdd6.collect()
 233 
 234 17) sortByKey
 235 将List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1))和List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))做wordcount,并按名称排序
 236 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
 237     val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
 238     val rdd3 = rdd1.union(rdd2)
 239     //按key进行聚合
 240     val rdd4 = rdd3.reduceByKey(_ + _)
 241     //false降序
 242     val rdd5 = rdd4.sortByKey(false)
 243 rdd5.collect
 244 18) sortBy
 245 将List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1))和List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))做wordcount,并按数值排序
 246 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
 247     val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
 248     val rdd3 = rdd1.union(rdd2)
 249     //按key进行聚合
 250     val rdd4 = rdd3.reduceByKey(_ + _)
 251     //false降序
 252     val rdd5 = rdd4.sortBy(_._2, false)
 253     rdd5.collect
 254 
 255 19) zip
 256 def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
 257 
 258 zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
 259 
 260 scala> var rdd1 = sc.makeRDD(1 to 5,2)
 261 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21
 262  
 263 scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
 264 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21
 265  
 266 scala> rdd1.zip(rdd2).collect
 267 res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))           
 268  
 269 scala> rdd2.zip(rdd1).collect
 270 res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))
 271  
 272 scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)
 273 rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21
 274 scala> rdd1.zip(rdd3).collect
 275 java.lang.IllegalArgumentException: Can‘t zip RDDs with unequal numbers of partitions
 276 //如果两个RDD分区数不同,则抛出异常
 277  
 278 20) zipPartitions
 279 
 280 zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。
 281 该函数有好几种实现,可分为三类:
 282 
 283 参数是一个RDD
 284 def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
 285 
 286 def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
 287 
 288 这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息
 289 
 290 映射方法f参数为两个RDD的迭代器。
 291 
 292 scala> var rdd1 = sc.makeRDD(1 to 5,2)
 293 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21
 294  
 295 scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
 296 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21
 297  
 298 //rdd1两个分区中元素分布:
 299 scala> rdd1.mapPartitionsWithIndex{
 300      |         (x,iter) => {
 301      |           var result = List[String]()
 302      |             while(iter.hasNext){
 303      |               result ::= ("part_" + x + "|" + iter.next())
 304      |             }
 305      |             result.iterator
 306      |            
 307      |         }
 308      |       }.collect
 309 res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)
 310  
 311 //rdd2两个分区中元素分布
 312 scala> rdd2.mapPartitionsWithIndex{
 313      |         (x,iter) => {
 314      |           var result = List[String]()
 315      |             while(iter.hasNext){
 316      |               result ::= ("part_" + x + "|" + iter.next())
 317      |             }
 318      |             result.iterator
 319      |            
 320      |         }
 321      |       }.collect
 322 res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)
 323  
 324 //rdd1和rdd2做zipPartition
 325 scala> rdd1.zipPartitions(rdd2){
 326      |       (rdd1Iter,rdd2Iter) => {
 327      |         var result = List[String]()
 328      |         while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
 329      |           result::=(rdd1Iter.next() + "_" + rdd2Iter.next())
 330      |         }
 331      |         result.iterator
 332      |       }
 333      |     }.collect
 334 res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)
 335  
 336  
 337 参数是两个RDD
 338 def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
 339 
 340 def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
 341 
 342 用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。
 343 
 344 scala> var rdd1 = sc.makeRDD(1 to 5,2)
 345 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21
 346  
 347 scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
 348 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21
 349  
 350 scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2)
 351 rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21
 352  
 353 //rdd3中个分区元素分布
 354 scala> rdd3.mapPartitionsWithIndex{
 355      |         (x,iter) => {
 356      |           var result = List[String]()
 357      |             while(iter.hasNext){
 358      |               result ::= ("part_" + x + "|" + iter.next())
 359      |             }
 360      |             result.iterator
 361      |            
 362      |         }
 363      |       }.collect
 364 res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)
 365  
 366 //三个RDD做zipPartitions
 367 scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){
 368      |       (rdd1Iter,rdd2Iter,rdd3Iter) => {
 369      |         var result = List[String]()
 370      |         while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {
 371      |           result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())
 372      |         }
 373      |         result.iterator
 374      |       }
 375      |     }
 376 rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27
 377  
 378 scala> rdd4.collect
 379 res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)
 380  
 381 参数是三个RDD
 382 def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
 383 
 384 def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
 385 
 386 用法同上面,只不过这里又多了个一个RDD而已。
 387 
 388 
 389 21) zipWithIndex
 390 
 391 def zipWithIndex(): RDD[(T, Long)]
 392 该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。
 393 scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
 394 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21
 395 scala> rdd2.zipWithIndex().collect
 396 res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))
 397  
 398 22) zipWithUniqueId
 399 
 400 def zipWithUniqueId(): RDD[(T, Long)]
 401 该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:
 402 每个分区中第一个元素的唯一ID值为:该分区索引号,
 403 每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)
 404 看下面的例子:
 405 scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
 406 rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21
 407 //rdd1有两个分区,
 408 scala> rdd1.zipWithUniqueId().collect
 409 res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
 410 //总分区数为2
 411 //第一个分区第一个元素ID为0,第二个分区第一个元素ID为1
 412 //第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4
 413 //第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5
 414 
 415 键值转换
 416 23) partitionBy
 417 
 418 def partitionBy(partitioner: Partitioner): RDD[(K, V)]
 419 该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。
 420 scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
 421 rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at :21
 422 scala> rdd1.partitions.size
 423 res20: Int = 2
 424  
 425 //查看rdd1中每个分区的元素
 426 scala> rdd1.mapPartitionsWithIndex{
 427      |         (partIdx,iter) => {
 428      |           var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
 429      |             while(iter.hasNext){
 430      |               var part_name = "part_" + partIdx;
 431      |               var elem = iter.next()
 432      |               if(part_map.contains(part_name)) {
 433      |                 var elems = part_map(part_name)
 434      |                 elems ::= elem
 435      |                 part_map(part_name) = elems
 436      |               } else {
 437      |                 part_map(part_name) = List[(Int,String)]{elem}
 438      |               }
 439      |             }
 440      |             part_map.iterator
 441      |            
 442      |         }
 443      |       }.collect
 444 res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))
 445 //(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中
 446  
 447 //使用partitionBy重分区
 448 scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
 449 rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at :23
 450  
 451 scala> rdd2.partitions.size
 452 res23: Int = 2
 453  
 454 //查看rdd2中每个分区的元素
 455 scala> rdd2.mapPartitionsWithIndex{
 456      |         (partIdx,iter) => {
 457      |           var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
 458      |             while(iter.hasNext){
 459      |               var part_name = "part_" + partIdx;
 460      |               var elem = iter.next()
 461      |               if(part_map.contains(part_name)) {
 462      |                 var elems = part_map(part_name)
 463      |                 elems ::= elem
 464      |                 part_map(part_name) = elems
 465      |               } else {
 466      |                 part_map(part_name) = List[(Int,String)]{elem}
 467      |               }
 468      |             }
 469      |             part_map.iterator
 470      |         }
 471      |       }.collect
 472 res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))
 473 //(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中
 474 24) mapValues
 475 mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。
 476 举例:
 477 scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
 478 scala> val b = a.map(x => (x.length, x))
 479 scala> b.mapValues("x" + _ + "x").collect
 480 res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))
 481 
 482 25) flatMapValues
 483 flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。
 484 举例
 485 val a = sc.parallelize(List((1, 2), (3, 4), (5, 6)))
 486     val b = a.flatMapValues(x => 1.to(x))
 487     b.collect.foreach(println)
 488 26) combineByKey
 489 
 490 def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
 491 def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
 492 def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
 493 该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。
 494 其中的参数:
 495 createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C ,分区内相同的key做一次
 496 mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C,分区内相同的key循环做
 497 mergeCombiners:分区合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C,分区之间循环做
 498 numPartitions:结果RDD分区数,默认保持原有的分区数
 499 partitioner:分区函数,默认为HashPartitioner
 500 mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true
 501 
 502 看下面例子:
 503 scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
 504 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21 
 505 scala> rdd1.combineByKey(
 506      |       (v : Int) => v + "_",   
 507      |       (c : String, v : Int) => c + "@" + v,  
 508      |       (c1 : String, c2 : String) => c1 + "$" + c2
 509      |     ).collect
 510 res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))
 511 
 512 其中三个映射函数分别为:
 513 createCombiner: (V) => C
 514 (v : Int) => v + “_” //在每一个V值后面加上字符_,返回C类型(String)
 515 mergeValue: (C, V) => C
 516 (c : String, v : Int) => c + “@” + v //合并C类型和V类型,中间加字符@,返回C(String)
 517 mergeCombiners: (C, C) => C
 518 (c1 : String, c2 : String) => c1 + “$” + c2 //合并C类型和C类型,中间加$,返回C(String)
 519 其他参数为默认值。
 520 最终,将RDD[String,Int]转换为RDD[String,String]。
 521 
 522 再看例子:
 523 
 524 rdd1.combineByKey(
 525       (v : Int) => List(v),
 526       (c : List[Int], v : Int) => v :: c,
 527       (c1 : List[Int], c2 : List[Int]) => c1 ::: c2
 528 ).collect
 529 res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))
 530 最终将RDD[String,Int]转换为RDD[String,List[Int]]。
 531 
 532 27) foldByKey
 533 
 534 def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
 535 
 536 def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
 537 
 538 def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
 539 
 540  
 541 
 542 该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V.
 543 
 544 例子:
 545 
 546 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
 547 scala> rdd1.foldByKey(0)(_+_).collect
 548 res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1)) 
 549 //将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操
 550 //作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即:
 551 //("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2)
 552  
 553 再看:
 554 
 555 scala> rdd1.foldByKey(2)(_+_).collect
 556 res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
 557 //先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函
 558 //数应用于初始化后的V,最后得到:(A,2+4),即:(A,6)
 559  
 560 再看乘法操作:
 561 
 562 scala> rdd1.foldByKey(0)(_*_).collect
 563 res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
 564 //先将zeroValue=0应用于每个V,注意,这次映射函数为乘法,得到:("A",0*0), ("A",2*0),
 565 //即:("A",0), ("A",0),再将映射函//数应用于初始化后的V,最后得到:(A,0*0),即:(A,0)
 566 //其他K也一样,最终都得到了V=0
 567  
 568 scala> rdd1.foldByKey(1)(_*_).collect
 569 res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
 570 //映射函数为乘法时,需要将zeroValue设为1,才能得到我们想要的结果。
 571  
 572 在使用foldByKey算子时候,要特别注意映射函数及zeroValue的取值。
 573 28) reduceByKeyLocally
 574 
 575 def reduceByKeyLocally(func: (V, V) => V): Map[K, V]
 576 
 577 该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。
 578 
 579 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
 580 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[91] at makeRDD at :21
 581  
 582 scala> rdd1.reduceByKeyLocally((x,y) => x + y)
 583 res90: scala.collection.Map[String,Int] = Map(B -> 3, A -> 2, C -> 1)
 584 
 585 29) cogroup和groupByKey的区别
 586 
 587     val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
 588     val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
 589     //cogroup
 590     val rdd3 = rdd1.cogroup(rdd2)
 591     //groupbykey
 592     val rdd4 = rdd1.union(rdd2).groupByKey
 593     //注意cogroup与groupByKey的区别
 594     rdd3.foreach(println)
 595     rdd4.foreach(println)
 596 30) join
 597 
 598 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
 599 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
 600 //求jion
 601 val rdd3 = rdd1.join(rdd2)
 602 rdd3.collect
 603 
 604 31) leftOuterJoin
 605 
 606 def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
 607 def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
 608 def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
 609 
 610 leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
 611 参数numPartitions用于指定结果的分区数
 612 参数partitioner用于指定分区函数
 613 
 614 var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
 615 var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
 616  
 617 scala> rdd1.leftOuterJoin(rdd2).collect
 618 res11: Array[(String, (String, Option[String]))] = Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c))))
 619  
 620 32) rightOuterJoin
 621 
 622 def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
 623 def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
 624 def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] 
 625 
 626 rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
 627 参数numPartitions用于指定结果的分区数
 628 参数partitioner用于指定分区函数
 629 
 630 var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
 631 var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
 632 scala> rdd1.rightOuterJoin(rdd2).collect
 633 res12: Array[(String, (Option[String], String))] = Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))
 634  
 635 
 636 Action操作
 637 33) first
 638 
 639 def first(): T
 640 
 641 first返回RDD中的第一个元素,不排序。
 642 
 643 scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
 644 rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21
 645  
 646 scala> rdd1.first
 647 res14: (String, String) = (A,1)
 648  
 649 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
 650 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21
 651  
 652 scala> rdd1.first
 653 res8: Int = 10
 654  
 655 34) count
 656 
 657 def count(): Long
 658 
 659 count返回RDD中的元素数量。
 660 
 661 scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
 662 rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21
 663  
 664 scala> rdd1.count
 665 res15: Long = 3
 666  
 667 35) reduce
 668 
 669 def reduce(f: (T, T) ⇒ T): T
 670 
 671 根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。
 672 
 673 scala> var rdd1 = sc.makeRDD(1 to 10,2)
 674 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
 675  
 676 scala> rdd1.reduce(_ + _)
 677 res18: Int = 55
 678  
 679 scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
 680 rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21
 681  
 682 scala> rdd2.reduce((x,y) => {
 683      |       (x._1 + y._1,x._2 + y._2)
 684      |     })
 685 res21: (String, Int) = (CBBAA,6)
 686  
 687 collect
 688 
 689 def collect(): Array[T]
 690 
 691 collect用于将一个RDD转换成数组。
 692 
 693 scala> var rdd1 = sc.makeRDD(1 to 10,2)
 694 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
 695  
 696 scala> rdd1.collect
 697 res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
 698 36) take
 699 
 700 def take(num: Int): Array[T]
 701 
 702 take用于获取RDD中从0到num-1下标的元素,不排序。
 703 
 704 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
 705 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
 706  
 707 scala> rdd1.take(1)
 708 res0: Array[Int] = Array(10)                                                    
 709  
 710 scala> rdd1.take(2)
 711 res1: Array[Int] = Array(10, 4)
 712  
 713 37) top
 714 
 715 def top(num: Int)(implicit ord: Ordering[T]): Array[T]
 716 
 717 top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。
 718 
 719 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
 720 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
 721  
 722 scala> rdd1.top(1)
 723 res2: Array[Int] = Array(12)
 724  
 725 scala> rdd1.top(2)
 726 res3: Array[Int] = Array(12, 10)
 727  
 728 //指定排序规则
 729 scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
 730 myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef
 731  
 732 scala> rdd1.top(1)
 733 res4: Array[Int] = Array(2)
 734  
 735 scala> rdd1.top(2)
 736 res5: Array[Int] = Array(2, 3)
 737  
 738 38) takeOrdered
 739 
 740 def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
 741 
 742 takeOrdered和top类似,只不过以和top相反的顺序返回元素。
 743 
 744 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
 745 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
 746  
 747 scala> rdd1.top(1)
 748 res4: Array[Int] = Array(12)
 749  
 750 scala> rdd1.top(2)
 751 res5: Array[Int] = Array(12, 10)
 752  
 753 scala> rdd1.takeOrdered(1)
 754 res6: Array[Int] = Array(2)
 755  
 756 scala> rdd1.takeOrdered(2)
 757 res7: Array[Int] = Array(2, 3)
 758 
 759 39) aggregate
 760 
 761 def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
 762 
 763 aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。
 764 
 765 var rdd1 = sc.makeRDD(1 to 10,2)
 766 rdd1.mapPartitionsWithIndex{
 767         (partIdx,iter) => {
 768           var part_map = scala.collection.mutable.Map[String,List[Int]]()
 769             while(iter.hasNext){
 770               var part_name = "part_" + partIdx;
 771               var elem = iter.next()
 772               if(part_map.contains(part_name)) {
 773                 var elems = part_map(part_name)
 774                 elems ::= elem
 775                 part_map(part_name) = elems
 776               } else {
 777                 part_map(part_name) = List[Int]{elem}
 778               }
 779             }
 780             part_map.iterator
 781            
 782         }
 783       }.collect
 784 res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))
 785  
 786 ##第一个分区中包含5,4,3,2,1
 787 
 788 ##第二个分区中包含10,9,8,7,6
 789 
 790 scala> rdd1.aggregate(1)(
 791      |           {(x : Int,y : Int) => x + y}, 
 792      |           {(a : Int,b : Int) => a + b}
 793      |     )
 794 res17: Int = 58
 795  
 796 结果为什么是58,看下面的计算过程:
 797 
 798 ##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1
 799 
 800 ##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16
 801 
 802 ## part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41
 803 
 804 ##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1
 805 
 806 ##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58
 807 
 808 再比如:
 809 
 810 scala> rdd1.aggregate(2)(
 811      |           {(x : Int,y : Int) => x + y}, 
 812      |           {(a : Int,b : Int) => a * b}
 813      |     )
 814 res18: Int = 1428
 815  
 816 ##这次zeroValue=2
 817 
 818 ##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
 819 
 820 ##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
 821 
 822 ##最后:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428
 823 
 824 因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。
 825 
 826  
 827 
 828 40) fold
 829 
 830 def fold(zeroValue: T)(op: (T, T) ⇒ T): T
 831 
 832 fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。
 833 var rdd1 = sc.makeRDD(1 to 10, 2)
 834 scala> rdd1.fold(1)(
 835      |       (x,y) => x + y    
 836      |     )
 837 res19: Int = 58
 838  
 839 ##结果同上面使用aggregate的第一个例子一样,即:
 840 scala> rdd1.aggregate(1)(
 841      |           {(x,y) => x + y}, 
 842      |           {(a,b) => a + b}
 843      |     )
 844 res20: Int = 58
 845  
 846 41) lookup
 847 
 848 def lookup(key: K): Seq[V]
 849 
 850 lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。
 851 
 852  
 853 
 854 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
 855 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21
 856  
 857 scala> rdd1.lookup("A")
 858 res0: Seq[Int] = WrappedArray(0, 2)
 859  
 860 scala> rdd1.lookup("B")
 861 res1: Seq[Int] = WrappedArray(1, 2)
 862 42) countByKey
 863 
 864 def countByKey(): Map[K, Long]
 865 
 866 countByKey用于统计RDD[K,V]中每个K的数量。
 867 
 868 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
 869 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21
 870  
 871 scala> rdd1.countByKey
 872 res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)
 873  
 874 43) foreach
 875 
 876 def foreach(f: (T) ⇒ Unit): Unit
 877 
 878 foreach用于遍历RDD,将函数f应用于每一个元素。
 879 但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。
 880 比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。
 881 我在Spark1.4中是这样,不知道是否真如此。
 882 这时候,使用accumulator共享变量与foreach结合,倒是个不错的选择。
 883 
 884 scala> var cnt = sc.accumulator(0)
 885 cnt: org.apache.spark.Accumulator[Int] = 0
 886  
 887 scala> var rdd1 = sc.makeRDD(1 to 10,2)
 888 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
 889  
 890 scala> rdd1.foreach(x => cnt += x)
 891  
 892 scala> cnt.value
 893 res51: Int = 55
 894  
 895 scala> rdd1.collect.foreach(println) 
 896 44) foreachPartition
 897 
 898 def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
 899 
 900 foreachPartition和foreach类似,只不过是对每一个分区使用f。
 901 
 902 scala> var rdd1 = sc.makeRDD(1 to 10,2)
 903 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
 904  
 905 scala> var allsize = sc.accumulator(0)
 906 size: org.apache.spark.Accumulator[Int] = 0
 907  
 908 
 909 scala>     rdd1.foreachPartition { x => {
 910      |       allsize += x.size
 911      |     }}
 912  
 913 scala> println(allsize.value)
 914 10
 915  
 916 45) sortBy
 917 
 918 def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
 919 
 920 sortBy根据给定的排序k函数将RDD中的元素进行排序。
 921 
 922 scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)
 923  
 924 scala> rdd1.sortBy(x => x).collect
 925 res1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //默认升序
 926  
 927 scala> rdd1.sortBy(x => x,false).collect
 928 res2: Array[Int] = Array(7, 6, 3, 2, 1, 0)  //降序
 929  
 930 //RDD[K,V]类型
 931 scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
 932  
 933 scala> rdd1.sortBy(x => x).collect
 934 res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))
 935  
 936 //按照V进行降序排序
 937 scala> rdd1.sortBy(x => x._2,false).collect
 938 res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))
 939 46) saveAsTextFile
 940 
 941 def saveAsTextFile(path: String): Unit
 942 
 943 def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
 944 
 945 saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。
 946 
 947 codec参数可以指定压缩的类名。
 948 
 949 var rdd1 = sc.makeRDD(1 to 10,2)
 950 scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFS
 951 hadoop fs -ls /tmp/lxw1234.com
 952 Found 2 items
 953 -rw-r--r--   2 lxw1234 supergroup        0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS
 954 -rw-r--r--   2 lxw1234 supergroup        21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000
 955  
 956 hadoop fs -cat /tmp/lxw1234.com/part-00000
 957 
 958 注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)将文件保存到本地文件系统,那么只会保存在Executor所在机器的本地目录。
 959 
 960 //指定压缩格式保存
 961 
 962 rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/",classOf[com.hadoop.compression.lzo.LzopCodec])
 963  
 964 hadoop fs -ls /tmp/lxw1234.com
 965 -rw-r--r--   2 lxw1234 supergroup    0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS
 966 -rw-r--r--   2 lxw1234 supergroup    71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzo
 967  
 968 hadoop fs -text /tmp/lxw1234.com/part-00000.lzo
 969 
 970  
 971 47) saveAsSequenceFile
 972 
 973 saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。
 974 用法同saveAsTextFile。
 975 
 976 48) saveAsObjectFile
 977 
 978 def saveAsObjectFile(path: String): Unit
 979 
 980 saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。
 981 
 982 对于HDFS,默认采用SequenceFile保存。
 983 
 984 var rdd1 = sc.makeRDD(1 to 10,2)
 985 rdd1.saveAsObjectFile("hdfs://cdh5/tmp/lxw1234.com/")
 986  
 987 hadoop fs -cat /tmp/lxw1234.com/part-00000
 988 SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT
 989  
 990 49) saveAsHadoopFile
 991 
 992 def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit
 993 def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = …, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
 994 
 995 saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。
 996 
 997 可以指定outputKeyClass、outputValueClass以及压缩格式。
 998 每个分区输出一个文件。
 999 var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
1000 import org.apache.hadoop.mapred.TextOutputFormat
1001 import org.apache.hadoop.io.Text
1002 import org.apache.hadoop.io.IntWritable
1003  
1004 rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
1005  
1006 rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],
1007                       classOf[com.hadoop.compression.lzo.LzopCodec])
1008  
1009 50) saveAsHadoopDataset
1010 
1011 def saveAsHadoopDataset(conf: JobConf): Unit
1012 saveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中,比如HBase。
1013 在JobConf中,通常需要关注或者设置五个参数:
1014 文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。
1015  
1016 ##使用saveAsHadoopDataset将RDD保存到HDFS中
1017 
1018 import org.apache.spark.SparkConf
1019 import org.apache.spark.SparkContext
1020 import SparkContext._
1021 import org.apache.hadoop.mapred.TextOutputFormat
1022 import org.apache.hadoop.io.Text
1023 import org.apache.hadoop.io.IntWritable
1024 import org.apache.hadoop.mapred.JobConf
1025  
1026  
1027  
1028 var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
1029 var jobConf = new JobConf()
1030 jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
1031 jobConf.setOutputKeyClass(classOf[Text])
1032 jobConf.setOutputValueClass(classOf[IntWritable])
1033 jobConf.set("mapred.output.dir","/tmp/lxw1234/")
1034 rdd1.saveAsHadoopDataset(jobConf)
1035  
1036 结果:
1037 hadoop fs -cat /tmp/lxw1234/part-00000
1038 A       2
1039 A       1
1040 hadoop fs -cat /tmp/lxw1234/part-00001
1041 B       6
1042 B       3
1043 B       7
1044  
1045 ##保存数据到HBASE
1046 
1047 HBase建表:
1048 
1049 create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
1050 
1051 import org.apache.spark.SparkConf
1052 import org.apache.spark.SparkContext
1053 import SparkContext._
1054 import org.apache.hadoop.mapred.TextOutputFormat
1055 import org.apache.hadoop.io.Text
1056 import org.apache.hadoop.io.IntWritable
1057 import org.apache.hadoop.mapred.JobConf
1058 import org.apache.hadoop.hbase.HBaseConfiguration
1059 import org.apache.hadoop.hbase.mapred.TableOutputFormat
1060 import org.apache.hadoop.hbase.client.Put
1061 import org.apache.hadoop.hbase.util.Bytes
1062 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
1063  
1064 var conf = HBaseConfiguration.create()
1065     var jobConf = new JobConf(conf)
1066     jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")
1067     jobConf.set("zookeeper.znode.parent","/hbase")
1068     jobConf.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")
1069     jobConf.setOutputFormat(classOf[TableOutputFormat])
1070     
1071     var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
1072     rdd1.map(x => 
1073       {
1074         var put = new Put(Bytes.toBytes(x._1))
1075         put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
1076         (new ImmutableBytesWritable,put)
1077       }
1078     ).saveAsHadoopDataset(jobConf)
1079  
1080 ##结果:
1081 hbase(main):005:0> scan ‘lxw1234‘
1082 ROW     COLUMN+CELL                                                                                                
1083  A       column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02                                              
1084  B       column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06                                              
1085  C       column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07                                              
1086 3 row(s) in 0.0550 seconds
1087  
1088 注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。
1089 
1090 51) saveAsNewAPIHadoopFile
1091 
1092 def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
1093 
1094 def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit
1095 
1096  
1097 
1098 saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。
1099 
1100 用法基本同saveAsHadoopFile。
1101 
1102 import org.apache.spark.SparkConf
1103 import org.apache.spark.SparkContext
1104 import SparkContext._
1105 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
1106 import org.apache.hadoop.io.Text
1107 import org.apache.hadoop.io.IntWritable
1108  
1109 var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
1110 rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
1111  
1112 52) saveAsNewAPIHadoopDataset
1113 
1114 def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
1115 
1116 作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。
1117 
1118 以写入HBase为例:
1119 
1120  
1121 
1122 HBase建表:
1123 
1124 create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
1125 
1126  
1127 
1128 完整的Spark应用程序:
1129 
1130 package com.lxw1234.test
1131  
1132 import org.apache.spark.SparkConf
1133 import org.apache.spark.SparkContext
1134 import SparkContext._
1135 import org.apache.hadoop.hbase.HBaseConfiguration
1136 import org.apache.hadoop.mapreduce.Job
1137 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
1138 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
1139 import org.apache.hadoop.hbase.client.Result
1140 import org.apache.hadoop.hbase.util.Bytes
1141 import org.apache.hadoop.hbase.client.Put
1142  
1143 object Test {
1144   def main(args : Array[String]) {
1145    val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")
1146    val sc = new SparkContext(sparkConf);
1147    var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
1148    
1149     sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3")
1150     sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")
1151     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")
1152     var job = new Job(sc.hadoopConfiguration)
1153     job.setOutputKeyClass(classOf[ImmutableBytesWritable])
1154     job.setOutputValueClass(classOf[Result])
1155     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
1156     
1157     rdd1.map(
1158       x => {
1159         var put = new Put(Bytes.toBytes(x._1))
1160         put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
1161         (new ImmutableBytesWritable,put)
1162       }    
1163     ).saveAsNewAPIHadoopDataset(job.getConfiguration)
1164     
1165     sc.stop()   
1166   }
1167 }
1168  
1169 注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。

 

SparkRDD函数详解

原文:https://www.cnblogs.com/xjqi/p/12776841.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!