
5. Source Code Analysis of the Spark Streaming Execution Flow

Date: 2016-05-15 02:47:20
1 Spark Streaming sample program
The code is as follows:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OnlineTheTop3ItemForEachCategory2DB {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf() // create the SparkConf object
    // Set the application name; it is shown in the monitoring UI while the program runs
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB")
    conf.setMaster("spark://Master:7077") // run on the Spark cluster
    // batchDuration controls how often jobs are generated; this also creates the Spark Streaming entry point
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")
    val socketDStream = ssc.socketTextStream("Master", 9999)
    // business logic ...

    ssc.start()
    ssc.awaitTermination()
  }
}
2 Source code analysis of the Spark Streaming execution flow

2.1 Creating the StreamingContext

Based on the example above, we walk through the Spark source code at a high level, pointing out the relevant parts, in order to understand the main execution flow.
1) The code does not use SparkContext directly; it uses StreamingContext instead.
Here is a fragment of the StreamingContext source:
/**
 * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
 * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
 * @param batchDuration the time interval at which streaming data will be divided into batches
 */
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
As the name suggests, createNewSparkContext creates a SparkContext:
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}
This shows that a Spark Streaming program is itself an application running on Spark.

2) The example has to begin by creating an InputDStream from a data source.

val socketDStream = ssc.socketTextStream("Master", 9999)
The socketTextStream method is defined as follows:
/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
3) As the last line of the method shows, it calls socketStream.
socketStream is defined as follows:
/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes it interepreted as object using the given
 * converter.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param converter     Function to convert the byte stream to objects
 * @param storageLevel  Storage level to use for storing the received objects
 * @tparam T            Type of the objects received (after converting bytes to objects)
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
4) What is actually created is a SocketInputDStream.
The SocketInputDStream class is as follows:
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
SocketInputDStream extends ReceiverInputDStream.
It implements the getReceiver method, which returns a SocketReceiver object.
To summarize the inheritance chain of SocketInputDStream:
SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream.
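The inheritance chain can be pictured with a small stand-alone sketch. It uses no Spark classes: every name prefixed with Simple is a hypothetical stand-in, and the only point illustrated is that the concrete stream at the bottom of the chain decides which receiver gets created.

```scala
object InputStreamHierarchySketch {
  // Hypothetical stand-in for Receiver[T]
  trait SimpleReceiver[T] { def describe: String }

  // Hypothetical stand-in for DStream[T], the root of the chain
  abstract class SimpleDStream[T] { def name: String }

  // Hypothetical stand-in for InputDStream[T]: a DStream that is a data source
  abstract class SimpleInputDStream[T] extends SimpleDStream[T]

  // Hypothetical stand-in for ReceiverInputDStream[T]:
  // subclasses only have to say which receiver to use
  abstract class SimpleReceiverInputDStream[T] extends SimpleInputDStream[T] {
    def getReceiver(): SimpleReceiver[T]
  }

  // Hypothetical stand-in for SocketInputDStream
  class SimpleSocketInputDStream(host: String, port: Int)
      extends SimpleReceiverInputDStream[String] {
    val name: String = "socket"
    def getReceiver(): SimpleReceiver[String] = new SimpleReceiver[String] {
      val describe: String = s"socket receiver for $host:$port"
    }
  }
}
```

Code that only knows about the abstract SimpleReceiverInputDStream can still obtain the right receiver, which is roughly how the tracker later treats every receiver-based input stream uniformly.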
5) A DStream is a template for generating RDDs and exists at the logical level. Each time a batch interval elapses, the template is instantiated with that batch's data to produce RDDs and their DAG.
generatedRDDs in DStream:
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
getOrCompute in DStream:
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }
      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
Its main job is to generate the RDD and store it in the HashMap. The details of RDD generation will be analyzed later.
So far we have outlined how the core concepts DStream and RDD are used in Spark Streaming.
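The compute-once-then-cache behaviour of getOrCompute can be reproduced without Spark. The following is a minimal sketch under stated assumptions: BatchTemplate, its fields and the simplified validity rule (a positive time on a batch boundary) are hypothetical, and a plain Seq[Int] stands in for an RDD.

```scala
import scala.collection.mutable

object GetOrComputeSketch {
  // Hypothetical stand-in for a DStream: `compute` plays the role of DStream.compute
  class BatchTemplate(batchMs: Long, compute: Long => Option[Seq[Int]]) {
    // Plays the role of generatedRDDs: one cached result per batch time
    private val generated = mutable.HashMap.empty[Long, Seq[Int]]
    var computeCalls = 0

    // Simplified validity rule (an assumption): positive and on a batch boundary
    private def isTimeValid(time: Long): Boolean = time > 0 && time % batchMs == 0

    def getOrCompute(time: Long): Option[Seq[Int]] =
      generated.get(time).orElse {
        if (isTimeValid(time)) {
          computeCalls += 1
          val result = compute(time)
          // Cache the freshly computed batch so later calls reuse it
          result.foreach(batch => generated.put(time, batch))
          result
        } else {
          None
        }
      }
  }
}
```

Asking twice for the same batch time runs compute only once, and a time that is not on a batch boundary yields None.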

2.2 Starting the StreamingContext

In the example, ssc.start() starts the StreamingContext; the main logic happens inside this start method:

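* Calling start on the StreamingContext internally calls start on the JobScheduler, which runs a message loop. JobScheduler.start constructs the ReceiverTracker and then calls start on both the ReceiverTracker and the JobGenerator:

* 1. Once started, the JobGenerator keeps generating Jobs, one batch per batchDuration. A Job here is not a Job in the Spark Core sense; it is only the DAG of RDDs generated from the DStreamGraph. From a Java point of view it is comparable to an instance of the Runnable interface. To run, a Job has to be submitted to the JobScheduler, which uses a thread pool to pick a separate thread that submits the Job to the cluster (in practice, an RDD action executed on that thread triggers the real job).

* 2. Once started, the ReceiverTracker first launches the Receivers in the Spark cluster (in practice, a ReceiverSupervisor is started on the Executor first). When a Receiver receives data, it stores the data on the Executor through the ReceiverSupervisor and sends the data's metadata to the ReceiverTracker on the Driver. Internally, the ReceiverTracker uses a ReceivedBlockTracker to manage the received metadata.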

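The key classes involved in the execution flow of a Spark Streaming application are shown in the figure below.
[Figure: key classes in the Spark Streaming execution flow]
Now for the source code walkthrough. It is hard going, but the understanding at the end is worth it.
1) First, look at start() in StreamingContext.
start() starts the StreamingContext. Only one StreamingContext may be active at a time (see the call to assertNoOtherContextIsActive), so the Spark Streaming framework checks its state at startup. The code is as follows: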
/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          // Thread-local storage: each thread has its own private properties,
          // so setting them here does not affect other threads.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            // Start the JobScheduler
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
In the INITIALIZED state, the JobScheduler is started.
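The state check in start() boils down to a three-state guard. Here is a minimal sketch of that guard, assuming a hypothetical SimpleContext whose startScheduler callback stands in for scheduler.start(); it omits the activation lock, the shutdown hook and metrics registration.

```scala
import scala.util.control.NonFatal

object ContextStateSketch {
  sealed trait State
  case object Initialized extends State
  case object Active extends State
  case object Stopped extends State

  // Hypothetical stand-in for StreamingContext; startScheduler replaces scheduler.start()
  class SimpleContext(startScheduler: () => Unit) {
    private var state: State = Initialized

    def currentState: State = synchronized { state }

    def start(): Unit = synchronized {
      state match {
        case Initialized =>
          try {
            startScheduler()
            state = Active
          } catch {
            case NonFatal(e) =>
              // A failed start leaves the context permanently stopped
              state = Stopped
              throw e
          }
        case Active =>
          () // already started; the real code only logs a warning here
        case Stopped =>
          throw new IllegalStateException("context has already been stopped")
      }
    }
  }
}
```

Calling start() twice runs the scheduler callback once, and a context whose first start failed refuses every later start.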
2) Next, look at start() in JobScheduler.
It starts several components: the EventLoop, the StreamingListenerBus, the ReceiverTracker and the JobGenerator.
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started
  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Start the event-loop thread, which handles the JobScheduler's events.
  eventLoop.start()
  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    // rateController can throttle the input rate
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)
  // Start the listener bus, which is used to update the Streaming tab in the Spark UI.
  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  // Create the InputInfoTracker, which manages all input streams and their input statistics.
  // This information is exposed through StreamingListener.
  inputInfoTracker = new InputInfoTracker(ssc)
  // Start the ReceiverTracker, which handles data reception, data caching and block generation.
  receiverTracker.start()
  // Start the JobGenerator, which handles DStreamGraph initialization, DStream-to-RDD
  // conversion, job generation and job submission.
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
3) The message handler processEvent in JobScheduler
It handles three kinds of messages: job started, job completed, and error reported.
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}
4) Now let us take a brief look at the work started in JobScheduler.start().
4.1) First, the first item started by JobScheduler.start(): the EventLoop.
The EventLoop handles the JobScheduler's events.
The EventLoop has an event queue:
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
It also has a thread that processes the events in the queue:
private val eventThread = new Thread(name) {
  setDaemon(true)
  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          case NonFatal(e) => {
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
          }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}
The onReceive and onError used by this thread are defined where the EventLoop is instantiated in JobScheduler.
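The queue-plus-daemon-thread pattern is easy to try in isolation. The sketch below is not Spark's EventLoop; SimpleEventLoop is a hypothetical reduction of it that keeps the blocking queue, the single consumer thread and the onReceive/onError hooks, and leaves out logging.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

object EventLoopSketch {
  // Hypothetical, reduced version of an event loop: one queue, one consumer thread
  abstract class SimpleEventLoop[E](name: String) {
    private val queue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
    private val stopped = new AtomicBoolean(false)

    private val thread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (!stopped.get) {
            val event = queue.take() // blocks until an event is posted
            try {
              onReceive(event)
            } catch {
              case NonFatal(e) => onError(e)
            }
          }
        } catch {
          case _: InterruptedException => // stop() interrupts a blocked take()
        }
      }
    }

    def start(): Unit = thread.start()

    // Producers never run handlers themselves; they only enqueue
    def post(event: E): Unit = queue.put(event)

    def stop(): Unit = {
      if (stopped.compareAndSet(false, true)) {
        thread.interrupt()
        thread.join()
      }
    }

    protected def onReceive(event: E): Unit
    protected def onError(e: Throwable): Unit
  }
}
```

Because a single thread drains the queue, handlers run one at a time and in posting order, which is why the handler code itself needs no extra locking for its own state.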
4.2) The second item started by JobScheduler.start(): the StreamingListenerBus.
- It asynchronously delivers StreamingListenerEvents to the registered StreamingListeners.
- It is used to update the Streaming tab in the Spark UI.
4.3) The third item started by JobScheduler.start(): the ReceiverTracker.
The ReceiverTracker manages the receivers of all input streams and the metadata of the data they receive.
Receiver events are published to the registered StreamingListeners.
Inside start(), the ReceiverTracker instantiates ReceiverTrackerEndpoint, an RPC endpoint.
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
During startup, the ReceiverTracker calls its launchReceivers method:
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })
  runDummySparkJob()
  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}
 
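launchReceivers calls runDummySparkJob, which launches the first job that the Spark Streaming framework runs; its collect action triggers the execution of a Spark job. The purpose of this method is to make sure that every slave has registered, so that the Receivers do not all end up on one node and the later computation is load balanced.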
/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}
 
ReceiverTracker.launchReceivers() also calls endpoint.send(StartAllReceivers(receivers)), which sends a StartAllReceivers message through the RPC endpoint.
When ReceiverTrackerEndpoint receives this message, it first uses the scheduling policy to decide which Executors each Receiver should run on, and then calls startReceiver(receiver, executors) to start the Receiver.
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
 
In startReceiver, ssc.sparkContext.submitJob is given the function startReceiverFunc, because startReceiverFunc is what runs on the Executor. Inside startReceiverFunc a ReceiverSupervisorImpl is instantiated; this object manages and monitors the Receiver. This is the second job that the Spark Streaming framework launches for us, and it keeps running, because supervisor.awaitTermination() blocks until the supervisor terminates.
/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
 
    // ... (unrelated code omitted) ...
 
  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // Instantiate the supervisor that manages the Receiver
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }
 
  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
 
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD,
    startReceiverFunc, // passed to submitJob because this function runs on the Executor
    Seq(0), (_, _) => Unit, ())
 
  // Keep restarting the receiver job until the ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
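The restart logic at the end of startReceiver (resubmit the receiver job whenever it finishes, for as long as the tracker is running) can be sketched with plain Scala futures. This is an illustration under assumptions, not the tracker's code: superviseReceiver, job and shouldRun are hypothetical names, and a local Future stands in for a job submitted to the cluster.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

object ReceiverRestartSketch {
  /**
   * Runs `job` and resubmits it every time it ends, whether it succeeded or failed,
   * until `shouldRun()` returns false. The returned future completes with the
   * number of attempts that were made.
   */
  def superviseReceiver(job: Int => Unit, shouldRun: () => Boolean)(
      implicit ec: ExecutionContext): Future[Int] = {
    val finished = Promise[Int]()

    def submit(attempt: Int): Unit = {
      Future(job(attempt)).onComplete { _ =>
        // Success and failure are handled the same way: restart if still wanted
        if (shouldRun()) submit(attempt + 1) else finished.success(attempt)
      }
    }

    submit(1)
    finished.future
  }
}
```

The design choice worth noting is that a failure of the receiver job is not treated as fatal; only the stop condition ends the cycle.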
 
Next, look at how ReceiverSupervisorImpl starts: it first starts all registered BlockGenerator objects, then sends a RegisterReceiver message to ReceiverTrackerEndpoint, and then calls the receiver's onStart method.
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

onStart() starts all registered BlockGenerator objects:
override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}

startReceiver() calls onReceiverStart() and then calls the receiver's onStart method:
/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
onReceiverStart() sends a RegisterReceiver message to ReceiverTrackerEndpoint:
override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}

When the ReceiverTrackerEndpoint running on the Driver receives the RegisterReceiver message, it wraps streamId, typ, host, executorId and receiverEndpoint in a ReceiverTrackingInfo and stores it in the in-memory HashMap receiverTrackingInfos.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
 
Source of the registerReceiver method:
/** Register a receiver */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }
 
  // ... (unrelated code omitted) ...
 
  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}
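The registration step is essentially "validate, then record in a map keyed by stream id". A minimal stand-alone sketch follows; Registry, TrackingInfo and the isAcceptable callback are hypothetical simplifications, and the part of the real method that was omitted above is not reconstructed here.

```scala
import scala.collection.mutable

object ReceiverRegistrySketch {
  // Hypothetical, reduced version of the per-receiver bookkeeping record
  final case class TrackingInfo(streamId: Int, name: String, host: String, executorId: String)

  // isAcceptable stands in for the scheduling check whose code is elided in the listing
  class Registry(expectedStreamIds: Set[Int], isAcceptable: (Int, String) => Boolean) {
    private val infos = mutable.HashMap.empty[Int, TrackingInfo]

    def register(streamId: Int, typ: String, host: String, executorId: String): Boolean = {
      if (!expectedStreamIds.contains(streamId)) {
        throw new IllegalArgumentException("Register received for unexpected id " + streamId)
      }
      if (!isAcceptable(streamId, host)) {
        false // refused: the receiver was started on an executor it was not scheduled on
      } else {
        infos.put(streamId, TrackingInfo(streamId, s"$typ-$streamId", host, executorId))
        true
      }
    }

    def lookup(streamId: Int): Option[TrackingInfo] = infos.get(streamId)
  }
}
```

The Boolean result matters to the caller: in the listings above, a false reply makes the supervisor stop instead of calling the receiver's onStart.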
Starting the Receiver: taking ssc.socketTextStream("localhost", 9999) as an example, the object created is a SocketReceiver. It starts a thread that connects to the socket server, reads data from the socket and stores it.
private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {
 
  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }
 
 
  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
        // ... (unrelated code omitted) ...
  }
}
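The receive loop depends on a converter of type InputStream => Iterator[T]. The sketch below shows one way such a converter for newline-delimited UTF-8 text could look, together with a loop shaped like the one in receive(). It is an illustration, not Spark's SocketReceiver.bytesToLines; the names bytesToLines, drain, store and isStopped are reused here only as hypothetical parameters.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

object BytesToLinesSketch {
  // Turns a byte stream into a lazy iterator of lines; ends when the stream ends
  def bytesToLines(in: InputStream): Iterator[String] = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }

  // Shaped like the receive loop: pull records and hand each one to `store`
  def drain(in: InputStream, store: String => Unit, isStopped: () => Boolean): Unit = {
    val iterator = bytesToLines(in)
    while (!isStopped() && iterator.hasNext) {
      store(iterator.next())
    }
  }
}
```

Because the iterator is lazy, lines are stored as they arrive instead of after the whole stream has been read, which is what a long-lived socket connection needs.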
4.4) Next, the fourth item started by JobScheduler.start(): the JobGenerator.
JobGenerator has a RecurringTimer member that works together with its event loop: at every batchInterval, the timer posts a GenerateJobs message.
// Post a GenerateJobs message periodically, using the batchInterval passed in when the StreamingContext was created
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

The start() method of JobGenerator:
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started
 
  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter
 
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
 
    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
 
  // Start the event-loop thread
  eventLoop.start()
 
  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    // Start the timer that generates jobs periodically
    startFirstTime()
  }
}
startFirstTime(), which is called from JobGenerator.start(), is defined as follows:
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
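The arithmetic behind startFirstTime can be worked through with plain numbers. The sketch below assumes that the timer's start time is the first multiple of the batch period strictly after the current time; that rounding rule is my reading of RecurringTimer.getStartTime and should be checked against the Spark version in use. The function names are hypothetical.

```scala
object BatchTimerSketch {
  // Assumed rounding rule: first multiple of periodMs strictly after nowMs (nowMs >= 0)
  def firstBatchTime(nowMs: Long, periodMs: Long): Long =
    (nowMs / periodMs + 1) * periodMs

  // The graph is started one batch earlier, mirroring graph.start(startTime - batchDuration)
  def graphZeroTime(nowMs: Long, periodMs: Long): Long =
    firstBatchTime(nowMs, periodMs) - periodMs

  // Times at which a GenerateJobs message would be posted
  def batchTimes(nowMs: Long, periodMs: Long): Iterator[Long] =
    Iterator.iterate(firstBatchTime(nowMs, periodMs))(_ + periodMs)
}
```

For example, with a 5000 ms batch interval and a current time of 12345 ms, the first batch time is 15000, the graph's zero time is 10000, and later batches follow at 20000, 25000 and so on.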
processEvent(), the handler installed by JobGenerator.start(), is defined as follows:
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
generateJobs is defined as follows:
/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
 
    // Fetch the data received for this batch time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Call generateJobs on the DStreamGraph to generate the jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
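The control flow of generateJobs (attempt the generation inside a Try, branch on the outcome, and post the checkpoint event in both cases) is shown in isolation below. This is a sketch with hypothetical event types and a build callback standing in for block allocation plus graph.generateJobs; the events are appended to a buffer instead of being posted to an event loop.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

object GenerateJobsSketch {
  sealed trait Event
  final case class Submitted(time: Long, jobCount: Int) extends Event
  final case class ErrorReported(time: Long, message: String) extends Event
  final case class DoCheckpoint(time: Long) extends Event

  // `build` is a hypothetical stand-in for "allocate blocks, then generate jobs"
  def generateJobs(time: Long, build: Long => Seq[() => Unit], events: ArrayBuffer[Event]): Unit = {
    Try(build(time)) match {
      case Success(jobs) => events += Submitted(time, jobs.size)
      case Failure(e)    => events += ErrorReported(time, e.getMessage)
    }
    // Reached on success and on failure alike
    events += DoCheckpoint(time)
  }
}
```

Wrapping the generation in Try keeps one bad batch from killing the event-loop thread: the error is reported and the loop moves on to the next batch time.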
The generateJobs method of DStreamGraph calls generateJob on each output stream to build the collection of Jobs.
// Output streams: the output operations that trigger actions
private val outputStreams = new ArrayBuffer[DStream[_]]()
 
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
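The flatMap over Option-returning output streams is what lets a stream skip a batch without any special casing. A tiny sketch of that idea, with hypothetical SimpleJob and OutputStream types in place of Spark's Job and DStream:

```scala
object OutputStreamsSketch {
  final case class SimpleJob(time: Long, name: String)

  // Each output stream yields a job for a batch time, or nothing at all
  trait OutputStream {
    def generateJob(time: Long): Option[SimpleJob]
  }

  // flatMap drops the None results and unwraps the Some results
  def generateJobs(outputs: Seq[OutputStream], time: Long): Seq[SimpleJob] =
    outputs.flatMap(_.generateJob(time))
}
```

A stream whose window slides every second batch, for instance, returns None on the batches in between and simply contributes no job for them.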
Now look at generateJob in DStream. It calls getOrCompute to obtain the RDD for the given batch time (this is where, once the interval has elapsed, the template is instantiated with the batch data into an RDD). If an RDD exists, it wraps a function jobFunc that contains context.sparkContext.runJob(rdd, emptyFunc), and returns the resulting Job.
/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
Next, look at submitJobSet in JobScheduler, which submits JobHandlers to a thread pool. JobHandler implements the Runnable interface and ultimately calls job.run(). In the Job class, run calls func, which is the jobFunc passed in when the Job was constructed; it contains context.sparkContext.runJob(rdd, emptyFunc), and that call is what finally submits the job.
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
JobHandler implements the Runnable interface and ultimately calls job.run():
private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._
 
    def run() {
      try {
    
        // ... (unrelated code omitted) ...
 
        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
      }
    }
  }
}
Code fragment of Job:
private[streaming]
class Job(val time: Time, func: () => _) {
  private var _id: String = _
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null
  private var _callSite: CallSite = null
  private var _startTime: Option[Long] = None
  private var _endTime: Option[Long] = None
  def run() {
    _result = Try(func())
  }
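Putting the last three pieces together (a Job that only wraps a deferred function, a Runnable handler, and a thread pool that executes the handlers) gives the following stand-alone sketch. SimpleJob, SimpleJobHandler and runAll are hypothetical names, and the event posting and local-property handling of the real JobHandler are left out.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.util.Try

object JobExecutionSketch {
  // Like the streaming Job: it wraps a deferred function, and nothing runs until run()
  class SimpleJob(val time: Long, func: () => Any) {
    @volatile private var _result: Option[Try[Any]] = None
    def run(): Unit = { _result = Some(Try(func())) }
    def result: Option[Try[Any]] = _result
  }

  // Like JobHandler: a Runnable whose only job is to call job.run()
  class SimpleJobHandler(job: SimpleJob) extends Runnable {
    def run(): Unit = job.run()
  }

  // Submits one handler per job and waits; returns true if all finished in time
  def runAll(jobs: Seq[SimpleJob], threads: Int): Boolean = {
    val pool = Executors.newFixedThreadPool(threads)
    jobs.foreach(job => pool.execute(new SimpleJobHandler(job)))
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Constructing a SimpleJob does no work at all; the wrapped function runs only when a pool thread calls run(), and a failure is captured in the Try instead of escaping into the pool thread.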
That covers the main source code. It was exhausting to work through, but there is a real sense of accomplishment at the end.

Original article: http://zhou-yuefei.iteye.com/blog/2297447
