本期内容
1、ReceivedBlockTracker容错安全性
2、DStreamGraph和JobGenerator容错安全性
一切不能进行实时流处理的数据都是无效的数据。在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下。
Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序。如果可以掌握Spark streaming这个复杂的应用程序,那么其他的再复杂的应用程序都不在话下了。这里选择Spark Streaming作为版本定制的切入点也是大势所趋。
从数据层面,ReceivedBlockTracker为整个Spark Streaming应用程序记录元数据信息。
从调度层面,DStreamGraph和JobGenerator是Spark Streaming调度的核心,记录当前调度到哪一进度,和业务有关。
先从ReceiverTracker角度出发
如果开启WAL,则将元数据写入WAL,加入ReceivedBlockQueue
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { try { val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) if (writeResult) { synchronized { getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo } logDebug(s"Stream ${receivedBlockInfo.streamId} received " + s"block ${receivedBlockInfo.blockStoreResult.blockId}") } else { logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " + s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.") } writeResult } catch { case NonFatal(e) => logError(s"Error adding block $receivedBlockInfo", e) false } }
这里来看writeToLog方法
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { if (isWriteAheadLogEnabled) { logTrace(s"Writing record: $record") try { writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)), clock.getTimeMillis()) true } catch { case NonFatal(e) => logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e) false } } else { true } }
再来看看JobScheduler为为分配batch分配block
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized { if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) { val streamIdToBlocks = streamIds.map { streamId => (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) }.toMap val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) { timeToAllocatedBlocks.put(batchTime, allocatedBlocks) lastAllocatedBatchTime = batchTime } else { logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") } } else { logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") } }
这里此writeToLog虽然每次都调用,但是方法内部还是会判断WAL是否开启。
再进入到writeToLog内部
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { if (isWriteAheadLogEnabled) { logTrace(s"Writing record: $record") try { writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)), clock.getTimeMillis()) true } catch { case NonFatal(e) => logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e) false } } else { true } }
写入完成之后,ReceiverTracker部分容错就是数据部分完成
接下来看看从job生成的角度,也就是调度层面看看容错
根据DStreamGraph每次根据一个固定的batchINterval定时生成Job后都要调用DoCheckpoint
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) { if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { logInfo("Checkpointing graph for time " + time) ssc.graph.updateCheckpointData(time) checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater) } }
在改方法中会根据更新时间每一个的checkpoint,调用updateCheckPointData方法
def updateCheckpointData(time: Time) { logInfo("Updating checkpoint data for time " + time) this.synchronized { outputStreams.foreach(_.updateCheckpointData(time)) } logInfo("Updated checkpoint data for time " + time) } private[streaming] def updateCheckpointData(currentTime: Time) { logDebug("Updating checkpoint data for time " + currentTime) checkpointData.update(currentTime) dependencies.foreach(_.updateCheckpointData(currentTime)) logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData) }
总结来说,ReceivedBlockTracker处理数据层面,它通过WAL的方式
而DStreamGraph和JobGenerator是从调度层面出发,通过checkpoint的方式
备注:
资料来源于:DT_大数据梦工厂(Spark发行版本定制)
更多私密内容,请关注微信公众号:DT_Spark
如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580
原文:http://www.cnblogs.com/pzwxySpark/p/Spark13.html