package com.SparkStreaming_Flume_Poll

import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Stateful word count over events pulled from a Flume sink using Spark
 * Streaming's polling receiver (`FlumeUtils.createPollingStream`).
 *
 * Usage: SparkStreaming_Flume_Poll [host] [port]
 * Defaults to 192.168.107.144:8888 when no arguments are given, preserving
 * the original behavior.
 */
object SparkStreaming_Flume_Poll {

  /**
   * State-update function for `updateStateByKey`.
   *
   * @param newValues    all the `1` values produced for one word in the current batch
   * @param runningCount the accumulated count for that word from all previous batches
   * @return the new accumulated total (always `Some`, so per-word state is never dropped)
   */
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    Some(runningCount.getOrElse(0) + newValues.sum)
  }

  def main(args: Array[String]): Unit = {
    // Flume sink address; overridable from the command line: <host> [port].
    val host = if (args.length >= 1) args(0) else "192.168.107.144"
    val port = if (args.length >= 2) args(1).toInt else 8888

    // local[2]: at least two threads are required — one for the receiver,
    // one to process the received batches.
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreaming_Flume_Poll")
      .setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // 5-second micro-batch interval.
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Checkpointing is mandatory for updateStateByKey. NOTE(review): "./" is
    // only suitable for local testing — use a durable path (e.g. HDFS) in production.
    ssc.checkpoint("./")

    // One or more Flume agent addresses may be polled; a single one here.
    val addresses = Seq(new InetSocketAddress(host, port))
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK)

    // The payload lives in the Avro event body; decode with an explicit
    // charset instead of the platform default to avoid environment-dependent results.
    val lines: DStream[String] =
      flumeStream.map(e => new String(e.event.getBody.array(), StandardCharsets.UTF_8))

    // Split each line into words and maintain a running per-word count across batches.
    val wordCounts: DStream[(String, Int)] =
      lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunction)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
原文:https://www.cnblogs.com/xjqi/p/12806870.html