
StreamFromKafka

Posted 2020-05-02. A Spark Streaming word-count example that consumes a Kafka topic through the 0.10 direct API and uses a checkpoint directory so that both the driver state and the per-word running totals survive a restart.
package com.demo.SparkStreamingKafka_Receiver_checkpoint

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object StreamFromKafka {

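  // updateFunc merges each key's new counts with its previous state:
  // `a` holds this batch's counts for the key, `b` the prior running total.
  // For example, a = Seq(1, 1, 1) with b = Some(2) yields Some(5).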
  def updateFunc(a: Seq[Int], b: Option[Int]): Option[Int] = {
    Some(a.sum + b.getOrElse(0))
  }

  def main(args: Array[String]): Unit = {

    val checkpointPath = "./kafka-direct3"

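    // getOrCreate first tries to restore a StreamingContext from the checkpoint
    // directory; only when no checkpoint exists does it call createFunc to
    // build a fresh context. This is what makes the job restartable.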
    val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
      createFunc(checkpointPath)
    })
    ssc.start()
    ssc.awaitTermination()
  }

  def createFunc(checkpointPath: String): StreamingContext = {
    // 1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreamingKafka_direct_checkpoint")
      .setMaster("local[4]")

    // 2. Create the SparkContext
    val sc = new SparkContext(sparkConf)

    sc.setLogLevel("WARN")
    // 3. Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(checkpointPath)
    // 4. Kafka parameter configuration (old 0.8-style params kept for reference)
    /*val kafkaParams = Map("metadata.broker.list" -> "node1:9092,node2:9092,node3:9092"
      , "group.id" -> "kafka-direct01")*/

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.107.144:9092,192.168.107.133:9092,192.168.107.132:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1"
    )
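    // Note: the Spark + Kafka 0.10 integration guide also recommends setting
    // "enable.auto.commit" -> (false: java.lang.Boolean), so that offset
    // tracking is left to Spark (here, the checkpoint) rather than to the
    // consumer's auto-commit.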

    // 5. Define the topics; it is a Set, so multiple topics can be subscribed at once
    val topics = Set("test")

    /*val offsets = collection.Map[TopicPartition, Long] {
      new TopicPartition(KAFKA_Input_TOPIC, KAFKA_NUM_PARTITION.toInt) -> KAFKA_NOW_OFFSET
    }

    val subscribe = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)*/

    // 6. Build the DStream with KafkaUtils.createDirectStream
    //val kafkaTopicDS: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
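    // PreferConsistent distributes the Kafka partitions evenly across executors;
    // Subscribe pins the stream to a fixed collection of topics.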
    val kafkaTopicDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    // 7. Pull the record values out of the Kafka topic
    val kafkaData: DStream[String] = kafkaTopicDS.map(x => x.value())

    // 8. Split each line and map every word to a count of 1
    val wordAndOne: DStream[(String, Int)] = kafkaData.flatMap(_.split(" ")).map((_, 1))

    // 9. Accumulate the occurrence counts of identical words across batches
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
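    // updateStateByKey only works because ssc.checkpoint(...) was set above:
    // the per-word running totals are persisted to the checkpoint directory.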

    // Print the result of each batch
    result.print()
    ssc
  }

}
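The checkpoint approach above has a documented limitation: checkpoints store serialized objects, so a job generally cannot recover from an old checkpoint after the application code changes. With the 0.10 direct API, a common alternative is to commit offsets back to Kafka yourself once each batch has been processed. The following is a minimal sketch of that pattern, reusing the ssc, topics, and kafkaParams values defined above and assuming "enable.auto.commit" has been set to false; it is an illustration, not part of the original example.

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

stream.foreachRDD { rdd =>
  // Capture the offset ranges first, while the RDD partitions still map
  // one-to-one onto Kafka partitions (i.e. before any shuffle)
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process rdd here ...

  // Commit the consumed offsets back to Kafka once processing has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Because the commit happens after processing, this gives at-least-once semantics: on restart the job resumes from the last committed offsets, so outputs should be idempotent to tolerate replayed records.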

 

Source: https://www.cnblogs.com/xjqi/p/12817119.html
