首页 > 其他 > 详细

RDD编程

时间:2020-03-03 15:31:48      阅读:54      评论:0      收藏:0      [点我收藏+]

RDD创建

每个RDD被分为多个分区,这些分区运行在集群的不同节点上。

用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序里分发驱动器程序中的对象集合(list和set)。

RDD支持两种类型的操作:转换操作和行动操作。惰性计算,只有行动操作才真正计算。

RDD持久化,RDD.persist()让Spark把这个RDD缓存在内存中,在之后的行动操作中,可以重用这些数据。也可以把RDD缓存到磁盘中。

lines = sc.parallelize(["asda","asdsa"])//创建RDD的parallelize方法,python
JavaRDD<String> lines = sc.parallelize(Arrays.asList("",""));//Java的parallelize方法

更常用的是从外部存储中读取数据创建RDD。

lines = sc.textFile("")//python
val lines = sc.textFile("")//scala
JavaRDD<String> lines = sc.textFile("ssa");//Java

RDD操作

python

errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lamdda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)//两个RDD合并操作

Spark会使用lineage graph来继续RDD之间的依赖关系。

用take()来收集RDD的元素,提取badLinesRDD的10个元素。

for line in badLinesRDD.take(10):
    print line

collect()函数获取整个RDD的数据

把数据写入HDFS和Amazon S3,可以使用saveAsTextFile()、saveAsSequenceFile()

惰性求值

在行动操作前,Spark不会开始计算。

在Python中,有三种方式把函数传递给Spark:

  1. lambda表达式
  2. 顶层函数
  3. 定义的局部函数
word = rdd.filter(lambda x: "error" in x)//lambda

def containsError(s):
    return "error" in s
word = rdd.filter(containError)//局部函数

在Scala中,可以把定义的内联函数、方法的引用或静态方法传递给Spark。传递的函数及其引用的数据需要是可序列化的。

在Java中,传递org.apache.spark.api.java.function中的函数接口对象。

map(),接收一个函数,把这个函数用于RDD中的每个元素,把函数的韩慧结果作为RDD元素对应的值。返回值的类型和输入值的类型不需要一样。

filter(),接收一个函数,把RDD中满足该函数的元素放入新的RDD中返回。

flatMap(),返回一个包含各个迭代器可访问的所有元素的RDD。(map返回一行一行,flatMap返回一个词一个词)

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()//返回hello

RDD.distinct()生成一个只包含不同元素的新RDD,开销很大,需要shuffle。

union()两个RDD合并成一个;intersection()只返回两个RDD都有的元素;subtract(other)返回只存在第一个RDD不存在第二个RDD中所有元素组成的RDD,也要shuffle;cartesian()笛卡尔积

reduce(),接收一个函数作为参数。操作两个RDD的元素类型的数据并返回一个同样类型的新元素。

sum = rdd.reduce(lambda x, y : x + y)

fold(),aggregate()

collect(),把数据返回驱动器,即把RDD内容返回。

take(n)返回RDD的n个元素

foreach()对RDD中的每个元素进行操作

避免多次计算同一个RDD,可以对数据进行持久化,计算出RDD的节点会分别保存它们分区数据,在scala和Java中,persist()默认把数据缓存在JVM堆空间。

如果缓存数据太多,内存放不下,Spark会用LRU策略把最老的分区从内存中移除。写入磁盘。

unpersist()持久化RDD从缓存中移除

RDD编程

原文:https://www.cnblogs.com/chenshaowei/p/12402250.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!