First, a brief explanation of what state management means, using word count as the example. Each batch interval computes the word counts for the current batch only. If we instead need the cumulative count of each word from the start of the stream up to now, how do we compute that? Spark Streaming provides two methods: updateStateByKey and mapWithState. mapWithState was added in version 1.6 and is currently marked experimental; according to the official documentation it performs roughly 10x better than updateStateByKey. Let's look at how the two are actually implemented.
Here is a complete updateStateByKey demo:

object UpdateStateByKeyDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("UpdateStateByKeyDemo")
    val ssc = new StreamingContext(conf, Seconds(20))
    // updateStateByKey requires a checkpoint directory to be set
    ssc.checkpoint("/checkpoint/")
    val socketLines = ssc.socketTextStream("localhost", 9999)

    socketLines.flatMap(_.split(","))
      .map(word => (word, 1))
      .updateStateByKey((currValues: Seq[Int], preValue: Option[Int]) => {
        val currValue = currValues.sum          // sum of this batch's values for the key
        Some(currValue + preValue.getOrElse(0)) // add the historical value
      })
      .print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

updateStateByKey is not defined on DStream itself; it becomes available on DStream[(K, V)] through the implicit conversion to PairDStreamFunctions:

implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  : PairDStreamFunctions[K, V] = {
  new PairDStreamFunctions[K, V](stream)
}

def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)] = ssc.withScope {
  updateStateByKey(updateFunc, defaultPartitioner())
}

def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {
  new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
}

The resulting StateDStream computes each batch's state by cogrouping the parent RDD with the previous state RDD and applying the update function:

private[this] def computeUsingPreviousRDD(
    parentRDD: RDD[(K, V)], prevStateRDD: RDD[(K, S)]) = {
  // Define the function for the mapPartition operation on cogrouped RDD;
  // first map the cogrouped tuple to tuples of required type,
  // and then apply the update function
  val updateFuncLocal = updateFunc
  val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
    val i = iterator.map { t =>
      val itr = t._2._2.iterator
      val headOption = if (itr.hasNext) Some(itr.next()) else None
      (t._1, t._2._1.toSeq, headOption)
    }
    updateFuncLocal(i)
  }
  val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
  val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
  Some(stateRDD)
}
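The cogroup call is the key to updateStateByKey's cost: every batch, the previous state RDD is cogrouped with the new batch's RDD, so the update function runs over every key ever seen, even keys with no new data in the current batch. The following standalone sketch (my own illustration on plain RDDs with made-up data, not part of the original post) mimics what one batch of that computation does:

import org.apache.spark.{SparkConf, SparkContext}

object CogroupStateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("CogroupStateSketch").setMaster("local[2]"))

    // Previous state: cumulative counts for every key seen so far.
    val prevState = sc.parallelize(Seq(("hello", 5), ("world", 3), ("spark", 7)))
    // Current batch: only "hello" arrives, but cogroup still visits every key.
    val batch = sc.parallelize(Seq(("hello", 1), ("hello", 1)))

    val newState = batch.cogroup(prevState).map { case (key, (currValues, prevValues)) =>
      val currSum = currValues.sum                   // sum of this batch's values
      val prev = prevValues.headOption.getOrElse(0)  // previous cumulative count, if any
      (key, currSum + prev)
    }

    newState.collect().foreach(println) // e.g. (hello,7), (spark,7), (world,3); order may vary
    sc.stop()
  }
}

Note that "world" and "spark" are re-emitted even though the batch contained only "hello"; this is exactly what makes updateStateByKey increasingly expensive as the total key space grows.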
Now for mapWithState. The official StatefulNetworkWordCount example shows how it is used:

object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    // Initial state RDD for mapWithState operation
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using mapWithState
    // This will give a DStream made of state (which is the cumulative count of the words)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

    val stateDstream = wordDstream.mapWithState(
      StateSpec.function(mappingFunc).initialState(initialRDD))
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

mapWithState itself simply wraps the source DStream in a MapWithStateDStreamImpl:

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType] = {
  new MapWithStateDStreamImpl[K, V, StateType, MappedType](
    self,
    spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
  )
}

/** Internal implementation of the [[MapWithStateDStream]] */
private[streaming] class MapWithStateDStreamImpl[
    KeyType: ClassTag, ValueType: ClassTag, StateType: ClassTag, MappedType: ClassTag](
    dataStream: DStream[(KeyType, ValueType)],
    spec: StateSpecImpl[KeyType, ValueType, StateType, MappedType])
  extends MapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream.context) {

  private val internalStream =
    new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)

  override def slideDuration: Duration = internalStream.slideDuration

  override def dependencies: List[DStream[_]] = List(internalStream)

  override def compute(validTime: Time): Option[RDD[MappedType]] = {
    internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
  }
}

The heavy lifting happens in InternalMapWithStateDStream, whose compute obtains (or creates) the previous state RDD, partitions the batch data with the same partitioner, and builds a new MapWithStateRDD:

/** Method that generates a RDD for the given time */
override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
  // Get the previous state or create a new empty state RDD
  val prevStateRDD = getOrCompute(validTime - slideDuration) match {
    case Some(rdd) =>
      if (rdd.partitioner != Some(partitioner)) {
        // If the RDD is not partitioned the right way, let us repartition it using the
        // partition index as the key. This is to ensure that state RDD is always partitioned
        // before creating another state RDD using it
        MapWithStateRDD.createFromRDD[K, V, S, E](
          rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
      } else {
        rdd
      }
    case None =>
      MapWithStateRDD.createFromPairRDD[K, V, S, E](
        spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
        partitioner,
        validTime)
  }

  // Compute the new state RDD with previous state RDD and partitioned data RDD
  // Even if there is no data RDD, use an empty one to create a new state RDD
  val dataRDD = parent.getOrCompute(validTime).getOrElse {
    context.sparkContext.emptyRDD[(K, V)]
  }
  val partitionedDataRDD = dataRDD.partitionBy(partitioner)
  val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
    (validTime - interval).milliseconds
  }
  Some(new MapWithStateRDD(
    prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
}
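Note timeoutThresholdTime above: it is only defined when a timeout has been configured on the StateSpec. As a usage sketch (the duration and partition count below are illustrative, not from the original example), a spec with a timeout could look like this; when a key has been idle longer than the timeout, the mapping function is invoked one last time for it with the value set to None and state.isTimingOut() returning true:

import org.apache.spark.streaming.{Minutes, State, StateSpec}

// Mapping function that also handles the timeout case: an expiring key gets one
// final call with value = None, and its state must not be updated in that call.
val countWithTimeout = (word: String, one: Option[Int], state: State[Int]) => {
  if (state.isTimingOut()) {
    (word, state.get())                 // final emission for the expiring key
  } else {
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)
    (word, sum)
  }
}

val spec = StateSpec.function(countWithTimeout)
  .numPartitions(4)                     // partitioner used for the state RDDs
  .timeout(Minutes(10))                 // makes spec.getTimeoutInterval() return Some(...)

// wordDstream.mapWithState(spec), as in the example above

This timeout path is the one exercised by the removeTimedoutData branch of updateRecordWithData, shown below.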
MapWithStateRDD.compute pairs each state partition with the corresponding partition of the batch data and hands both iterators to MapWithStateRDDRecord.updateRecordWithData:

override def compute(
    partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {

  val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
  val prevStateRDDIterator = prevStateRDD.iterator(
    stateRDDPartition.previousSessionRDDPartition, context)
  val dataIterator = partitionedDataRDD.iterator(
    stateRDDPartition.partitionedDataRDDPartition, context)

  val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None
  val newRecord = MapWithStateRDDRecord.updateRecordWithData(
    prevRecord,
    dataIterator,
    mappingFunction,
    batchTime,
    timeoutThresholdTime,
    removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
  )
  Iterator(newRecord)
}

private[streaming] case class MapWithStateRDDRecord[K, S, E](
    var stateMap: StateMap[K, S], var mappedData: Seq[E])

So each partition of a MapWithStateRDD holds exactly one MapWithStateRDDRecord: a StateMap with the state of every key in that partition, plus the mapped output (mappedData) produced for the current batch. The per-key update itself is done by updateRecordWithData, shown a little further below.
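One practical consequence of this layout: the MapWithStateDStream handed back to the user is built (in MapWithStateDStreamImpl.compute above) by flatMapping mappedData, so each batch it emits one element per input record of that batch, not the whole state. To see the full accumulated state, analogous to what updateStateByKey prints every batch, the stateSnapshots() method can be used. A small sketch continuing the StatefulNetworkWordCount example:

val stateDstream = wordDstream.mapWithState(
  StateSpec.function(mappingFunc).initialState(initialRDD))

// Per-batch output of the mapping function: only words that appeared in this batch.
stateDstream.print()

// Full state (word -> cumulative count) for every key tracked so far, emitted every batch.
stateDstream.stateSnapshots().print()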
Here is updateRecordWithData, where the state is actually maintained:

def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
    dataIterator: Iterator[(K, V)],
    mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
    batchTime: Time,
    timeoutThresholdTime: Option[Long],
    removeTimedoutData: Boolean
  ): MapWithStateRDDRecord[K, S, E] = {

  // Create a new state map by copying the previous record's map (if any);
  // otherwise start from an empty StateMap
  val newStateMap = prevRecord.map { _.stateMap.copy() }.getOrElse { new EmptyStateMap[K, S]() }

  val mappedData = new ArrayBuffer[E]
  val wrappedState = new StateImpl[S]()

  // Call the mapping function on each record in the data iterator, and accordingly
  // update the states touched, and collect the data returned by the mapping function
  dataIterator.foreach { case (key, value) =>
    // Fetch the existing state for this key
    wrappedState.wrap(newStateMap.get(key))
    val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
    // Maintain newStateMap according to what the mapping function did with the state
    if (wrappedState.isRemoved) {
      newStateMap.remove(key)
    } else if (wrappedState.isUpdated
        || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
      newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
    }
    mappedData ++= returned
  }

  // Get the timed out state records, call the mapping function on each and collect the
  // data returned
  if (removeTimedoutData && timeoutThresholdTime.isDefined) {
    newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
      wrappedState.wrapTimingOutState(state)
      val returned = mappingFunction(batchTime, key, None, wrappedState)
      mappedData ++= returned
      newStateMap.remove(key)
    }
  }

  MapWithStateRDDRecord(newStateMap, mappedData)
}
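Putting the pieces together explains the claimed performance gap from the introduction: updateStateByKey cogroups and re-evaluates every key on every batch, while mapWithState copies the previous StateMap (the real StateMap.copy() is designed as an incremental copy rather than a full one) and calls the mapping function only for keys present in the current batch, plus any timed-out keys. The following stripped-down, non-Spark model (plain Scala, with a mutable.Map standing in for StateMap; all names here are illustrative) captures that loop:

import scala.collection.mutable

object MapWithStateModel {
  // Simplified stand-in for MapWithStateRDDRecord, for illustration only.
  final case class Record[K, S, E](stateMap: mutable.Map[K, S], mappedData: Seq[E])

  def updateRecord[K, V, S, E](
      prev: Option[Record[K, S, E]],
      batch: Iterator[(K, V)],
      mappingFunction: (K, Option[V], Option[S]) => (Option[S], E)): Record[K, S, E] = {

    // Start from a copy of the previous state; untouched keys are carried over as-is.
    val stateMap =
      prev.map(r => mutable.Map(r.stateMap.toSeq: _*)).getOrElse(mutable.Map.empty[K, S])
    val mapped = Seq.newBuilder[E]

    // Only keys present in this batch are visited.
    batch.foreach { case (key, value) =>
      val (newState, out) = mappingFunction(key, Some(value), stateMap.get(key))
      newState match {
        case Some(s) => stateMap.update(key, s) // state updated
        case None    => stateMap.remove(key)    // state removed
      }
      mapped += out
    }
    Record(stateMap, mapped.result())
  }

  def main(args: Array[String]): Unit = {
    // Cumulative word count, mirroring the mappingFunc in StatefulNetworkWordCount.
    val countFunc = (word: String, one: Option[Int], state: Option[Int]) => {
      val sum = one.getOrElse(0) + state.getOrElse(0)
      (Some(sum), (word, sum))
    }

    val r1 = updateRecord(None, Iterator(("hello", 1), ("hello", 1)), countFunc)
    val r2 = updateRecord(Some(r1), Iterator(("world", 1)), countFunc)
    println(r2.stateMap)   // Map(hello -> 2, world -> 1)
    println(r2.mappedData) // List((world,1))
  }
}

In r2, only "world" is visited and emitted; "hello" is carried over untouched, which is the behavior that keeps mapWithState's per-batch cost proportional to the batch size rather than to the total number of keys.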
Original: "Spark Streaming Source Code Reading (14): State Management with updateStateByKey and mapWithState Demystified", http://zhou-yuefei.iteye.com/blog/2303007