一个流应用程序必须全天候（7×24 小时）运行，所以必须能够应对与应用程序逻辑无关的故障（如系统错误、JVM 崩溃等）。为了使这成为可能，Spark Streaming 需要将足够的信息 checkpoint 到容错存储系统中，以便系统能够从故障中恢复。
/** Builds a brand-new StreamingContext and registers the whole DStream graph on it.
  *
  * `main` passes this function to `StreamingContext.getOrCreate`, which only calls it when no
  * checkpoint data can be used. Create every DStream, including the Kafka input stream,
  * inside this function. Otherwise recovery from a checkpoint may fail with errors such as
  * "KafkaInputDStream has not been initialized".
  *
  * @param checkpointDirectory directory this context writes its checkpoint data to
  * @return a configured StreamingContext that has not been started yet
  */
def createContext(checkpointDirectory: String): StreamingContext = {
  // If you do not see this printed, the StreamingContext was recovered from the
  // existing checkpoint instead of being created here.
  println("Creating new context")
  val sparkConf = new SparkConf().setAppName("DynamicRange")
  // Create the context with an 8 second batch interval.
  val ssc = new StreamingContext(sparkConf, Seconds(8))
  ssc.checkpoint(checkpointDirectory)

  // ... application-specific setup elided in the original snippet.
  //     NOTE(review): `topicMap` used below is presumably defined here; confirm.

  // Create the Kafka input stream. Keep it inside createContext; see the Scaladoc above.
  val fullLines = KafkaUtils.createStream(
    ssc,
    SystemConfig.config.kafkaZkQuorum,
    SystemConfig.config.kafkaGroup,
    topicMap)

  // Parse the data: keep the second element of each record tuple (the message value).
  val valueLines = fullLines.map(_._2)

  // ... further transformations elided in the original snippet.
  //     NOTE(review): Spark Streaming refuses to start without at least one registered
  //     output operation; presumably the elided code registers one on `valueLines`.

  ssc
}

/** Entry point. Recovers the StreamingContext from the checkpoint in the current directory,
  * or builds a fresh one with `createContext`, then runs it until it terminates.
  *
  * @param args command-line arguments (currently unused)
  */
def main(args: Array[String]): Unit = {
  import scala.util.control.NonFatal

  // Single source of truth for the checkpoint location (previously repeated three times).
  val checkpointDir = "."

  val ssc: StreamingContext =
    try {
      // getOrCreate rebuilds the context from checkpoint data in checkpointDir when such
      // data exists, and only calls this factory when it does not.
      StreamingContext.getOrCreate(checkpointDir, () => {
        println("get context fail, try to create a new one.")
        createContext(checkpointDir)
      })
    } catch {
      // Recovery itself can fail, e.g. on unreadable or incompatible checkpoint data.
      // Fall back to a fresh context, but report the cause instead of swallowing it.
      // NonFatal lets InterruptedException and fatal VM errors propagate.
      // NOTE(review): the old checkpoint files remain in checkpointDir.
      case NonFatal(e) =>
        println("get context exception, try to create a new one.")
        e.printStackTrace()
        createContext(checkpointDir)
    }

  ssc.start()
  ssc.awaitTermination()
}
Spark Streaming metadata checkpoint