package com.demo.SparkStreamingKafka_Receiver_checkpoint

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object StreamFromKafka {

  // Merge the counts from the current batch (a) into the running state (b)
  def updateFunc(a: Seq[Int], b: Option[Int]): Option[Int] = {
    Some(a.sum + b.getOrElse(0))
  }

  def main(args: Array[String]): Unit = {

    val checkpointPath = "./kafka-direct3"

    // Recover the StreamingContext from the checkpoint if one exists,
    // otherwise build a fresh one with createFunc
    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
      createFunc(checkpointPath)
    })
    ssc.start()
    ssc.awaitTermination()
  }

  def createFunc(checkpointPath: String): StreamingContext = {
    //todo:1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreamingKafka_direct_checkpoint")
      .setMaster("local[4]")

    //todo:2. Create the SparkContext
    val sc = new SparkContext(sparkConf)

    sc.setLogLevel("WARN")
    //todo:3. Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(checkpointPath)
    //todo:4. Kafka parameter configuration
    /*val kafkaParams=Map("metadata.broker.list" ->"node1:9092,node2:9092,node3:9092"
      ,"group.id" -> "kafka-direct01")*/

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.107.144:9092,192.168.107.133:9092,192.168.107.132:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1"
    )

    //todo:5. Define the topics to subscribe to; it is a set, so it can hold several topics
    val topics = Set("test")

    /* val offsets = collection.Map[TopicPartition, Long] {
      new TopicPartition(KAFKA_Input_TOPIC, KAFKA_NUM_PARTITION.toInt) -> KAFKA_NOW_OFFSET
    }

    val subscribe = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)*/

    //todo:6. Build the DStream with KafkaUtils.createDirectStream
    //val kafkaTopicDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
    val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    //todo:7. Pull the message values out of the Kafka records
    val kafkaData: DStream[String] = kafkaTopicDS.map(x => x.value())

    //todo:8. Split each line and map every word to a count of 1
    val wordAndOne: DStream[(String, Int)] = kafkaData.flatMap(_.split(" ")).map((_, 1))

    //todo:9. Accumulate the occurrence counts of identical words across batches
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)

    //todo: Print the result
    result.print()
    ssc
  }

}
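To compile and run the listing above, the project needs Spark Streaming and the Kafka 0-10 integration on the classpath. A minimal build.sbt sketch follows; the Spark version (2.3.0) and Scala version (2.11.12) are assumptions, so align them with your own Spark installation:

// build.sbt (sketch): versions below are assumptions; match your cluster's Spark/Scala versions
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.3.0",
  "org.apache.spark" %% "spark-streaming"            % "2.3.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
)

With a broker reachable at one of the addresses in bootstrap.servers and a topic named test, lines typed into kafka-console-producer.sh (e.g. with --broker-list 192.168.107.144:9092 --topic test on older Kafka releases) show up as (word, count) pairs in the driver output every five seconds. Because updateStateByKey is backed by the checkpoint directory, the counts keep accumulating across batches and are restored when the job is restarted via StreamingContext.getOrCreate.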
Source: https://www.cnblogs.com/xjqi/p/12817119.html