SparkContext 是 Spark 的入口,通过它可以连接集群、创建 RDD、广播变量等等。
/**
 * Excerpt of SparkContext: class header, construction-time fields, auxiliary constructors
 * and configuration accessors. Members not shown in this excerpt (for example `_conf`,
 * `_jars`, `_files`) are defined elsewhere in the full class.
 */
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

  // Call site returned by Utils.getCallSite() at the moment this context is constructed.
  private val creationSite: CallSite = Utils.getCallSite()

  // Original note: if two SparkContexts are declared, a warning is used in place of an
  // exception so that the application does not exit. Read from
  // "spark.driver.allowMultipleContexts", defaulting to false.
  private val allowMultipleContexts: Boolean =
    config.getBoolean("spark.driver.allowMultipleContexts", false)

  // Original note: prevents two SparkContexts from running at the same time.
  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)

  private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

  // Wall-clock time (milliseconds) at which construction of this context began.
  val startTime: Long = System.currentTimeMillis()

  /**
   * No-argument constructor; delegates to the primary constructor with a default-constructed
   * SparkConf. Original note: this is the path taken when a job is submitted through
   * spark-submit, where the settings come from the system environment.
   */
  def this() = this(new SparkConf())

  /**
   * Builds the context from `conf` after SparkContext.updatedConf has applied `master` and
   * `appName` to it.
   */
  def this(master: String, appName: String, conf: SparkConf) =
    this(SparkContext.updatedConf(conf, master, appName))

  /**
   * Builds the context from a fresh SparkConf populated with the given settings.
   *
   * Original note: `preferredNodeLocationData` was meant for locating nodes on which to start
   * the corresponding containers. The body below only stores it and logs a warning that it
   * has no effect (see SPARK-8949).
   *
   * @param master forwarded to SparkContext.updatedConf
   * @param appName forwarded to SparkContext.updatedConf
   * @param sparkHome forwarded to SparkContext.updatedConf; defaults to null
   * @param jars forwarded to SparkContext.updatedConf; defaults to Nil
   * @param environment forwarded to SparkContext.updatedConf; defaults to an empty map
   * @param preferredNodeLocationData stored on this instance; a warning is logged if non-empty
   */
  def this(
      master: String,
      appName: String,
      sparkHome: String = null,
      jars: Seq[String] = Nil,
      environment: Map[String, String] = Map(),
      preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) = {
    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
    if (preferredNodeLocationData.nonEmpty) {
      logWarning("Passing in preferred locations has no effect at all, see SPARK-8949")
    }
    this.preferredNodeLocationData = preferredNodeLocationData
  }

  // Convenience constructors: each forwards to the six-argument constructor above and fills
  // the trailing arguments with null / Nil / empty maps.
  private[spark] def this(master: String, appName: String) =
    this(master, appName, null, Nil, Map(), Map())

  private[spark] def this(master: String, appName: String, sparkHome: String) =
    this(master, appName, sparkHome, Nil, Map(), Map())

  private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
    this(master, appName, sparkHome, jars, Map(), Map())

  // Internal accessor that exposes the live configuration object itself.
  private[spark] def conf: SparkConf = _conf

  /**
   * Returns a clone of the configuration. Original note: because callers receive a copy, the
   * configuration in use cannot be modified through it at runtime.
   */
  def getConf: SparkConf = conf.clone()

  def jars: Seq[String] = _jars
  def files: Seq[String] = _files
  def master: String = _conf.get("spark.master")
  def appName: String = _conf.get("spark.app.name")

  // Reads "spark.eventLog.enabled" from the configuration; false when the key is absent.
  private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
