首页 > Web开发 > 详细

SparkStreaming_Flume_Poll 流处理

时间:2020-04-30 11:07:53      阅读:57      评论:0      收藏:0      [点我收藏+]
 1 package com.SparkStreaming_Flume_Poll
 2 
 3 import java.net.InetSocketAddress
 4 import org.apache.spark.storage.StorageLevel
 5 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
 6 import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
 7 import org.apache.spark.streaming.{Seconds, StreamingContext}
 8 import org.apache.spark.{SparkConf, SparkContext}
 9 
/**
 * Spark Streaming word count fed by Flume in pull ("polling") mode.
 *
 * Events are pulled from one or more Flume agents with `FlumeUtils.createPollingStream`,
 * each event body is split into words, and a running per-word total is kept across
 * batches with `updateStateByKey`.
 *
 * NOTE(review): polling mode presumably requires the Flume agent to run the
 * spark-streaming-flume `SparkSink` on the configured host/port — confirm against the
 * agent configuration.
 */
object SparkStreaming_Flume_Poll {

  /** Flume agent host used when none is given on the command line. */
  private val DefaultFlumeHost: String = "192.168.107.144"

  /** Flume agent port used when none is given on the command line. */
  private val DefaultFlumePort: Int = 8888

  /** Length of one streaming batch, in seconds. */
  private val BatchIntervalSeconds: Long = 5L

  /**
   * Checkpoint directory (needed by `updateStateByKey`). A dedicated sub-directory keeps
   * checkpoint and receiver-metadata files out of the working directory itself.
   */
  private val CheckpointDir: String = "./checkpoint"

  /**
   * State-update function for `updateStateByKey`: adds this batch's counts to the running total.
   *
   * @param newValues    all the `1`s emitted for one word in the current batch
   * @param runningCount the total accumulated for that word in earlier batches, if any
   * @return the new running total; always defined, so no key is ever dropped from the state
   */
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(runningCount.getOrElse(0) + newValues.sum)

  /**
   * Decodes a Flume event body as text.
   *
   * Only the buffer's remaining bytes are read, through a duplicate so the event's own
   * position is left untouched. (`ByteBuffer.array()` would expose the entire backing array
   * regardless of position/limit/offset.) An explicit charset is used instead of the
   * platform default.
   * NOTE(review): assumes the upstream Flume source writes UTF-8 text — confirm.
   */
  private def bodyAsString(event: SparkFlumeEvent): String =
    java.nio.charset.StandardCharsets.UTF_8.decode(event.event.getBody.duplicate()).toString

  /**
   * Splits a line into words on runs of whitespace.
   *
   * Empty tokens are dropped. A plain `split(" ")` would turn repeated or leading spaces
   * into "" tokens that would then be counted as a word.
   */
  private def tokenize(line: String): Seq[String] =
    line.trim.split("\\s+").filter(_.nonEmpty).toSeq

  /**
   * Entry point.
   *
   * @param args optional `[host] [port]` of the Flume agent to poll; defaults to
   *             192.168.107.144 and 8888. A non-numeric port fails with NumberFormatException.
   */
  def main(args: Array[String]): Unit = {
    val host = args.lift(0).getOrElse(DefaultFlumeHost)
    val port = args.lift(1).map(_.toInt).getOrElse(DefaultFlumePort)

    // local[2]: the receiver occupies one core for the lifetime of the job, so at least one
    // more core is needed to actually process the received batches.
    val sparkConf: SparkConf =
      new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    val ssc: StreamingContext = new StreamingContext(sc, Seconds(BatchIntervalSeconds))
    // updateStateByKey carries state across batches and therefore requires checkpointing.
    ssc.checkpoint(CheckpointDir)

    // Several agents may be listed here; the polling stream pulls from every address given.
    val addresses = Seq(new InetSocketAddress(host, port))
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK)

    // The payload lives in the Avro event body.
    val lineStream: DStream[String] = flumeStream.map(bodyAsString)
    val result: DStream[(String, Int)] =
      lineStream.flatMap(tokenize).map((_, 1)).updateStateByKey(updateFunction)

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

 

SparkStreaming_Flume_Poll 流处理

原文:https://www.cnblogs.com/xjqi/p/12806870.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!