Add the following dependency to pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
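The ${spark.version} property must be defined in the pom's <properties> section. A minimal sketch; the version number here is an assumption (any Spark 2.x release built for Scala 2.11, matched to your cluster):

<properties>
    <!-- Assumption: adjust to the Spark version your cluster actually runs -->
    <spark.version>2.4.3</spark.version>
</properties>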
A working example:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by huicheng on 25/07/2019.
 */
object WorldCount {
  def main(args: Array[String]): Unit = {
    // local[2]: at least two threads are required when running locally, since
    // one thread is taken by the socket receiver and another processes batches
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("master01", 9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    // import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}
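If no cluster or socket source is at hand, the same pipeline can be exercised locally with queueStream, which feeds pre-built RDDs into the stream one per batch. A minimal sketch, not part of the original post; WordCountLocalTest and the sample input are hypothetical names:

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountLocalTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCountLocalTest")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Each queued RDD becomes the input of one micro-batch
    val queue = new mutable.Queue[RDD[String]]()
    val lines = ssc.queueStream(queue)

    // Same word-count pipeline as the socket version above
    val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    queue += ssc.sparkContext.makeRDD(Seq("hello world", "hello spark"))

    // Let a few batches run, then shut down cleanly
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop(stopSparkContext = true)
  }
}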
Package the job the same way as in the Spark Core section, upload the JAR to the Spark machine, and run it:
bin/spark-submit --class com.c.streaming.WorldCount ~/wordcount-jar-with-dependencies.jar
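The -jar-with-dependencies suffix in the JAR name suggests the Maven assembly plugin; the original post does not show its configuration, so the following is a sketch under that assumption (the plugin version is also an assumption):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.1.1</version>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
            <manifest>
                <mainClass>com.c.streaming.WorldCount</mainClass>
            </manifest>
        </archive>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals><goal>single</goal></goals>
        </execution>
    </executions>
</plugin>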
Send data with Netcat:
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world
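If everything is wired up correctly, the spark-submit terminal should print something roughly like the following for each one-second batch (the timestamp will differ):

-------------------------------------------
Time: 1564032305000 ms
-------------------------------------------
(hello,1)
(world,1)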
If the job produces too much log output at runtime, lower the log level to WARN in the log4j file under Spark's conf directory.
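Concretely, for Spark 2.x that usually means copying conf/log4j.properties.template to conf/log4j.properties and changing the root level; a sketch of the relevant line (the exact template contents vary by Spark version):

# conf/log4j.properties
log4j.rootCategory=WARN, console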
Source: https://www.cnblogs.com/zhanghuicheng/p/11227372.html