package sparkString import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object ss01 extends App { val conf = new SparkConf().setMaster("local[*]").setAppName("lsj") val ssc = new StreamingContext(conf,Seconds(5)) //sparkstring与structuedString的不同之处,structuedString有状态存储,每次可以发送全局的,sparkString每次发的是像批处理 ssc.socketTextStream("localhost",44444) .flatMap(_.split(",")) .map((_,1)) .reduceByKey(_+_) .print() ssc.start() ssc.awaitTermination() }
在windows端打开nc发送数据,然后启动程序可以看到每一批的词频统计。
原文:https://www.cnblogs.com/shiji7/p/12091740.html