在 master 节点上安装 netcat。
我这里使用的是 Ubuntu,所以安装命令如下:
sudo apt-get install -y netcat
在 master 上启动监听(-l 表示监听模式,-k 表示客户端断开后继续保持监听):
nc -lk 9999
然后即可在该终端中输入文本,单词之间以空格分隔。
在 IDEA 中启动下面的程序,它每 5 秒打印一次该批次内的单词计数:
package cn.itcast.streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext, streaming}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.Try

/**
 * Author itcast
 * Desc Uses Spark Streaming to receive text lines from a TCP socket (by default master:9999,
 *      e.g. fed by `nc -lk 9999`) and prints a word count for every 5-second batch.
 *
 * Usage: WordCount01 [host] [port]   (defaults: master 9999)
 */
object WordCount01 {

  /** Host the socket receiver connects to when no host argument is given. */
  private val DefaultHost: String = "master"

  /** Port the socket receiver connects to when no port argument is given. */
  private val DefaultPort: Int = 9999

  /**
   * Entry point.
   *
   * @param args optional `args(0)` = socket host, `args(1)` = socket port
   * @throws IllegalArgumentException if a port argument is given but is not a valid TCP port
   */
  def main(args: Array[String]): Unit = {
    val host: String = args.headOption.getOrElse(DefaultHost)
    val port: Int = args.lift(1).map(parsePort).getOrElse(DefaultPort)

    // TODO 0. Prepare the environment
    // The socket receiver permanently occupies one core, so the local master needs at least
    // two cores available, otherwise batches are received but never processed.
    val conf: SparkConf = new SparkConf()
      .setAppName("spark")
      .setMaster("local[*]")
      // Finish processing already-received data when the JVM shuts down (e.g. Ctrl+C),
      // which is what actually makes the shutdown graceful.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")
    // The time interval at which streaming data will be divided into batches: one batch every 5s.
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))

    // TODO 1. Load data
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream(host, port)

    // TODO 2. Process data
    val resultDS: DStream[(String, Int)] = lines
      .flatMap(splitWords(_))
      .map((_, 1))
      .reduceByKey(_ + _)

    // TODO 3. Output results
    resultDS.print()

    // TODO 4. Start and wait for termination
    try {
      ssc.start()
      // A streaming application keeps running until it is stopped manually or fails.
      ssc.awaitTermination()
    } finally {
      // TODO 5. Release resources. Also runs when awaitTermination rethrows a streaming error;
      // calling stop on an already-stopped context is a no-op.
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }

  /** Splits a line on runs of whitespace and drops empty tokens (e.g. from repeated spaces). */
  private def splitWords(line: String): Array[String] =
    line.split("\\s+").filter(_.nonEmpty)

  /** Parses a TCP port argument, failing fast with a clear message on invalid input. */
  private def parsePort(raw: String): Int =
    Try(raw.trim.toInt).toOption
      .filter(p => p > 0 && p <= 65535)
      .getOrElse(throw new IllegalArgumentException(s"Invalid port: $raw"))
}
原文:https://www.cnblogs.com/a155-/p/14487669.html