RDD stands for Resilient Distributed Dataset, Spark's basic data abstraction. It represents an immutable, partitioned collection of elements that can be operated on in parallel.

An RDD is defined by five main properties:
```scala
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
  // 1. A function to compute each partition
  def compute(split: Partition, context: TaskContext): Iterator[T]
  // 2. The list of partitions
  protected def getPartitions: Array[Partition]
  // 3. The dependencies on parent RDDs (the lineage)
  protected def getDependencies: Seq[Dependency[_]] = deps
  // 4. Optionally, preferred locations for computing each split (data locality)
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
  // 5. Optionally, a Partitioner for key-value RDDs
  @transient val partitioner: Option[Partitioner] = None

  // Auxiliary constructor: a single parent implies a one-to-one dependency
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))
}
```
As the auxiliary constructor shows, lineage is kept in the `deps` dependency list; when no dependency list is given, a one-to-one dependency (`OneToOneDependency`) on the single parent is created by default.
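To make lineage concrete, here is a minimal sketch (the app name and local master URL are illustrative) that builds a short chain and inspects it with `toDebugString` and `dependencies`:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LineageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("lineage-demo").setMaster("local[2]"))

    val base   = sc.parallelize(1 to 10, 2) // ParallelCollectionRDD
    val mapped = base.map(_ * 2)            // MapPartitionsRDD over base

    // map passed no explicit deps, so the auxiliary constructor created
    // a OneToOneDependency(base) for us
    println(mapped.dependencies)  // e.g. List(org.apache.spark.OneToOneDependency@...)
    println(mapped.toDebugString) // prints the lineage chain

    sc.stop()
  }
}
```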
The RDD class itself defines the generic transformations such as `map`, `filter`, and `union`. Additional transformations are added via implicit conversions defined in the RDD companion object; for example, key-value RDDs gain transformations such as `groupByKey` and `cogroup`:
```scala
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  new PairRDDFunctions(rdd)
}
```
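On the caller's side the conversion is invisible; a small sketch (reusing the `sc` from the lineage example, with illustrative data) of how a plain `RDD[(String, Int)]` picks up `PairRDDFunctions` methods:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
// reduceByKey is not a method on RDD itself; the compiler rewrites this to
// rddToPairRDDFunctions(pairs).reduceByKey(_ + _)
val summed = pairs.reduceByKey(_ + _)
println(summed.collect().toList) // List((a,4), (b,2)) (order may vary)
```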
Dependencies between RDDs are described by the abstract class `Dependency`, which comes in two flavors: narrow dependencies (subclasses of the abstract `NarrowDependency`, such as `OneToOneDependency` and `RangeDependency`) and shuffle dependencies (`ShuffleDependency`). `NarrowDependency` declares

```scala
def getParents(partitionId: Int): Seq[Int]
```

which computes, for a given partition of the child RDD, which partitions of the parent RDD it depends on.

`MapPartitionsRDD`, the RDD returned by transformations such as `map`, shows how a child with a single one-to-one dependency implements the core methods; it simply reuses its parent's partitions and (optionally) its partitioner:

```scala
override val partitioner =
  if (preservesPartitioning) firstParent[T].partitioner else None

override def getPartitions: Array[Partition] = firstParent[T].partitions

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))
```
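For reference, the two concrete narrow dependencies implement `getParents` roughly as in the Spark source below; `OneToOneDependency` maps each child partition to the same-numbered parent partition, while `RangeDependency` (used by `union`) shifts a contiguous range of partition ids:

```scala
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  // Child partition i depends exactly on parent partition i
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {
  // Child partitions [outStart, outStart + length) map one-to-one onto
  // parent partitions [inStart, inStart + length)
  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
```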
`ParallelCollectionRDD`, the RDD returned by `sc.parallelize`, stores each partition's data in a `ParallelCollectionPartition` object:

```scala
override def getPartitions: Array[Partition] = {
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}

override def compute(s: Partition, context: TaskContext): Iterator[T] = {
  new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
}

override def getPreferredLocations(s: Partition): Seq[String] = {
  locationPrefs.getOrElse(s.index, Nil)
}
```
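To see the slicing in action, a small sketch (reusing `sc` from above; the partition count is illustrative) using `glom`, which gathers each partition's elements into one array so the per-partition slices become visible:

```scala
val rdd = sc.parallelize(1 to 8, numSlices = 4)
println(rdd.getNumPartitions) // 4
// Each ParallelCollectionPartition holds one contiguous slice of the range
println(rdd.glom().collect().map(_.mkString("[", ",", "]")).mkString(" "))
// prints: [1,2] [3,4] [5,6] [7,8]
```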
Source: https://www.cnblogs.com/andyhe/p/10827393.html