lines.flatMap(_.split(" "))
将一个数据一对多的转换为另外的形式, 规则通过传入函数指定
批量计算
流计算
Netcat
的使用目标:使用 Spark Streaming
程序和 Socket server
进行交互, 从 Server
处获取实时传输过来的字符串, 拆开单词并统计单词数量, 最后打印出来每一个小批次的单词数量
步骤:
package cn.itcast.streaming import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} object StreamingWordCount { def main(args: Array[String]): Unit = { //1.初始化 val sparkConf=new SparkConf().setAppName("streaming").setMaster("local[2]") val ssc=new StreamingContext(sparkConf,Seconds(5)) ssc.sparkContext.setLogLevel("WARN") val lines: ReceiverInputDStream[String] = ssc.socketTextStream( hostname = "192.168.31.101", port = 9999, storageLevel = StorageLevel.MEMORY_AND_DISK_SER ) //2.数据处理 //2.1把句子拆单词 val words: DStream[String] =lines.flatMap(_.split(" ")) val tuples: DStream[(String, Int)] =words.map((_,1)) val counts: DStream[(String, Int)] =tuples.reduceByKey(_+_) //3.展示 counts.print() ssc.start() ssc.awaitTermination() } }
开始进行交互:
注意:
Spark Streaming
并不是真正的来一条数据处理一条
Spark Streaming
的处理机制叫做小批量, 英文叫做 mini-batch
, 是收集了一定时间的数据后生成 RDD
, 后针对 RDD
进行各种转换操作, 这个原理提现在如下两个地方
RDD
去统计呢? 由 new StreamingContext(sparkConf, Seconds(1))
这段代码中的第二个参数指定批次生成的时间Spark Streaming
中至少要有两个线程
在使用 spark-submit
启动程序的时候, 不能指定一个线程
这些算子类似 RDD
, 也会生成新的 DStream
这些算子操作最终会落到每一个 DStream
生成的 RDD
中
算子 | 释义 |
---|---|
|
将一个数据一对多的转换为另外的形式, 规则通过传入函数指定 |
|
一对一的转换数据 |
|
这个算子需要特别注意, 这个聚合并不是针对于整个流, 而是针对于某个批次的数据 |
编程模型 | 解释 |
---|---|
|
|
|
|
|
|
Spark Streaming
时代
Structured Streaming
时代
Spark学习进度11-Spark Streaming&Structured Streaming
原文:https://www.cnblogs.com/xiaofengzai/p/14284502.html