// (continued) members of class SparkContext: event-log accessors, scheduler start-up, RDD factories

  private[spark] def eventLogDir: Option[URI] = _eventLogDir
  private[spark] def eventLogCodec: Option[String] = _eventLogCodec

  // Create the scheduler backend and task scheduler for `master`, then the DAG scheduler.
  val (sched, ts) = SparkContext.createTaskScheduler(this, master)
  _schedulerBackend = sched
  _taskScheduler = ts
  _dagScheduler = new DAGScheduler(this)
  // NOTE(review): presumably tells the heartbeat receiver that the task scheduler is now
  // available; confirm against HeartbeatReceiver.
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // Start the task scheduler, then record the application id it reports. The id is stored in
  // `_applicationId` because the two statements that follow read that field.
  _taskScheduler.start()
  _applicationId = _taskScheduler.applicationId()
  _applicationAttemptId = taskScheduler.applicationAttemptId()
  _conf.set("spark.app.id", _applicationId)
  _env.blockManager.initialize(_applicationId)

  /**
   * Creates a new RDD[Long] whose elements run from `start` towards `end` (exclusive),
   * advancing by `step`.
   *
   * @param start first element
   * @param end exclusive bound
   * @param step increment between consecutive elements; must not be 0
   * @param numSlices number of partitions of the resulting RDD
   * @throws IllegalArgumentException if `step` is 0
   */
  def range(
      start: Long,
      end: Long,
      step: Long = 1,
      numSlices: Int = defaultParallelism): RDD[Long] = withScope {
    assertNotStopped()
    // when step is 0, range will run infinitely
    require(step != 0, "step cannot be 0")
    // Element count is computed with BigInt so that (end - start) cannot overflow a Long.
    val numElements: BigInt = {
      val safeStart = BigInt(start)
      val safeEnd = BigInt(end)
      // Truncated quotient when the span is an exact multiple of step, or when step points
      // away from end; otherwise one more element for the final partial step.
      if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
        (safeEnd - safeStart) / step
      } else {
        (safeEnd - safeStart) / step + 1
      }
    }
    parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex((i, _) => {
      // Bounds of partition i, still in BigInt space.
      val partitionStart = (i * numElements) / numSlices * step + start
      val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start

      // Clamp a BigInt bound into the Long range.
      def getSafeMargin(bi: BigInt): Long =
        if (bi.isValidLong) {
          bi.toLong
        } else if (bi > 0) {
          Long.MaxValue
        } else {
          Long.MinValue
        }

      val safePartitionStart = getSafeMargin(partitionStart)
      val safePartitionEnd = getSafeMargin(partitionEnd)

      new Iterator[Long] {
        private[this] var number: Long = safePartitionStart
        private[this] var overflow: Boolean = false

        override def hasNext: Boolean =
          if (!overflow) {
            if (step > 0) {
              number < safePartitionEnd
            } else {
              number > safePartitionEnd
            }
          } else false

        override def next(): Long = {
          val ret = number
          number += step
          // The counter moved against the direction of step, so the Long addition wrapped.
          if (number < ret ^ step < 0) {
            overflow = true
          }
          ret
        }
      }
    })
  }

  /** Creates an RDD from a local sequence by delegating to `parallelize`. */
  def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }

  /**
   * Reads a text file (original note: from the local file system or HDFS) and returns its
   * contents as an RDD of Strings: the value of every (key, value) record produced by
   * `hadoopFile` with TextInputFormat, converted with `toString`.
   */
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString)
  }

  /**
   * Loads a binary file made of fixed-length records and returns one byte array per record.
   *
   * Note: the record length is written into `conf` itself, which by default is this
   * context's `hadoopConfiguration`.
   *
   * @param path file to load
   * @param recordLength length in bytes of each record
   * @param conf Hadoop configuration used for the read
   */
  @Experimental
  def binaryRecords(
      path: String,
      recordLength: Int,
      conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = withScope {
    assertNotStopped()
    conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
    val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
      classOf[FixedLengthBinaryInputFormat],
      classOf[LongWritable],
      classOf[BytesWritable],
      conf = conf)
    br.map { case (_, v) =>
      // BytesWritable.getBytes exposes the backing buffer, which may be longer than the valid
      // data; copy exactly getLength bytes so the length check and the returned array reflect
      // the record itself.
      val bytes = java.util.Arrays.copyOf(v.getBytes, v.getLength)
      assert(bytes.length == recordLength, "Byte array does not have correct length")
      bytes
    }
  }

  /**
   * Returns an RDD for a Hadoop SequenceFile with the given key and value classes.
   */
  def sequenceFile[K, V](path: String,
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int
      ): RDD[(K, V)] = withScope {
    assertNotStopped()
    val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
    hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
  }

  /**
   * Broadcasts a variable (original note: to every node of the cluster) and returns the
   * Broadcast handle. Logs a warning when `T` is an RDD type, since RDDs cannot be broadcast
   * directly, and registers the broadcast with the cleaner when one is defined.
   */
  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {
      logWarning("Can not directly broadcast RDDs; instead, call collect() and "
        + "broadcast the result (see SPARK-5063)")
    }
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo(s"Created broadcast ${bc.id} from ${callSite.shortForm}")
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }
原文:http://www.cnblogs.com/yangsy0915/p/5467389.html