
Example: reading Kafka from Spark Streaming in Scala

Posted: 2018-03-19 20:35:39

1. Add the jar dependencies to pom.xml

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- The Scala suffix must match the Spark artifacts above; the original listing used
     kafka_2.10, which mixes Scala 2.10 and 2.11 binaries and breaks at runtime -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
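To keep the Scala suffix and Spark version consistent across all three dependencies, one common pattern is to factor them into Maven properties. A minimal sketch, assuming Scala 2.11 and Spark 2.1.0 as in the listing above:

```xml
<properties>
  <scala.binary.version>2.11</scala.binary.version>
  <spark.version>2.1.0</spark.version>
</properties>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
```

With this, changing the Scala or Spark version later requires editing only one place.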

2. Code
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Demo01 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCountDemo")
    sparkConf.setMaster("local[2]") // two local threads; drop this when submitting to a cluster
    val ssc = new StreamingContext(sparkConf, Seconds(3)) // 3-second micro-batches

    val brokers = "hadoop01:9092"
    val topics = "test"
    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Direct (receiver-less) stream; each record arrives as a (key, value) pair
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    val lines = messages.map(_._2) // keep only the message value
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.saveAsTextFiles("hdfs://hadoop01:9000/spark/wordcount.txt")
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
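The word-count chain (flatMap → map → reduceByKey) mirrors ordinary Scala collection operations, so the transformation logic can be tried without a Spark or Kafka cluster. A standalone sketch, with hypothetical sample lines standing in for the Kafka message values, and `reduceByKey` replaced by the equivalent `groupBy` on a local collection:

```scala
object LocalWordCount {
  def main(args: Array[String]): Unit = {
    // Sample input standing in for the Kafka message values
    val lines = List("hello spark", "hello kafka")

    // Same pipeline as the DStream version above, on a plain List
    val wordCounts = lines
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

    wordCounts.foreach(println) // e.g. (hello,2), (spark,1), (kafka,1)
  }
}
```

This is useful for sanity-checking the splitting and counting logic before wiring it to a live topic.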


Source: https://www.cnblogs.com/runnerjack/p/8604410.html
