每个RDD被分为多个分区,这些分区运行在集群的不同节点上。
用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序里分发驱动器程序中的对象集合(list和set)。
RDD支持两种类型的操作:转换操作和行动操作。惰性计算,只有行动操作才真正计算。
RDD持久化,RDD.persist()让Spark把这个RDD缓存在内存中,在之后的行动操作中,可以重用这些数据。也可以把RDD缓存到磁盘中。
lines = sc.parallelize(["asda","asdsa"])//创建RDD的parallelize方法,python
JavaRDD<String> lines = sc.parallelize(Arrays.asList("",""));//Java的parallelize方法
更常用的是从外部存储中读取数据创建RDD。
lines = sc.textFile("")//python
val lines = sc.textFile("")//scala
JavaRDD<String> lines = sc.textFile("ssa");//Java
python
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lamdda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)//两个RDD合并操作
Spark会使用lineage graph来继续RDD之间的依赖关系。
用take()来收集RDD的元素,提取badLinesRDD的10个元素。
for line in badLinesRDD.take(10):
print line
collect()函数获取整个RDD的数据
把数据写入HDFS和Amazon S3,可以使用saveAsTextFile()、saveAsSequenceFile()
在行动操作前,Spark不会开始计算。
在Python中,有三种方式把函数传递给Spark:
word = rdd.filter(lambda x: "error" in x)//lambda
def containsError(s):
return "error" in s
word = rdd.filter(containError)//局部函数
在Scala中,可以把定义的内联函数、方法的引用或静态方法传递给Spark。传递的函数及其引用的数据需要是可序列化的。
在Java中,传递org.apache.spark.api.java.function中的函数接口对象。
map(),接收一个函数,把这个函数用于RDD中的每个元素,把函数的韩慧结果作为RDD元素对应的值。返回值的类型和输入值的类型不需要一样。
filter(),接收一个函数,把RDD中满足该函数的元素放入新的RDD中返回。
flatMap(),返回一个包含各个迭代器可访问的所有元素的RDD。(map返回一行一行,flatMap返回一个词一个词)
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()//返回hello
RDD.distinct()生成一个只包含不同元素的新RDD,开销很大,需要shuffle。
union()两个RDD合并成一个;intersection()只返回两个RDD都有的元素;subtract(other)返回只存在第一个RDD不存在第二个RDD中所有元素组成的RDD,也要shuffle;cartesian()笛卡尔积
reduce(),接收一个函数作为参数。操作两个RDD的元素类型的数据并返回一个同样类型的新元素。
sum = rdd.reduce(lambda x, y : x + y)
fold(),aggregate()
collect(),把数据返回驱动器,即把RDD内容返回。
take(n)返回RDD的n个元素
foreach()对RDD中的每个元素进行操作
避免多次计算同一个RDD,可以对数据进行持久化,计算出RDD的节点会分别保存它们分区数据,在scala和Java中,persist()默认把数据缓存在JVM堆空间。
unpersist()持久化RDD从缓存中移除
原文:https://www.cnblogs.com/chenshaowei/p/12402250.html