水位线 = 已观察到的最大事件时间戳 - 最大延迟时间
水位线是真正的事件时间
// Switch the job to event-time semantics
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
如果要使用 processing time,将 TimeCharacteristic.EventTime 替换为 TimeCharacteristic.ProcessingTime 就可以了
使用事件时间的流必须要带水位线,而且要指定数据源中的时间戳
assignAscendingTimestamps(_._2) // use field _2 as the event timestamp; the allowed delay is 0
assignTimestampsAndWatermarks(
// Maximum allowed delay (out-of-orderness) is set to 5 seconds
new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
override def extractTimestamp(element: (String, Long)): Long = element._2 // use element._2 as the event timestamp
}
)
package test4
import java.lang
import java.sql.Timestamp
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
// nc -lk 9999
// 先使用默认的插入频率,200ms插入一次
// 将`env.getConfig.setAutoWatermarkInterval(60000)`注释掉
//a 1
//a 2
//a 15
//a 1
//a 2
//a 17
//a 12
//a 11
//a 12
//a 13
//a 25
// 添加`env.getConfig.setAutoWatermarkInterval(60000)`
//a 1
//a 2
//a 100
//a 5
//a 8
//a 3
//a 50
//a 51
//a 52
/**
 * Demo job that shows how event-time tumbling windows fire as watermarks advance.
 *
 * Input is read from a text socket on localhost:9999 (e.g. started with `nc -lk 9999`).
 * Each line must have the form `<key> <timestampInSeconds>`, for example `a 15`.
 * Records are keyed by the first field, grouped into 10-second tumbling event-time
 * windows, and one summary line is printed per fired window.
 */
object WatermarkTest {

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The application uses event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    // Insert a watermark once per minute of machine (processing) time
    env.getConfig.setAutoWatermarkInterval(60000)

    val stream = env
      // The delimiter must be a plain ASCII char literal; typographic quotes do not compile
      .socketTextStream("localhost", 9999, '\n')
      .map(line => {
        val arr = line.split(" ")
        // The second field is the timestamp in seconds; it must be converted to milliseconds.
        // NOTE: a line without a second field, or with a non-numeric one, throws here.
        (arr(0), arr(1).toLong * 1000)
      })
      // Extract timestamps and insert watermarks;
      // keep this step as close to the source operator as possible
      .assignTimestampsAndWatermarks(
        // Maximum allowed delay (out-of-orderness) is set to 5 seconds
        new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
          override def extractTimestamp(element: (String, Long)): Long = element._2
        }
      )
      .keyBy(_._1)
      // 10-second tumbling window
      .timeWindow(Time.seconds(10))
      .process(new MyProcess)

    stream.print()
    env.execute()
  }

  /**
   * Emits one line per fired window containing the window end time and the
   * number of elements collected in that window for the key.
   */
  class MyProcess extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit =
      out.collect("窗口结束时间为:" + new Timestamp(context.window.getEnd) + " 的窗口中共有 " + elements.size + " 条数据")
  }
}
// Insert a periodic watermark once every 60000 ms (one minute) of machine time
env.getConfig.setAutoWatermarkInterval(60000)
package test4
import test2.{SensorReading, SensorSource}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
 * Demo job that assigns timestamps and periodic watermarks with a hand-written
 * assigner, then counts the readings per sensor id in 5-second event-time windows.
 */
object PeriodicInsertWatermarks {

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Stage 1: raw sensor readings
    val readings = env.addSource(new SensorSource)
    // Stage 2: attach event timestamps and periodic watermarks
    val timestamped = readings.assignTimestampsAndWatermarks(new MyAssigner)
    // Stage 3: per-sensor counts over 5-second windows
    val counts = timestamped
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .process(new MyProcess)

    counts.print()
    env.execute()
  }

  /**
   * Periodic watermark assigner; per the original author, this mirrors what
   * `BoundedOutOfOrdernessTimestampExtractor` does under the hood.
   */
  class MyAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
    /** Maximum allowed delay, in milliseconds. */
    val bound: Long = 1000L

    /**
     * Largest timestamp observed so far. It starts at `Long.MinValue + bound`
     * so that `maxTs - bound` cannot underflow before the first element arrives.
     */
    var maxTs: Long = Long.MinValue + bound

    /** Called once per incoming element; tracks the running maximum timestamp. */
    override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
      val ts = element.timestamp
      if (ts > maxTs) maxTs = ts
      ts
    }

    /**
     * Produces the current watermark (invoked every 200 ms by default):
     * largest observed timestamp minus the maximum allowed delay.
     */
    override def getCurrentWatermark: Watermark = new Watermark(maxTs - bound)
  }

  /** Emits the number of elements in each fired window as a string. */
  class MyProcess extends ProcessWindowFunction[SensorReading, String, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[SensorReading], out: Collector[String]): Unit = {
      val count = elements.size
      out.collect(count.toString)
    }
  }
}
有时候输入流中会包含一些用于指示系统进度的特殊元组或标记。Flink 为此类情形以及可根据输入元素生成水位线的情形提供了 AssignerWithPunctuatedWatermarks 接口。该接口中的 checkAndGetNextWatermark() 方法会在针对每个事件的 extractTimestamp() 方法后立即调用。它可以决定是否生成一个新的水位线。如果该方法返回一个非空、且大于之前值的水位线,算子就会将这个新水位线发出。
/**
 * Punctuated watermark assigner: a watermark is produced only when a reading
 * from "sensor_1" arrives; readings from other sensors produce none.
 */
class PunctuatedAssigner
    extends AssignerWithPunctuatedWatermarks[SensorReading] {

  /** Maximum allowed delay, in milliseconds (one minute). */
  val bound: Long = 60 * 1000

  /** Called once per element; the event timestamp is the reading's own timestamp. */
  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = r.timestamp

  /**
   * Called once per element. Returns a watermark of
   * (extracted timestamp - maximum delay) for "sensor_1", otherwise null.
   */
  override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark =
    r.id match {
      case "sensor_1" => new Watermark(extractedTS - bound)
      case _          => null // null means: no new watermark for this element
    }
}
原文:https://www.cnblogs.com/andyonline/p/13364203.html