1 1、RDD操作详解 2 启动spark-shell 3 spark-shell --master spark://hdp-node-01:7077 4 Spark core 核心数据抽象是RDD 5 6 1.1 基本转换 7 1) map 8 map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 9 举例: 10 //设置spark的配置文件信息 11 val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local") 12 //构建sparkcontext上下文对象,它是程序的入口,所有计算的源头 13 val sc: SparkContext = new SparkContext(sparkConf) 14 //定义一个列表 15 val list: List[Int] = List(1, 2, 3, 4, 5, 6, 7) 16 //生成一个rdd 17 val rdd: RDD[Int] = sc.parallelize(list) 18 //map算子 19 val map: RDD[Int] = rdd.map(x => x * 2) 20 //foreach 算子打印 21 map.foreach(x => println(x)) 22 sc.stop() 23 上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。 24 2) filter 25 filter 是对RDD中的每个元素都执行一个指定的函数来过滤产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 26 //设置spark的配置文件信息 27 val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local") 28 //构建sparkcontext上下文对象,它是程序的入口,所有计算的源头 29 val sc: SparkContext = new SparkContext(sparkConf) 30 sc.setLogLevel("WARN") 31 //定义一个列表 32 val list: List[Int] = List(1, 2, 3, 4, 5, 6, 7) 33 //生成一个rdd 34 val rdd: RDD[Int] = sc.parallelize(list) 35 //filter算子 36 val map: RDD[Int] = rdd.filter(_ > 5) 37 //foreach 算子打印 38 map.foreach(x => println(x)) 39 sc.stop() 40 41 3) flatMap 42 与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值) 43 scala> val a = sc.parallelize(1 to 4, 2) 44 scala> val b = a.flatMap(x => 1 to x) 45 scala> b.collect 46 res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4) 47 48 4) mapPartitions 49 mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 它的函数定义为: 50 def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 51 f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。 52 举例: 53 scala> val a = sc.parallelize(1 to 9, 3) 54 scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { 55 var res = List[(T, T)]() 56 var pre = iter.next 57 while (iter.hasNext) { 58 val cur = iter.next 59 res.::=(pre, cur) 60 pre = cur } 61 res.iterator 62 } 63 scala> a.mapPartitions(myfunc).collect 64 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 65 上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。 mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。 66 5) mapPartitionsWithIndex 67 def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U] 68 函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。 69 70 var rdd1 = sc.makeRDD(1 to 5,2) 71 //rdd1有两个分区 72 var rdd2 = rdd1.mapPartitionsWithIndex{ 73 (x,iter) => { 74 var result = List[String]() 75 var i = 0 76 while(iter.hasNext){ 77 i += iter.next() 78 } 79 result.::(x + "|" + i).iterator 80 81 } 82 } 83 //rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引 84 scala> rdd2.collect 85 res13: Array[String] = Array(0|3, 1|12) 86 87 88 6) coalesce 89 90 def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T] 91 该函数用于将RDD进行重分区,使用HashPartitioner。 92 第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false; 93 以下面的例子来看: 94 scala> var data = sc.parallelize(1 to 12, 3) 95 scala> data.collect 96 scala> data.partitions.size 97 scala> var rdd1 = data.coalesce(1) 98 scala> rdd1.partitions.size 99 scala> var rdd1 = data.coalesce(4) 100 scala> rdd1.partitions.size 101 res2: Int = 1 //如果重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,//否则,分区数不便 102 scala> var rdd1 = data.coalesce(4,true) 103 scala> rdd1.partitions.size 104 res3: Int = 4 105 106 7) repartition 107 def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 108 该函数其实就是coalesce函数第二个参数为true的实现 109 scala> var data = sc.parallelize(1 to 12, 3) 110 scala> data.collect 111 scala> data.partitions.size 112 scala> var rdd1 = data. repartition(1) 113 scala> rdd1.partitions.size 114 scala> var rdd1 = data. repartition(4) 115 scala> rdd1.partitions.size 116 res3: Int = 4 117 8) randomSplit 118 119 def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] 120 该函数根据weights权重,将一个RDD切分成多个RDD。 121 该权重参数为一个Double数组 122 第二个参数为random的种子,基本可忽略。 123 scala> var rdd = sc.makeRDD(1 to 12,12) 124 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21 125 126 scala> rdd.collect 127 res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 128 129 scala> var splitRDD = rdd.randomSplit(Array(0.5, 0.1, 0.2, 0.2)) 130 splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] at randomSplit at :23, 131 MapPartitionsRDD[18] at randomSplit at :23, 132 MapPartitionsRDD[19] at randomSplit at :23, 133 MapPartitionsRDD[20] at randomSplit at :23) 134 135 //这里注意:randomSplit的结果是一个RDD数组 136 scala> splitRDD.size 137 res8: Int = 4 138 //由于randomSplit的第一个参数weights中传入的值有4个,因此,就会切分成4个RDD, 139 //把原来的rdd按照权重0.5, 0.1, 0.2, 0.2,随机划分到这4个RDD中,权重高的RDD,划分到//的几率就大一些。 140 //注意,权重的总和加起来为1,否则会不正常 141 scala> splitRDD(0).collect 142 res10: Array[Int] = Array(1, 4) 143 144 scala> splitRDD(1).collect 145 res11: Array[Int] = Array(3) 146 147 scala> splitRDD(2).collect 148 res12: Array[Int] = Array(5, 9) 149 150 scala> splitRDD(3).collect 151 res13: Array[Int] = Array(2, 6, 7, 8, 10) 152 153 9) glom 154 155 def glom(): RDD[Array[T]] 156 该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素。 157 158 scala> var rdd = sc.makeRDD(1 to 10,3) 159 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at makeRDD at :21 160 scala> rdd.partitions.size 161 res33: Int = 3 //该RDD有3个分区 162 scala> rdd.glom().collect 163 res35: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10)) 164 //glom将每个分区中的元素放到一个数组中,这样,结果就变成了3个数组 165 166 10) union并集 167 168 val rdd1 = sc.parallelize(List(5, 6, 4, 3)) 169 val rdd2 = sc.parallelize(List(1, 2, 3, 4)) 170 //求并集 171 val rdd3 = rdd1.union(rdd2) 172 rdd3.collect 173 11) distinct 174 去重 175 val rdd1 = sc.parallelize(List(5, 6, 4, 3)) 176 val rdd2 = sc.parallelize(List(1, 2, 3, 4)) 177 //求并集 178 val rdd3 = rdd1.union(rdd2) 179 //去重输出 180 rdd3.distinct.collect 181 182 12) intersection交集 183 val rdd1 = sc.parallelize(List(5, 6, 4, 3)) 184 val rdd2 = sc.parallelize(List(1, 2, 3, 4)) 185 //求交集 186 val rdd4 = rdd1.intersection(rdd2) 187 rdd4.collect 188 189 13) subtract 190 def subtract(other: RDD[T]): RDD[T] 191 def subtract(other: RDD[T], numPartitions: Int): RDD[T] 192 def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] 193 该函数返回在RDD中出现,并且不在otherRDD中出现的元素,不去重。 194 195 val rdd1 = sc.parallelize(List(5, 6, 6, 4, 3)) 196 val rdd2 = sc.parallelize(List(1, 2, 3, 4)) 197 //求差集 198 val rdd4 = rdd1.subtract(rdd2) 199 rdd4.collect 200 14) subtractByKey 201 202 def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)] 203 def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)] 204 def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)] 205 206 subtractByKey和基本转换操作中的subtract类似,只不过这里是针对K的,返回在主RDD中出现,并且不在otherRDD中出现的元素。 207 参数numPartitions用于指定结果的分区数 208 参数partitioner用于指定分区函数 209 210 var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) 211 var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) 212 scala> rdd1.subtractByKey(rdd2).collect 213 res13: Array[(String, String)] = Array((B,2)) 214 215 15) groupbyKey 216 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2))) 217 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) 218 //求并集 219 val rdd4 = rdd1 union rdd2 220 //按key进行分组 221 val rdd5 = rdd4.groupByKey 222 rdd5.collect 223 16) reduceByKey 224 顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。 225 举例: 226 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2))) 227 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) 228 //求并集 229 val rdd4 = rdd1 union rdd2 230 //按key进行分组 231 val rdd6 = rdd4.reduceByKey(_ + _) 232 rdd6.collect() 233 234 17) sortByKey 235 将List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))和List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))做wordcount,并按名称排序 236 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))) 237 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))) 238 val rdd3 = rdd1.union(rdd2) 239 //按key进行聚合 240 val rdd4 = rdd3.reduceByKey(_ + _) 241 //false降序 242 val rdd5 = rdd4.sortByKey(false) 243 rdd5.collect 244 18) sortBy 245 将List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))和List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))做wordcount,并按数值排序 246 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))) 247 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))) 248 val rdd3 = rdd1.union(rdd2) 249 //按key进行聚合 250 val rdd4 = rdd3.reduceByKey(_ + _) 251 //false降序 252 val rdd5 = rdd4.sortBy(_._2, false) 253 rdd5.collect 254 255 19) zip 256 def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)] 257 258 zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。 259 260 scala> var rdd1 = sc.makeRDD(1 to 5,2) 261 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21 262 263 scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2) 264 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21 265 266 scala> rdd1.zip(rdd2).collect 267 res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E)) 268 269 scala> rdd2.zip(rdd1).collect 270 res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5)) 271 272 scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3) 273 rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21 274 scala> rdd1.zip(rdd3).collect 275 java.lang.IllegalArgumentException: Can‘t zip RDDs with unequal numbers of partitions 276 //如果两个RDD分区数不同,则抛出异常 277 278 20) zipPartitions 279 280 zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。 281 该函数有好几种实现,可分为三类: 282 283 参数是一个RDD 284 def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V] 285 286 def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V] 287 288 这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息 289 290 映射方法f参数为两个RDD的迭代器。 291 292 scala> var rdd1 = sc.makeRDD(1 to 5,2) 293 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21 294 295 scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2) 296 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21 297 298 //rdd1两个分区中元素分布: 299 scala> rdd1.mapPartitionsWithIndex{ 300 | (x,iter) => { 301 | var result = List[String]() 302 | while(iter.hasNext){ 303 | result ::= ("part_" + x + "|" + iter.next()) 304 | } 305 | result.iterator 306 | 307 | } 308 | }.collect 309 res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3) 310 311 //rdd2两个分区中元素分布 312 scala> rdd2.mapPartitionsWithIndex{ 313 | (x,iter) => { 314 | var result = List[String]() 315 | while(iter.hasNext){ 316 | result ::= ("part_" + x + "|" + iter.next()) 317 | } 318 | result.iterator 319 | 320 | } 321 | }.collect 322 res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C) 323 324 //rdd1和rdd2做zipPartition 325 scala> rdd1.zipPartitions(rdd2){ 326 | (rdd1Iter,rdd2Iter) => { 327 | var result = List[String]() 328 | while(rdd1Iter.hasNext && rdd2Iter.hasNext) { 329 | result::=(rdd1Iter.next() + "_" + rdd2Iter.next()) 330 | } 331 | result.iterator 332 | } 333 | }.collect 334 res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C) 335 336 337 参数是两个RDD 338 def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V] 339 340 def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V] 341 342 用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。 343 344 scala> var rdd1 = sc.makeRDD(1 to 5,2) 345 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21 346 347 scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2) 348 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21 349 350 scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2) 351 rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21 352 353 //rdd3中个分区元素分布 354 scala> rdd3.mapPartitionsWithIndex{ 355 | (x,iter) => { 356 | var result = List[String]() 357 | while(iter.hasNext){ 358 | result ::= ("part_" + x + "|" + iter.next()) 359 | } 360 | result.iterator 361 | 362 | } 363 | }.collect 364 res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c) 365 366 //三个RDD做zipPartitions 367 scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){ 368 | (rdd1Iter,rdd2Iter,rdd3Iter) => { 369 | var result = List[String]() 370 | while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) { 371 | result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next()) 372 | } 373 | result.iterator 374 | } 375 | } 376 rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27 377 378 scala> rdd4.collect 379 res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c) 380 381 参数是三个RDD 382 def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V] 383 384 def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V] 385 386 用法同上面,只不过这里又多了个一个RDD而已。 387 388 389 21) zipWithIndex 390 391 def zipWithIndex(): RDD[(T, Long)] 392 该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。 393 scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2) 394 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21 395 scala> rdd2.zipWithIndex().collect 396 res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4)) 397 398 22) zipWithUniqueId 399 400 def zipWithUniqueId(): RDD[(T, Long)] 401 该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下: 402 每个分区中第一个元素的唯一ID值为:该分区索引号, 403 每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数) 404 看下面的例子: 405 scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2) 406 rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21 407 //rdd1有两个分区, 408 scala> rdd1.zipWithUniqueId().collect 409 res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5)) 410 //总分区数为2 411 //第一个分区第一个元素ID为0,第二个分区第一个元素ID为1 412 //第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4 413 //第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5 414 415 键值转换 416 23) partitionBy 417 418 def partitionBy(partitioner: Partitioner): RDD[(K, V)] 419 该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。 420 scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2) 421 rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at :21 422 scala> rdd1.partitions.size 423 res20: Int = 2 424 425 //查看rdd1中每个分区的元素 426 scala> rdd1.mapPartitionsWithIndex{ 427 | (partIdx,iter) => { 428 | var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]() 429 | while(iter.hasNext){ 430 | var part_name = "part_" + partIdx; 431 | var elem = iter.next() 432 | if(part_map.contains(part_name)) { 433 | var elems = part_map(part_name) 434 | elems ::= elem 435 | part_map(part_name) = elems 436 | } else { 437 | part_map(part_name) = List[(Int,String)]{elem} 438 | } 439 | } 440 | part_map.iterator 441 | 442 | } 443 | }.collect 444 res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C)))) 445 //(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中 446 447 //使用partitionBy重分区 448 scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2)) 449 rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at :23 450 451 scala> rdd2.partitions.size 452 res23: Int = 2 453 454 //查看rdd2中每个分区的元素 455 scala> rdd2.mapPartitionsWithIndex{ 456 | (partIdx,iter) => { 457 | var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]() 458 | while(iter.hasNext){ 459 | var part_name = "part_" + partIdx; 460 | var elem = iter.next() 461 | if(part_map.contains(part_name)) { 462 | var elems = part_map(part_name) 463 | elems ::= elem 464 | part_map(part_name) = elems 465 | } else { 466 | part_map(part_name) = List[(Int,String)]{elem} 467 | } 468 | } 469 | part_map.iterator 470 | } 471 | }.collect 472 res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A)))) 473 //(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中 474 24) mapValues 475 mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。 476 举例: 477 scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2) 478 scala> val b = a.map(x => (x.length, x)) 479 scala> b.mapValues("x" + _ + "x").collect 480 res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex)) 481 482 25) flatMapValues 483 flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。 484 举例 485 val a = sc.parallelize(List((1, 2), (3, 4), (5, 6))) 486 val b = a.flatMapValues(x => 1.to(x)) 487 b.collect.foreach(println) 488 26) combineByKey 489 490 def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] 491 def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] 492 def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] 493 该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。 494 其中的参数: 495 createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C ,分区内相同的key做一次 496 mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C,分区内相同的key循环做 497 mergeCombiners:分区合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C,分区之间循环做 498 numPartitions:结果RDD分区数,默认保持原有的分区数 499 partitioner:分区函数,默认为HashPartitioner 500 mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true 501 502 看下面例子: 503 scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1))) 504 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21 505 scala> rdd1.combineByKey( 506 | (v : Int) => v + "_", 507 | (c : String, v : Int) => c + "@" + v, 508 | (c1 : String, c2 : String) => c1 + "$" + c2 509 | ).collect 510 res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_)) 511 512 其中三个映射函数分别为: 513 createCombiner: (V) => C 514 (v : Int) => v + “_” //在每一个V值后面加上字符_,返回C类型(String) 515 mergeValue: (C, V) => C 516 (c : String, v : Int) => c + “@” + v //合并C类型和V类型,中间加字符@,返回C(String) 517 mergeCombiners: (C, C) => C 518 (c1 : String, c2 : String) => c1 + “$” + c2 //合并C类型和C类型,中间加$,返回C(String) 519 其他参数为默认值。 520 最终,将RDD[String,Int]转换为RDD[String,String]。 521 522 再看例子: 523 524 rdd1.combineByKey( 525 (v : Int) => List(v), 526 (c : List[Int], v : Int) => v :: c, 527 (c1 : List[Int], c2 : List[Int]) => c1 ::: c2 528 ).collect 529 res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1))) 530 最终将RDD[String,Int]转换为RDD[String,List[Int]]。 531 532 27) foldByKey 533 534 def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] 535 536 def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] 537 538 def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] 539 540 541 542 该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V. 543 544 例子: 545 546 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) 547 scala> rdd1.foldByKey(0)(_+_).collect 548 res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1)) 549 //将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操 550 //作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即: 551 //("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2) 552 553 再看: 554 555 scala> rdd1.foldByKey(2)(_+_).collect 556 res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3)) 557 //先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函 558 //数应用于初始化后的V,最后得到:(A,2+4),即:(A,6) 559 560 再看乘法操作: 561 562 scala> rdd1.foldByKey(0)(_*_).collect 563 res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0)) 564 //先将zeroValue=0应用于每个V,注意,这次映射函数为乘法,得到:("A",0*0), ("A",2*0), 565 //即:("A",0), ("A",0),再将映射函//数应用于初始化后的V,最后得到:(A,0*0),即:(A,0) 566 //其他K也一样,最终都得到了V=0 567 568 scala> rdd1.foldByKey(1)(_*_).collect 569 res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1)) 570 //映射函数为乘法时,需要将zeroValue设为1,才能得到我们想要的结果。 571 572 在使用foldByKey算子时候,要特别注意映射函数及zeroValue的取值。 573 28) reduceByKeyLocally 574 575 def reduceByKeyLocally(func: (V, V) => V): Map[K, V] 576 577 该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。 578 579 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) 580 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[91] at makeRDD at :21 581 582 scala> rdd1.reduceByKeyLocally((x,y) => x + y) 583 res90: scala.collection.Map[String,Int] = Map(B -> 3, A -> 2, C -> 1) 584 585 29) cogroup和groupByKey的区别 586 587 val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2))) 588 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) 589 //cogroup 590 val rdd3 = rdd1.cogroup(rdd2) 591 //groupbykey 592 val rdd4 = rdd1.union(rdd2).groupByKey 593 //注意cogroup与groupByKey的区别 594 rdd3.foreach(println) 595 rdd4.foreach(println) 596 30) join 597 598 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2))) 599 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) 600 //求jion 601 val rdd3 = rdd1.join(rdd2) 602 rdd3.collect 603 604 31) leftOuterJoin 605 606 def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] 607 def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] 608 def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] 609 610 leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。 611 参数numPartitions用于指定结果的分区数 612 参数partitioner用于指定分区函数 613 614 var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) 615 var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) 616 617 scala> rdd1.leftOuterJoin(rdd2).collect 618 res11: Array[(String, (String, Option[String]))] = Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c)))) 619 620 32) rightOuterJoin 621 622 def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] 623 def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] 624 def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] 625 626 rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。 627 参数numPartitions用于指定结果的分区数 628 参数partitioner用于指定分区函数 629 630 var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) 631 var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) 632 scala> rdd1.rightOuterJoin(rdd2).collect 633 res12: Array[(String, (Option[String], String))] = Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c))) 634 635 636 Action操作 637 33) first 638 639 def first(): T 640 641 first返回RDD中的第一个元素,不排序。 642 643 scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) 644 rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21 645 646 scala> rdd1.first 647 res14: (String, String) = (A,1) 648 649 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) 650 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21 651 652 scala> rdd1.first 653 res8: Int = 10 654 655 34) count 656 657 def count(): Long 658 659 count返回RDD中的元素数量。 660 661 scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) 662 rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21 663 664 scala> rdd1.count 665 res15: Long = 3 666 667 35) reduce 668 669 def reduce(f: (T, T) ⇒ T): T 670 671 根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。 672 673 scala> var rdd1 = sc.makeRDD(1 to 10,2) 674 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21 675 676 scala> rdd1.reduce(_ + _) 677 res18: Int = 55 678 679 scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) 680 rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21 681 682 scala> rdd2.reduce((x,y) => { 683 | (x._1 + y._1,x._2 + y._2) 684 | }) 685 res21: (String, Int) = (CBBAA,6) 686 687 collect 688 689 def collect(): Array[T] 690 691 collect用于将一个RDD转换成数组。 692 693 scala> var rdd1 = sc.makeRDD(1 to 10,2) 694 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21 695 696 scala> rdd1.collect 697 res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 698 36) take 699 700 def take(num: Int): Array[T] 701 702 take用于获取RDD中从0到num-1下标的元素,不排序。 703 704 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) 705 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21 706 707 scala> rdd1.take(1) 708 res0: Array[Int] = Array(10) 709 710 scala> rdd1.take(2) 711 res1: Array[Int] = Array(10, 4) 712 713 37) top 714 715 def top(num: Int)(implicit ord: Ordering[T]): Array[T] 716 717 top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。 718 719 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) 720 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21 721 722 scala> rdd1.top(1) 723 res2: Array[Int] = Array(12) 724 725 scala> rdd1.top(2) 726 res3: Array[Int] = Array(12, 10) 727 728 //指定排序规则 729 scala> implicit val myOrd = implicitly[Ordering[Int]].reverse 730 myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef 731 732 scala> rdd1.top(1) 733 res4: Array[Int] = Array(2) 734 735 scala> rdd1.top(2) 736 res5: Array[Int] = Array(2, 3) 737 738 38) takeOrdered 739 740 def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] 741 742 takeOrdered和top类似,只不过以和top相反的顺序返回元素。 743 744 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) 745 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21 746 747 scala> rdd1.top(1) 748 res4: Array[Int] = Array(12) 749 750 scala> rdd1.top(2) 751 res5: Array[Int] = Array(12, 10) 752 753 scala> rdd1.takeOrdered(1) 754 res6: Array[Int] = Array(2) 755 756 scala> rdd1.takeOrdered(2) 757 res7: Array[Int] = Array(2, 3) 758 759 39) aggregate 760 761 def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U 762 763 aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。 764 765 var rdd1 = sc.makeRDD(1 to 10,2) 766 rdd1.mapPartitionsWithIndex{ 767 (partIdx,iter) => { 768 var part_map = scala.collection.mutable.Map[String,List[Int]]() 769 while(iter.hasNext){ 770 var part_name = "part_" + partIdx; 771 var elem = iter.next() 772 if(part_map.contains(part_name)) { 773 var elems = part_map(part_name) 774 elems ::= elem 775 part_map(part_name) = elems 776 } else { 777 part_map(part_name) = List[Int]{elem} 778 } 779 } 780 part_map.iterator 781 782 } 783 }.collect 784 res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6))) 785 786 ##第一个分区中包含5,4,3,2,1 787 788 ##第二个分区中包含10,9,8,7,6 789 790 scala> rdd1.aggregate(1)( 791 | {(x : Int,y : Int) => x + y}, 792 | {(a : Int,b : Int) => a + b} 793 | ) 794 res17: Int = 58 795 796 结果为什么是58,看下面的计算过程: 797 798 ##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1 799 800 ##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16 801 802 ## part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41 803 804 ##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1 805 806 ##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58 807 808 再比如: 809 810 scala> rdd1.aggregate(2)( 811 | {(x : Int,y : Int) => x + y}, 812 | {(a : Int,b : Int) => a * b} 813 | ) 814 res18: Int = 1428 815 816 ##这次zeroValue=2 817 818 ##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17 819 820 ##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42 821 822 ##最后:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428 823 824 因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。 825 826 827 828 40) fold 829 830 def fold(zeroValue: T)(op: (T, T) ⇒ T): T 831 832 fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。 833 var rdd1 = sc.makeRDD(1 to 10, 2) 834 scala> rdd1.fold(1)( 835 | (x,y) => x + y 836 | ) 837 res19: Int = 58 838 839 ##结果同上面使用aggregate的第一个例子一样,即: 840 scala> rdd1.aggregate(1)( 841 | {(x,y) => x + y}, 842 | {(a,b) => a + b} 843 | ) 844 res20: Int = 58 845 846 41) lookup 847 848 def lookup(key: K): Seq[V] 849 850 lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。 851 852 853 854 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) 855 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21 856 857 scala> rdd1.lookup("A") 858 res0: Seq[Int] = WrappedArray(0, 2) 859 860 scala> rdd1.lookup("B") 861 res1: Seq[Int] = WrappedArray(1, 2) 862 42) countByKey 863 864 def countByKey(): Map[K, Long] 865 866 countByKey用于统计RDD[K,V]中每个K的数量。 867 868 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3))) 869 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21 870 871 scala> rdd1.countByKey 872 res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3) 873 874 43) foreach 875 876 def foreach(f: (T) ⇒ Unit): Unit 877 878 foreach用于遍历RDD,将函数f应用于每一个元素。 879 但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。 880 比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。 881 我在Spark1.4中是这样,不知道是否真如此。 882 这时候,使用accumulator共享变量与foreach结合,倒是个不错的选择。 883 884 scala> var cnt = sc.accumulator(0) 885 cnt: org.apache.spark.Accumulator[Int] = 0 886 887 scala> var rdd1 = sc.makeRDD(1 to 10,2) 888 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21 889 890 scala> rdd1.foreach(x => cnt += x) 891 892 scala> cnt.value 893 res51: Int = 55 894 895 scala> rdd1.collect.foreach(println) 896 44) foreachPartition 897 898 def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit 899 900 foreachPartition和foreach类似,只不过是对每一个分区使用f。 901 902 scala> var rdd1 = sc.makeRDD(1 to 10,2) 903 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21 904 905 scala> var allsize = sc.accumulator(0) 906 size: org.apache.spark.Accumulator[Int] = 0 907 908 909 scala> rdd1.foreachPartition { x => { 910 | allsize += x.size 911 | }} 912 913 scala> println(allsize.value) 914 10 915 916 45) sortBy 917 918 def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 919 920 sortBy根据给定的排序k函数将RDD中的元素进行排序。 921 922 scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2) 923 924 scala> rdd1.sortBy(x => x).collect 925 res1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //默认升序 926 927 scala> rdd1.sortBy(x => x,false).collect 928 res2: Array[Int] = Array(7, 6, 3, 2, 1, 0) //降序 929 930 //RDD[K,V]类型 931 scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) 932 933 scala> rdd1.sortBy(x => x).collect 934 res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7)) 935 936 //按照V进行降序排序 937 scala> rdd1.sortBy(x => x._2,false).collect 938 res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1)) 939 46) saveAsTextFile 940 941 def saveAsTextFile(path: String): Unit 942 943 def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit 944 945 saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。 946 947 codec参数可以指定压缩的类名。 948 949 var rdd1 = sc.makeRDD(1 to 10,2) 950 scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFS 951 hadoop fs -ls /tmp/lxw1234.com 952 Found 2 items 953 -rw-r--r-- 2 lxw1234 supergroup 0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS 954 -rw-r--r-- 2 lxw1234 supergroup 21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000 955 956 hadoop fs -cat /tmp/lxw1234.com/part-00000 957 958 注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)将文件保存到本地文件系统,那么只会保存在Executor所在机器的本地目录。 959 960 //指定压缩格式保存 961 962 rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/",classOf[com.hadoop.compression.lzo.LzopCodec]) 963 964 hadoop fs -ls /tmp/lxw1234.com 965 -rw-r--r-- 2 lxw1234 supergroup 0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS 966 -rw-r--r-- 2 lxw1234 supergroup 71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzo 967 968 hadoop fs -text /tmp/lxw1234.com/part-00000.lzo 969 970 971 47) saveAsSequenceFile 972 973 saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。 974 用法同saveAsTextFile。 975 976 48) saveAsObjectFile 977 978 def saveAsObjectFile(path: String): Unit 979 980 saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。 981 982 对于HDFS,默认采用SequenceFile保存。 983 984 var rdd1 = sc.makeRDD(1 to 10,2) 985 rdd1.saveAsObjectFile("hdfs://cdh5/tmp/lxw1234.com/") 986 987 hadoop fs -cat /tmp/lxw1234.com/part-00000 988 SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT 989 990 49) saveAsHadoopFile 991 992 def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit 993 def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = …, codec: Option[Class[_ <: CompressionCodec]] = None): Unit 994 995 saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。 996 997 可以指定outputKeyClass、outputValueClass以及压缩格式。 998 每个分区输出一个文件。 999 var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) 1000 import org.apache.hadoop.mapred.TextOutputFormat 1001 import org.apache.hadoop.io.Text 1002 import org.apache.hadoop.io.IntWritable 1003 1004 rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]]) 1005 1006 rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]], 1007 classOf[com.hadoop.compression.lzo.LzopCodec]) 1008 1009 50) saveAsHadoopDataset 1010 1011 def saveAsHadoopDataset(conf: JobConf): Unit 1012 saveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中,比如HBase。 1013 在JobConf中,通常需要关注或者设置五个参数: 1014 文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。 1015 1016 ##使用saveAsHadoopDataset将RDD保存到HDFS中 1017 1018 import org.apache.spark.SparkConf 1019 import org.apache.spark.SparkContext 1020 import SparkContext._ 1021 import org.apache.hadoop.mapred.TextOutputFormat 1022 import org.apache.hadoop.io.Text 1023 import org.apache.hadoop.io.IntWritable 1024 import org.apache.hadoop.mapred.JobConf 1025 1026 1027 1028 var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) 1029 var jobConf = new JobConf() 1030 jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]]) 1031 jobConf.setOutputKeyClass(classOf[Text]) 1032 jobConf.setOutputValueClass(classOf[IntWritable]) 1033 jobConf.set("mapred.output.dir","/tmp/lxw1234/") 1034 rdd1.saveAsHadoopDataset(jobConf) 1035 1036 结果: 1037 hadoop fs -cat /tmp/lxw1234/part-00000 1038 A 2 1039 A 1 1040 hadoop fs -cat /tmp/lxw1234/part-00001 1041 B 6 1042 B 3 1043 B 7 1044 1045 ##保存数据到HBASE 1046 1047 HBase建表: 1048 1049 create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1} 1050 1051 import org.apache.spark.SparkConf 1052 import org.apache.spark.SparkContext 1053 import SparkContext._ 1054 import org.apache.hadoop.mapred.TextOutputFormat 1055 import org.apache.hadoop.io.Text 1056 import org.apache.hadoop.io.IntWritable 1057 import org.apache.hadoop.mapred.JobConf 1058 import org.apache.hadoop.hbase.HBaseConfiguration 1059 import org.apache.hadoop.hbase.mapred.TableOutputFormat 1060 import org.apache.hadoop.hbase.client.Put 1061 import org.apache.hadoop.hbase.util.Bytes 1062 import org.apache.hadoop.hbase.io.ImmutableBytesWritable 1063 1064 var conf = HBaseConfiguration.create() 1065 var jobConf = new JobConf(conf) 1066 jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3") 1067 jobConf.set("zookeeper.znode.parent","/hbase") 1068 jobConf.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234") 1069 jobConf.setOutputFormat(classOf[TableOutputFormat]) 1070 1071 var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7))) 1072 rdd1.map(x => 1073 { 1074 var put = new Put(Bytes.toBytes(x._1)) 1075 put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2)) 1076 (new ImmutableBytesWritable,put) 1077 } 1078 ).saveAsHadoopDataset(jobConf) 1079 1080 ##结果: 1081 hbase(main):005:0> scan ‘lxw1234‘ 1082 ROW COLUMN+CELL 1083 A column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02 1084 B column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06 1085 C column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07 1086 3 row(s) in 0.0550 seconds 1087 1088 注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。 1089 1090 51) saveAsNewAPIHadoopFile 1091 1092 def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit 1093 1094 def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit 1095 1096 1097 1098 saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。 1099 1100 用法基本同saveAsHadoopFile。 1101 1102 import org.apache.spark.SparkConf 1103 import org.apache.spark.SparkContext 1104 import SparkContext._ 1105 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat 1106 import org.apache.hadoop.io.Text 1107 import org.apache.hadoop.io.IntWritable 1108 1109 var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) 1110 rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]]) 1111 1112 52) saveAsNewAPIHadoopDataset 1113 1114 def saveAsNewAPIHadoopDataset(conf: Configuration): Unit 1115 1116 作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。 1117 1118 以写入HBase为例: 1119 1120 1121 1122 HBase建表: 1123 1124 create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1} 1125 1126 1127 1128 完整的Spark应用程序: 1129 1130 package com.lxw1234.test 1131 1132 import org.apache.spark.SparkConf 1133 import org.apache.spark.SparkContext 1134 import SparkContext._ 1135 import org.apache.hadoop.hbase.HBaseConfiguration 1136 import org.apache.hadoop.mapreduce.Job 1137 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat 1138 import org.apache.hadoop.hbase.io.ImmutableBytesWritable 1139 import org.apache.hadoop.hbase.client.Result 1140 import org.apache.hadoop.hbase.util.Bytes 1141 import org.apache.hadoop.hbase.client.Put 1142 1143 object Test { 1144 def main(args : Array[String]) { 1145 val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com") 1146 val sc = new SparkContext(sparkConf); 1147 var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7))) 1148 1149 sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3") 1150 sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase") 1151 sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234") 1152 var job = new Job(sc.hadoopConfiguration) 1153 job.setOutputKeyClass(classOf[ImmutableBytesWritable]) 1154 job.setOutputValueClass(classOf[Result]) 1155 job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) 1156 1157 rdd1.map( 1158 x => { 1159 var put = new Put(Bytes.toBytes(x._1)) 1160 put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2)) 1161 (new ImmutableBytesWritable,put) 1162 } 1163 ).saveAsNewAPIHadoopDataset(job.getConfiguration) 1164 1165 sc.stop() 1166 } 1167 } 1168 1169 注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。
1 1、RDD操作详解 2 启动spark-shell 3 spark-shell --master spark://hdp-node-01:7077 4 Spark core 核心数据抽象是RDD 5 6 1.1 基本转换 7 1) map 8 map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 9 举例: 10 //设置spark的配置文件信息 11 val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local") 12 //构建sparkcontext上下文对象,它是程序的入口,所有计算的源头 13 val sc: SparkContext = new SparkContext(sparkConf) 14 //定义一个列表 15 val list: List[Int] = List(1, 2, 3, 4, 5, 6, 7) 16 //生成一个rdd 17 val rdd: RDD[Int] = sc.parallelize(list) 18 //map算子 19 val map: RDD[Int] = rdd.map(x => x * 2) 20 //foreach 算子打印 21 map.foreach(x => println(x)) 22 sc.stop() 23 上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。 24 2) filter 25 filter 是对RDD中的每个元素都执行一个指定的函数来过滤产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 26 //设置spark的配置文件信息 27 val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local") 28 //构建sparkcontext上下文对象,它是程序的入口,所有计算的源头 29 val sc: SparkContext = new SparkContext(sparkConf) 30 sc.setLogLevel("WARN") 31 //定义一个列表 32 val list: List[Int] = List(1, 2, 3, 4, 5, 6, 7) 33 //生成一个rdd 34 val rdd: RDD[Int] = sc.parallelize(list) 35 //filter算子 36 val map: RDD[Int] = rdd.filter(_ > 5) 37 //foreach 算子打印 38 map.foreach(x => println(x)) 39 sc.stop() 40 41 3) flatMap 42 与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值) 43 scala> val a = sc.parallelize(1 to 4, 2) 44 scala> val b = a.flatMap(x => 1 to x) 45 scala> b.collect 46 res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4) 47 48 4) mapPartitions 49 mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 它的函数定义为: 50 def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] 51 f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。 52 举例: 53 scala> val a = sc.parallelize(1 to 9, 3) 54 scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { 55 var res = List[(T, T)]() 56 var pre = iter.next 57 while (iter.hasNext) { 58 val cur = iter.next 59 res.::=(pre, cur) 60 pre = cur } 61 res.iterator 62 } 63 scala> a.mapPartitions(myfunc).collect 64 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 65 上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。 mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。 66 5) mapPartitionsWithIndex 67 def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U] 68 函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。 69 70 var rdd1 = sc.makeRDD(1 to 5,2) 71 //rdd1有两个分区 72 var rdd2 = rdd1.mapPartitionsWithIndex{ 73 (x,iter) => { 74 var result = List[String]() 75 var i = 0 76 while(iter.hasNext){ 77 i += iter.next() 78 } 79 result.::(x + "|" + i).iterator 80 81 } 82 } 83 //rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引 84 scala> rdd2.collect 85 res13: Array[String] = Array(0|3, 1|12) 86 87 88 6) coalesce 89 90 def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T] 91 该函数用于将RDD进行重分区,使用HashPartitioner。 92 第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false; 93 以下面的例子来看: 94 scala> var data = sc.parallelize(1 to 12, 3) 95 scala> data.collect 96 scala> data.partitions.size 97 scala> var rdd1 = data.coalesce(1) 98 scala> rdd1.partitions.size 99 scala> var rdd1 = data.coalesce(4) 100 scala> rdd1.partitions.size 101 res2: Int = 1 //如果重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,//否则,分区数不便 102 scala> var rdd1 = data.coalesce(4,true) 103 scala> rdd1.partitions.size 104 res3: Int = 4 105 106 7) repartition 107 def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 108 该函数其实就是coalesce函数第二个参数为true的实现 109 scala> var data = sc.parallelize(1 to 12, 3) 110 scala> data.collect 111 scala> data.partitions.size 112 scala> var rdd1 = data. repartition(1) 113 scala> rdd1.partitions.size 114 scala> var rdd1 = data. repartition(4) 115 scala> rdd1.partitions.size 116 res3: Int = 4 117 8) randomSplit 118 119 def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] 120 该函数根据weights权重,将一个RDD切分成多个RDD。 121 该权重参数为一个Double数组 122 第二个参数为random的种子,基本可忽略。 123 scala> var rdd = sc.makeRDD(1 to 12,12) 124 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21 125 126 scala> rdd.collect 127 res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 128 129 scala> var splitRDD = rdd.randomSplit(Array(0.5, 0.1, 0.2, 0.2)) 130 splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] at randomSplit at :23, 131 MapPartitionsRDD[18] at randomSplit at :23, 132 MapPartitionsRDD[19] at randomSplit at :23, 133 MapPartitionsRDD[20] at randomSplit at :23) 134 135 //这里注意:randomSplit的结果是一个RDD数组 136 scala> splitRDD.size 137 res8: Int = 4 138 //由于randomSplit的第一个参数weights中传入的值有4个,因此,就会切分成4个RDD, 139 //把原来的rdd按照权重0.5, 0.1, 0.2, 0.2,随机划分到这4个RDD中,权重高的RDD,划分到//的几率就大一些。 140 //注意,权重的总和加起来为1,否则会不正常 141 scala> splitRDD(0).collect 142 res10: Array[Int] = Array(1, 4) 143 144 scala> splitRDD(1).collect 145 res11: Array[Int] = Array(3) 146 147 scala> splitRDD(2).collect 148 res12: Array[Int] = Array(5, 9) 149 150 scala> splitRDD(3).collect 151 res13: Array[Int] = Array(2, 6, 7, 8, 10) 152 153 9) glom 154 155 def glom(): RDD[Array[T]] 156 该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素。 157 158 scala> var rdd = sc.makeRDD(1 to 10,3) 159 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at makeRDD at :21 160 scala> rdd.partitions.size 161 res33: Int = 3 //该RDD有3个分区 162 scala> rdd.glom().collect 163 res35: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10)) 164 //glom将每个分区中的元素放到一个数组中,这样,结果就变成了3个数组 165 166 10) union并集 167 168 val rdd1 = sc.parallelize(List(5, 6, 4, 3)) 169 val rdd2 = sc.parallelize(List(1, 2, 3, 4)) 170 //求并集 171 val rdd3 = rdd1.union(rdd2) 172 rdd3.collect 173 11) distinct 174 去重 175 val rdd1 = sc.parallelize(List(5, 6, 4, 3)) 176 val rdd2 = sc.parallelize(List(1, 2, 3, 4)) 177 //求并集 178 val rdd3 = rdd1.union(rdd2) 179 //去重输出 180 rdd3.distinct.collect 181 182 12) intersection交集 183 val rdd1 = sc.parallelize(List(5, 6, 4, 3)) 184 val rdd2 = sc.parallelize(List(1, 2, 3, 4)) 185 //求交集 186 val rdd4 = rdd1.intersection(rdd2) 187 rdd4.collect 188 189 13) subtract 190 def subtract(other: RDD[T]): RDD[T] 191 def subtract(other: RDD[T], numPartitions: Int): RDD[T] 192 def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] 193 该函数返回在RDD中出现,并且不在otherRDD中出现的元素,不去重。 194 195 val rdd1 = sc.parallelize(List(5, 6, 6, 4, 3)) 196 val rdd2 = sc.parallelize(List(1, 2, 3, 4)) 197 //求差集 198 val rdd4 = rdd1.subtract(rdd2) 199 rdd4.collect 200 14) subtractByKey 201 202 def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)] 203 def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)] 204 def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)] 205 206 subtractByKey和基本转换操作中的subtract类似,只不过这里是针对K的,返回在主RDD中出现,并且不在otherRDD中出现的元素。 207 参数numPartitions用于指定结果的分区数 208 参数partitioner用于指定分区函数 209 210 var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) 211 var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) 212 scala> rdd1.subtractByKey(rdd2).collect 213 res13: Array[(String, String)] = Array((B,2)) 214 215 15) groupbyKey 216 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2))) 217 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) 218 //求并集 219 val rdd4 = rdd1 union rdd2 220 //按key进行分组 221 val rdd5 = rdd4.groupByKey 222 rdd5.collect 223 16) reduceByKey 224 顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。 225 举例: 226 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2))) 227 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) 228 //求并集 229 val rdd4 = rdd1 union rdd2 230 //按key进行分组 231 val rdd6 = rdd4.reduceByKey(_ + _) 232 rdd6.collect() 233 234 17) sortByKey 235 将List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))和List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))做wordcount,并按名称排序 236 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))) 237 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))) 238 val rdd3 = rdd1.union(rdd2) 239 //按key进行聚合 240 val rdd4 = rdd3.reduceByKey(_ + _) 241 //false降序 242 val rdd5 = rdd4.sortByKey(false) 243 rdd5.collect 244 18) sortBy 245 将List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))和List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))做wordcount,并按数值排序 246 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1))) 247 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5))) 248 val rdd3 = rdd1.union(rdd2) 249 //按key进行聚合 250 val rdd4 = rdd3.reduceByKey(_ + _) 251 //false降序 252 val rdd5 = rdd4.sortBy(_._2, false) 253 rdd5.collect 254 255 19) zip 256 def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)] 257 258 zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。 259 260 scala> var rdd1 = sc.makeRDD(1 to 5,2) 261 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21 262 263 scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2) 264 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21 265 266 scala> rdd1.zip(rdd2).collect 267 res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E)) 268 269 scala> rdd2.zip(rdd1).collect 270 res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5)) 271 272 scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3) 273 rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21 274 scala> rdd1.zip(rdd3).collect 275 java.lang.IllegalArgumentException: Can‘t zip RDDs with unequal numbers of partitions 276 //如果两个RDD分区数不同,则抛出异常 277 278 20) zipPartitions 279 280 zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。 281 该函数有好几种实现,可分为三类: 282 283 参数是一个RDD 284 def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V] 285 286 def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V] 287 288 这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息 289 290 映射方法f参数为两个RDD的迭代器。 291 292 scala> var rdd1 = sc.makeRDD(1 to 5,2) 293 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21 294 295 scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2) 296 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21 297 298 //rdd1两个分区中元素分布: 299 scala> rdd1.mapPartitionsWithIndex{ 300 | (x,iter) => { 301 | var result = List[String]() 302 | while(iter.hasNext){ 303 | result ::= ("part_" + x + "|" + iter.next()) 304 | } 305 | result.iterator 306 | 307 | } 308 | }.collect 309 res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3) 310 311 //rdd2两个分区中元素分布 312 scala> rdd2.mapPartitionsWithIndex{ 313 | (x,iter) => { 314 | var result = List[String]() 315 | while(iter.hasNext){ 316 | result ::= ("part_" + x + "|" + iter.next()) 317 | } 318 | result.iterator 319 | 320 | } 321 | }.collect 322 res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C) 323 324 //rdd1和rdd2做zipPartition 325 scala> rdd1.zipPartitions(rdd2){ 326 | (rdd1Iter,rdd2Iter) => { 327 | var result = List[String]() 328 | while(rdd1Iter.hasNext && rdd2Iter.hasNext) { 329 | result::=(rdd1Iter.next() + "_" + rdd2Iter.next()) 330 | } 331 | result.iterator 332 | } 333 | }.collect 334 res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C) 335 336 337 参数是两个RDD 338 def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V] 339 340 def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V] 341 342 用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。 343 344 scala> var rdd1 = sc.makeRDD(1 to 5,2) 345 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21 346 347 scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2) 348 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21 349 350 scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2) 351 rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21 352 353 //rdd3中个分区元素分布 354 scala> rdd3.mapPartitionsWithIndex{ 355 | (x,iter) => { 356 | var result = List[String]() 357 | while(iter.hasNext){ 358 | result ::= ("part_" + x + "|" + iter.next()) 359 | } 360 | result.iterator 361 | 362 | } 363 | }.collect 364 res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c) 365 366 //三个RDD做zipPartitions 367 scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){ 368 | (rdd1Iter,rdd2Iter,rdd3Iter) => { 369 | var result = List[String]() 370 | while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) { 371 | result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next()) 372 | } 373 | result.iterator 374 | } 375 | } 376 rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27 377 378 scala> rdd4.collect 379 res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c) 380 381 参数是三个RDD 382 def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V] 383 384 def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V] 385 386 用法同上面,只不过这里又多了个一个RDD而已。 387 388 389 21) zipWithIndex 390 391 def zipWithIndex(): RDD[(T, Long)] 392 该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。 393 scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2) 394 rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21 395 scala> rdd2.zipWithIndex().collect 396 res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4)) 397 398 22) zipWithUniqueId 399 400 def zipWithUniqueId(): RDD[(T, Long)] 401 该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下: 402 每个分区中第一个元素的唯一ID值为:该分区索引号, 403 每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数) 404 看下面的例子: 405 scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2) 406 rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21 407 //rdd1有两个分区, 408 scala> rdd1.zipWithUniqueId().collect 409 res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5)) 410 //总分区数为2 411 //第一个分区第一个元素ID为0,第二个分区第一个元素ID为1 412 //第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4 413 //第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5 414 415 键值转换 416 23) partitionBy 417 418 def partitionBy(partitioner: Partitioner): RDD[(K, V)] 419 该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。 420 scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2) 421 rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at :21 422 scala> rdd1.partitions.size 423 res20: Int = 2 424 425 //查看rdd1中每个分区的元素 426 scala> rdd1.mapPartitionsWithIndex{ 427 | (partIdx,iter) => { 428 | var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]() 429 | while(iter.hasNext){ 430 | var part_name = "part_" + partIdx; 431 | var elem = iter.next() 432 | if(part_map.contains(part_name)) { 433 | var elems = part_map(part_name) 434 | elems ::= elem 435 | part_map(part_name) = elems 436 | } else { 437 | part_map(part_name) = List[(Int,String)]{elem} 438 | } 439 | } 440 | part_map.iterator 441 | 442 | } 443 | }.collect 444 res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C)))) 445 //(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中 446 447 //使用partitionBy重分区 448 scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2)) 449 rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at :23 450 451 scala> rdd2.partitions.size 452 res23: Int = 2 453 454 //查看rdd2中每个分区的元素 455 scala> rdd2.mapPartitionsWithIndex{ 456 | (partIdx,iter) => { 457 | var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]() 458 | while(iter.hasNext){ 459 | var part_name = "part_" + partIdx; 460 | var elem = iter.next() 461 | if(part_map.contains(part_name)) { 462 | var elems = part_map(part_name) 463 | elems ::= elem 464 | part_map(part_name) = elems 465 | } else { 466 | part_map(part_name) = List[(Int,String)]{elem} 467 | } 468 | } 469 | part_map.iterator 470 | } 471 | }.collect 472 res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A)))) 473 //(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中 474 24) mapValues 475 mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。 476 举例: 477 scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2) 478 scala> val b = a.map(x => (x.length, x)) 479 scala> b.mapValues("x" + _ + "x").collect 480 res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex)) 481 482 25) flatMapValues 483 flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。 484 举例 485 val a = sc.parallelize(List((1, 2), (3, 4), (5, 6))) 486 val b = a.flatMapValues(x => 1.to(x)) 487 b.collect.foreach(println) 488 26) combineByKey 489 490 def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] 491 def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] 492 def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] 493 该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。 494 其中的参数: 495 createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C ,分区内相同的key做一次 496 mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C,分区内相同的key循环做 497 mergeCombiners:分区合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C,分区之间循环做 498 numPartitions:结果RDD分区数,默认保持原有的分区数 499 partitioner:分区函数,默认为HashPartitioner 500 mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true 501 502 看下面例子: 503 scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1))) 504 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21 505 scala> rdd1.combineByKey( 506 | (v : Int) => v + "_", 507 | (c : String, v : Int) => c + "@" + v, 508 | (c1 : String, c2 : String) => c1 + "$" + c2 509 | ).collect 510 res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_)) 511 512 其中三个映射函数分别为: 513 createCombiner: (V) => C 514 (v : Int) => v + “_” //在每一个V值后面加上字符_,返回C类型(String) 515 mergeValue: (C, V) => C 516 (c : String, v : Int) => c + “@” + v //合并C类型和V类型,中间加字符@,返回C(String) 517 mergeCombiners: (C, C) => C 518 (c1 : String, c2 : String) => c1 + “$” + c2 //合并C类型和C类型,中间加$,返回C(String) 519 其他参数为默认值。 520 最终,将RDD[String,Int]转换为RDD[String,String]。 521 522 再看例子: 523 524 rdd1.combineByKey( 525 (v : Int) => List(v), 526 (c : List[Int], v : Int) => v :: c, 527 (c1 : List[Int], c2 : List[Int]) => c1 ::: c2 528 ).collect 529 res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1))) 530 最终将RDD[String,Int]转换为RDD[String,List[Int]]。 531 532 27) foldByKey 533 534 def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] 535 536 def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] 537 538 def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] 539 540 541 542 该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V. 543 544 例子: 545 546 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) 547 scala> rdd1.foldByKey(0)(_+_).collect 548 res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1)) 549 //将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操 550 //作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即: 551 //("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2) 552 553 再看: 554 555 scala> rdd1.foldByKey(2)(_+_).collect 556 res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3)) 557 //先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函 558 //数应用于初始化后的V,最后得到:(A,2+4),即:(A,6) 559 560 再看乘法操作: 561 562 scala> rdd1.foldByKey(0)(_*_).collect 563 res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0)) 564 //先将zeroValue=0应用于每个V,注意,这次映射函数为乘法,得到:("A",0*0), ("A",2*0), 565 //即:("A",0), ("A",0),再将映射函//数应用于初始化后的V,最后得到:(A,0*0),即:(A,0) 566 //其他K也一样,最终都得到了V=0 567 568 scala> rdd1.foldByKey(1)(_*_).collect 569 res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1)) 570 //映射函数为乘法时,需要将zeroValue设为1,才能得到我们想要的结果。 571 572 在使用foldByKey算子时候,要特别注意映射函数及zeroValue的取值。 573 28) reduceByKeyLocally 574 575 def reduceByKeyLocally(func: (V, V) => V): Map[K, V] 576 577 该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。 578 579 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) 580 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[91] at makeRDD at :21 581 582 scala> rdd1.reduceByKeyLocally((x,y) => x + y) 583 res90: scala.collection.Map[String,Int] = Map(B -> 3, A -> 2, C -> 1) 584 585 29) cogroup和groupByKey的区别 586 587 val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2))) 588 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) 589 //cogroup 590 val rdd3 = rdd1.cogroup(rdd2) 591 //groupbykey 592 val rdd4 = rdd1.union(rdd2).groupByKey 593 //注意cogroup与groupByKey的区别 594 rdd3.foreach(println) 595 rdd4.foreach(println) 596 30) join 597 598 val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2))) 599 val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) 600 //求jion 601 val rdd3 = rdd1.join(rdd2) 602 rdd3.collect 603 604 31) leftOuterJoin 605 606 def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] 607 def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] 608 def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] 609 610 leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。 611 参数numPartitions用于指定结果的分区数 612 参数partitioner用于指定分区函数 613 614 var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) 615 var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) 616 617 scala> rdd1.leftOuterJoin(rdd2).collect 618 res11: Array[(String, (String, Option[String]))] = Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c)))) 619 620 32) rightOuterJoin 621 622 def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] 623 def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] 624 def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] 625 626 rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。 627 参数numPartitions用于指定结果的分区数 628 参数partitioner用于指定分区函数 629 630 var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) 631 var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) 632 scala> rdd1.rightOuterJoin(rdd2).collect 633 res12: Array[(String, (Option[String], String))] = Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c))) 634 635 636 Action操作 637 33) first 638 639 def first(): T 640 641 first返回RDD中的第一个元素,不排序。 642 643 scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) 644 rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21 645 646 scala> rdd1.first 647 res14: (String, String) = (A,1) 648 649 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) 650 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21 651 652 scala> rdd1.first 653 res8: Int = 10 654 655 34) count 656 657 def count(): Long 658 659 count返回RDD中的元素数量。 660 661 scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) 662 rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21 663 664 scala> rdd1.count 665 res15: Long = 3 666 667 35) reduce 668 669 def reduce(f: (T, T) ⇒ T): T 670 671 根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。 672 673 scala> var rdd1 = sc.makeRDD(1 to 10,2) 674 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21 675 676 scala> rdd1.reduce(_ + _) 677 res18: Int = 55 678 679 scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) 680 rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21 681 682 scala> rdd2.reduce((x,y) => { 683 | (x._1 + y._1,x._2 + y._2) 684 | }) 685 res21: (String, Int) = (CBBAA,6) 686 687 collect 688 689 def collect(): Array[T] 690 691 collect用于将一个RDD转换成数组。 692 693 scala> var rdd1 = sc.makeRDD(1 to 10,2) 694 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21 695 696 scala> rdd1.collect 697 res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) 698 36) take 699 700 def take(num: Int): Array[T] 701 702 take用于获取RDD中从0到num-1下标的元素,不排序。 703 704 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) 705 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21 706 707 scala> rdd1.take(1) 708 res0: Array[Int] = Array(10) 709 710 scala> rdd1.take(2) 711 res1: Array[Int] = Array(10, 4) 712 713 37) top 714 715 def top(num: Int)(implicit ord: Ordering[T]): Array[T] 716 717 top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。 718 719 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) 720 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21 721 722 scala> rdd1.top(1) 723 res2: Array[Int] = Array(12) 724 725 scala> rdd1.top(2) 726 res3: Array[Int] = Array(12, 10) 727 728 //指定排序规则 729 scala> implicit val myOrd = implicitly[Ordering[Int]].reverse 730 myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef 731 732 scala> rdd1.top(1) 733 res4: Array[Int] = Array(2) 734 735 scala> rdd1.top(2) 736 res5: Array[Int] = Array(2, 3) 737 738 38) takeOrdered 739 740 def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] 741 742 takeOrdered和top类似,只不过以和top相反的顺序返回元素。 743 744 scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) 745 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21 746 747 scala> rdd1.top(1) 748 res4: Array[Int] = Array(12) 749 750 scala> rdd1.top(2) 751 res5: Array[Int] = Array(12, 10) 752 753 scala> rdd1.takeOrdered(1) 754 res6: Array[Int] = Array(2) 755 756 scala> rdd1.takeOrdered(2) 757 res7: Array[Int] = Array(2, 3) 758 759 39) aggregate 760 761 def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U 762 763 aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。 764 765 var rdd1 = sc.makeRDD(1 to 10,2) 766 rdd1.mapPartitionsWithIndex{ 767 (partIdx,iter) => { 768 var part_map = scala.collection.mutable.Map[String,List[Int]]() 769 while(iter.hasNext){ 770 var part_name = "part_" + partIdx; 771 var elem = iter.next() 772 if(part_map.contains(part_name)) { 773 var elems = part_map(part_name) 774 elems ::= elem 775 part_map(part_name) = elems 776 } else { 777 part_map(part_name) = List[Int]{elem} 778 } 779 } 780 part_map.iterator 781 782 } 783 }.collect 784 res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6))) 785 786 ##第一个分区中包含5,4,3,2,1 787 788 ##第二个分区中包含10,9,8,7,6 789 790 scala> rdd1.aggregate(1)( 791 | {(x : Int,y : Int) => x + y}, 792 | {(a : Int,b : Int) => a + b} 793 | ) 794 res17: Int = 58 795 796 结果为什么是58,看下面的计算过程: 797 798 ##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1 799 800 ##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16 801 802 ## part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41 803 804 ##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1 805 806 ##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58 807 808 再比如: 809 810 scala> rdd1.aggregate(2)( 811 | {(x : Int,y : Int) => x + y}, 812 | {(a : Int,b : Int) => a * b} 813 | ) 814 res18: Int = 1428 815 816 ##这次zeroValue=2 817 818 ##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17 819 820 ##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42 821 822 ##最后:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428 823 824 因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。 825 826 827 828 40) fold 829 830 def fold(zeroValue: T)(op: (T, T) ⇒ T): T 831 832 fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。 833 var rdd1 = sc.makeRDD(1 to 10, 2) 834 scala> rdd1.fold(1)( 835 | (x,y) => x + y 836 | ) 837 res19: Int = 58 838 839 ##结果同上面使用aggregate的第一个例子一样,即: 840 scala> rdd1.aggregate(1)( 841 | {(x,y) => x + y}, 842 | {(a,b) => a + b} 843 | ) 844 res20: Int = 58 845 846 41) lookup 847 848 def lookup(key: K): Seq[V] 849 850 lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。 851 852 853 854 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) 855 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21 856 857 scala> rdd1.lookup("A") 858 res0: Seq[Int] = WrappedArray(0, 2) 859 860 scala> rdd1.lookup("B") 861 res1: Seq[Int] = WrappedArray(1, 2) 862 42) countByKey 863 864 def countByKey(): Map[K, Long] 865 866 countByKey用于统计RDD[K,V]中每个K的数量。 867 868 scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3))) 869 rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21 870 871 scala> rdd1.countByKey 872 res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3) 873 874 43) foreach 875 876 def foreach(f: (T) ⇒ Unit): Unit 877 878 foreach用于遍历RDD,将函数f应用于每一个元素。 879 但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。 880 比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。 881 我在Spark1.4中是这样,不知道是否真如此。 882 这时候,使用accumulator共享变量与foreach结合,倒是个不错的选择。 883 884 scala> var cnt = sc.accumulator(0) 885 cnt: org.apache.spark.Accumulator[Int] = 0 886 887 scala> var rdd1 = sc.makeRDD(1 to 10,2) 888 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21 889 890 scala> rdd1.foreach(x => cnt += x) 891 892 scala> cnt.value 893 res51: Int = 55 894 895 scala> rdd1.collect.foreach(println) 896 44) foreachPartition 897 898 def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit 899 900 foreachPartition和foreach类似,只不过是对每一个分区使用f。 901 902 scala> var rdd1 = sc.makeRDD(1 to 10,2) 903 rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21 904 905 scala> var allsize = sc.accumulator(0) 906 size: org.apache.spark.Accumulator[Int] = 0 907 908 909 scala> rdd1.foreachPartition { x => { 910 | allsize += x.size 911 | }} 912 913 scala> println(allsize.value) 914 10 915 916 45) sortBy 917 918 def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 919 920 sortBy根据给定的排序k函数将RDD中的元素进行排序。 921 922 scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2) 923 924 scala> rdd1.sortBy(x => x).collect 925 res1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //默认升序 926 927 scala> rdd1.sortBy(x => x,false).collect 928 res2: Array[Int] = Array(7, 6, 3, 2, 1, 0) //降序 929 930 //RDD[K,V]类型 931 scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) 932 933 scala> rdd1.sortBy(x => x).collect 934 res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7)) 935 936 //按照V进行降序排序 937 scala> rdd1.sortBy(x => x._2,false).collect 938 res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1)) 939 46) saveAsTextFile 940 941 def saveAsTextFile(path: String): Unit 942 943 def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit 944 945 saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。 946 947 codec参数可以指定压缩的类名。 948 949 var rdd1 = sc.makeRDD(1 to 10,2) 950 scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFS 951 hadoop fs -ls /tmp/lxw1234.com 952 Found 2 items 953 -rw-r--r-- 2 lxw1234 supergroup 0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS 954 -rw-r--r-- 2 lxw1234 supergroup 21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000 955 956 hadoop fs -cat /tmp/lxw1234.com/part-00000 957 958 注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)将文件保存到本地文件系统,那么只会保存在Executor所在机器的本地目录。 959 960 //指定压缩格式保存 961 962 rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/",classOf[com.hadoop.compression.lzo.LzopCodec]) 963 964 hadoop fs -ls /tmp/lxw1234.com 965 -rw-r--r-- 2 lxw1234 supergroup 0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS 966 -rw-r--r-- 2 lxw1234 supergroup 71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzo 967 968 hadoop fs -text /tmp/lxw1234.com/part-00000.lzo 969 970 971 47) saveAsSequenceFile 972 973 saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。 974 用法同saveAsTextFile。 975 976 48) saveAsObjectFile 977 978 def saveAsObjectFile(path: String): Unit 979 980 saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。 981 982 对于HDFS,默认采用SequenceFile保存。 983 984 var rdd1 = sc.makeRDD(1 to 10,2) 985 rdd1.saveAsObjectFile("hdfs://cdh5/tmp/lxw1234.com/") 986 987 hadoop fs -cat /tmp/lxw1234.com/part-00000 988 SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT 989 990 49) saveAsHadoopFile 991 992 def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit 993 def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = …, codec: Option[Class[_ <: CompressionCodec]] = None): Unit 994 995 saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。 996 997 可以指定outputKeyClass、outputValueClass以及压缩格式。 998 每个分区输出一个文件。 999 var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) 1000 import org.apache.hadoop.mapred.TextOutputFormat 1001 import org.apache.hadoop.io.Text 1002 import org.apache.hadoop.io.IntWritable 1003 1004 rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]]) 1005 1006 rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]], 1007 classOf[com.hadoop.compression.lzo.LzopCodec]) 1008 1009 50) saveAsHadoopDataset 1010 1011 def saveAsHadoopDataset(conf: JobConf): Unit 1012 saveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中,比如HBase。 1013 在JobConf中,通常需要关注或者设置五个参数: 1014 文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。 1015 1016 ##使用saveAsHadoopDataset将RDD保存到HDFS中 1017 1018 import org.apache.spark.SparkConf 1019 import org.apache.spark.SparkContext 1020 import SparkContext._ 1021 import org.apache.hadoop.mapred.TextOutputFormat 1022 import org.apache.hadoop.io.Text 1023 import org.apache.hadoop.io.IntWritable 1024 import org.apache.hadoop.mapred.JobConf 1025 1026 1027 1028 var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) 1029 var jobConf = new JobConf() 1030 jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]]) 1031 jobConf.setOutputKeyClass(classOf[Text]) 1032 jobConf.setOutputValueClass(classOf[IntWritable]) 1033 jobConf.set("mapred.output.dir","/tmp/lxw1234/") 1034 rdd1.saveAsHadoopDataset(jobConf) 1035 1036 结果: 1037 hadoop fs -cat /tmp/lxw1234/part-00000 1038 A 2 1039 A 1 1040 hadoop fs -cat /tmp/lxw1234/part-00001 1041 B 6 1042 B 3 1043 B 7 1044 1045 ##保存数据到HBASE 1046 1047 HBase建表: 1048 1049 create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1} 1050 1051 import org.apache.spark.SparkConf 1052 import org.apache.spark.SparkContext 1053 import SparkContext._ 1054 import org.apache.hadoop.mapred.TextOutputFormat 1055 import org.apache.hadoop.io.Text 1056 import org.apache.hadoop.io.IntWritable 1057 import org.apache.hadoop.mapred.JobConf 1058 import org.apache.hadoop.hbase.HBaseConfiguration 1059 import org.apache.hadoop.hbase.mapred.TableOutputFormat 1060 import org.apache.hadoop.hbase.client.Put 1061 import org.apache.hadoop.hbase.util.Bytes 1062 import org.apache.hadoop.hbase.io.ImmutableBytesWritable 1063 1064 var conf = HBaseConfiguration.create() 1065 var jobConf = new JobConf(conf) 1066 jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3") 1067 jobConf.set("zookeeper.znode.parent","/hbase") 1068 jobConf.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234") 1069 jobConf.setOutputFormat(classOf[TableOutputFormat]) 1070 1071 var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7))) 1072 rdd1.map(x => 1073 { 1074 var put = new Put(Bytes.toBytes(x._1)) 1075 put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2)) 1076 (new ImmutableBytesWritable,put) 1077 } 1078 ).saveAsHadoopDataset(jobConf) 1079 1080 ##结果: 1081 hbase(main):005:0> scan ‘lxw1234‘ 1082 ROW COLUMN+CELL 1083 A column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02 1084 B column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06 1085 C column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07 1086 3 row(s) in 0.0550 seconds 1087 1088 注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。 1089 1090 51) saveAsNewAPIHadoopFile 1091 1092 def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit 1093 1094 def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit 1095 1096 1097 1098 saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。 1099 1100 用法基本同saveAsHadoopFile。 1101 1102 import org.apache.spark.SparkConf 1103 import org.apache.spark.SparkContext 1104 import SparkContext._ 1105 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat 1106 import org.apache.hadoop.io.Text 1107 import org.apache.hadoop.io.IntWritable 1108 1109 var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) 1110 rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]]) 1111 1112 52) saveAsNewAPIHadoopDataset 1113 1114 def saveAsNewAPIHadoopDataset(conf: Configuration): Unit 1115 1116 作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。 1117 1118 以写入HBase为例: 1119 1120 1121 1122 HBase建表: 1123 1124 create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1} 1125 1126 1127 1128 完整的Spark应用程序: 1129 1130 package com.lxw1234.test 1131 1132 import org.apache.spark.SparkConf 1133 import org.apache.spark.SparkContext 1134 import SparkContext._ 1135 import org.apache.hadoop.hbase.HBaseConfiguration 1136 import org.apache.hadoop.mapreduce.Job 1137 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat 1138 import org.apache.hadoop.hbase.io.ImmutableBytesWritable 1139 import org.apache.hadoop.hbase.client.Result 1140 import org.apache.hadoop.hbase.util.Bytes 1141 import org.apache.hadoop.hbase.client.Put 1142 1143 object Test { 1144 def main(args : Array[String]) { 1145 val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com") 1146 val sc = new SparkContext(sparkConf); 1147 var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7))) 1148 1149 sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3") 1150 sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase") 1151 sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234") 1152 var job = new Job(sc.hadoopConfiguration) 1153 job.setOutputKeyClass(classOf[ImmutableBytesWritable]) 1154 job.setOutputValueClass(classOf[Result]) 1155 job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) 1156 1157 rdd1.map( 1158 x => { 1159 var put = new Put(Bytes.toBytes(x._1)) 1160 put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2)) 1161 (new ImmutableBytesWritable,put) 1162 } 1163 ).saveAsNewAPIHadoopDataset(job.getConfiguration) 1164 1165 sc.stop() 1166 } 1167 } 1168 1169 注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。
原文:https://www.cnblogs.com/xjqi/p/12776841.html