在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。
assignWindows将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合
getDefaultTrigger 返回跟WindowAssigner关联的默认触发器
getWindowSerializer返回WindowAssigner分配的窗口的序列化器
窗口分配器定义如何将数据元分配给窗口。这是通过WindowAssigner 在window(...)(对于被Keys化流)或windowAll()(对于非被Keys化流)调用中指定您的选择来完成的。
WindowAssigner负责将每个传入数据元分配给一个或多个窗口。Flink带有预定义的窗口分配器,用于最常见的用例
即翻滚窗口, 滑动窗口,会话窗口和全局窗口。
您还可以通过扩展WindowAssigner类来实现自定义窗口分配器。
所有内置窗口分配器(全局窗口除外)都根据时间为窗口分配数据元,这可以是处理时间或事件时间。
选择合适的计算函数,减少开发代码量提高系统性能
EventTimeTrigger基于事件时间的触发器,对应onEventTime
ProcessingTimeTrigger
基于当前系统时间的触发器,对应onProcessingTime
ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境中ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。
ContinuousEventTimeTrigger
ContinuousProcessingTimeTrigger
CountTrigger
一旦窗口中的数据元数量超过给定限制,就会触发。所以其触发机制实现在onElement中
基于处理时间的触发。
根据 watermarks 度量的事件时间进度进行触发。
另一个触发器作为参数作为参数并将其转换为清除触发器。
其作用是在 Trigger 触发窗口计算之后将窗口的 State 中的数据清除。
前两条数据先后于20:01和20:02进入窗口,此时 State 中的值更新为3,同时到了Trigger的触发时间,输出结果为3。
由于 PurgingTrigger 的作用,State 中的数据会被清除。
可以理解为按照原始数据流中的某个key进行分类,拥有同一个key值的数据流将为进入同一个window,多个窗口并行的逻辑流
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
不做分类,每进入一条数据即增加一个窗口,多个窗口并行,每个窗口处理1条数据
WindowAll 将元素按照某种特性聚集在一起,该函数不支持并行操作,默认的并行度就是1,所以如果使用这个算子的话需要注意一下性能问题
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
每一条记录来了以后会根据时间属性值采用不同的window assinger 方法分配给一个或者多个窗口,分为滚动窗口(Tumbling windows)和滑动窗口(Sliding windows)。
EventTime 数据本身携带的时间,默认的时间属性;
ProcessingTime 处理时间;
IngestionTime 数据进入flink程序的时间;
滚动窗口下窗口之间不重叠,且窗口长度是固定的。我们可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows创建一个基于Event Time或Processing Time的滚动时间窗口。
下面示例以滚动时间窗口(TumblingEventTimeWindows
)为例,默认模式是TimeCharacteristic.ProcessingTime
处理时间
/** The time characteristic that is used if none other is set. */
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
所以如果使用Event Time
即数据的实际产生时间,需要通过senv.setStreamTimeCharacteristic
指定
// 指定使用数据的实际时间
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// 这里减去8小时,表示用UTC世界时间
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定。使用时,我们要设置Slide和Size。Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。
同理,如果是滑动时间窗口,也是类似的:
// 窗口的大小是10s,每5s滑动一次,也就是5s计算一次
.timeWindow(Time.seconds(10), Time.seconds(5))
这里使用的是timeWindow
,通常使用window
,那么两者的区别是什么呢?
timeWindow
其实判断时间的处理模式是ProcessingTime
还是SlidingEventTimeWindows
,帮我们判断好了,调用方法直接传入(Time size, Time slide)
这两个参数就好了,如果是使用.window
方法,则需要自己来判断,就是前者写法更简单一些。
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingEventTimeWindows.of(size, slide));
}
}
Count Window 是根据元素个数对数据流进行分组的,也分滚动(tumb)和滑动(slide)。
Tumbling Count Window
当我们想要每100个用户购买行为事件统计购买总数,那么每当窗口中填满100个元素了,就会对窗口进行计算,这种窗口我们称之为翻滚计数窗口(Tumbling Count Window),上图所示窗口大小为3个。通过使用 DataStream API,我们可以这样实现:
// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = buyCnts
// key stream by sensorId
.keyBy(0)
// tumbling count window of 100 elements size
.countWindow(100)
// compute the buyCnt sum
.sum(1)
Sliding Count Window
当然Count Window 也支持 Sliding Window,虽在上图中未描述出来,但和Sliding Time Window含义是类似的,例如计算每10个元素计算一次最近100个元素的总和,代码示例如下。
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding count window of 100 elements size and 10 elements trigger interval
.countWindow(100, 10)
.sum(1)
DataStream<T> input = ...;
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
SlidingEventTimeWindows,
SlidingProcessingTimeWindows,
TumblingEventTimeWindows,
TumblingProcessingTimeWindows
基于时间的滑动窗口
基于时间的翻滚窗口
基于计数的滑动窗口
基于计数的翻滚窗口
会话窗口
会话窗口:一条记录一个窗口
全局窗口(GlobalWindows)
默认情况下,当水印超过窗口末尾时,会删除延迟数据元。
但是,Flink允许为窗口 算子指定最大允许延迟。允许延迟指定数据元在被删除之前可以延迟多少时间,并且其默认值为0.
在水印通过窗口结束之后但在通过窗口结束加上允许的延迟之前到达的数据元,仍然添加到窗口中。
根据使用的触发器,延迟但未丢弃的数据元可能会导致窗口再次触发。就是这种情况EventTimeTrigger。
当指定允许的延迟大于0时,在水印通过窗口结束后保持窗口及其内容。在这些情况下,当迟到但未掉落的数据元到达时,它可能触发窗口的另一次触发。
这些射击被称为late firings,因为它们是由迟到事件触发的,与之相反的main firing 是窗口的第一次射击。在会话窗口的情况下,后期点火可以进一步导致窗口的合并,因为它们可以“桥接”两个预先存在的未合并窗口之间的间隙。
后期触发发出的数据元应该被视为先前计算的更新结果,即,您的数据流将包含同一计算的多个结果。根据您的应用程序,您需要考虑这些重复的结果或对其进行重复数据删除。
在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。
watermark>=window_n_end_time && window_n_start_time<=vent_time<window_n_end_time(即数据属于这个窗口)
AssignerWithPeriodicWatermarks或接口AssignerWithPunctuatedWatermarks。
简而言之,前一个接口将会周期性发送Watermark,而第二个接口根据一些到达数据的属性,例如一旦在流中碰到一个特殊的element便发送Watermark。
关注公众号:Java大数据与数据仓库,回复 "资料",领取大数据资料,学习大数据技术。
原文:https://www.cnblogs.com/data-magnifier/p/14608739.html