例如:TumblingEventTimeWindows(滚动窗口)默认触发器为EventTimeTrigger,默认情况下在当前水印时间大于等于当前窗口最大时间(窗口结束时间-1)时触发window function。
和onTimer的区别:
1.onTimer用在process function中,也就是说,onTimer是基于DataStream和KeyedStream的。
2.trigger是用在window func
Trigger有 5 个方法来允许触发器处理不同的事件:
onElement()方法,每个元素被添加到窗口时调用
onEventTime()方法,当一个已注册的事件时间计时器启动时调用
onProcessingTime()方法,当一个已注册的处理时间计时器启动时调用
onMerge()方法,与状态性触发器相关,当使用会话窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态。
clear()方法,执行任何需要清除的相应窗口。
注意:清除将仅删除窗口的内容,并将保留有关该窗口的任何潜在元信息和任何触发状态。
import com.atguigu.StreamingJob.{SensorReading, SensorSource} import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult} import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector case class SensorReading(id:String,timestamp :Long,temperature : Double) object TriggersTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //设置并行度 env.setParallelism(1) //设置事件时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //获取事件源 val stream = env.addSource(new SensorSource) val setTime = stream.assignAscendingTimestamps(_.timestamp) //设置事件时间的获取方式 val result = setTime.keyBy(_.id).timeWindow(Time.seconds(15)).trigger(new OneSecondIntervalTrigger ).process(new AllWindom) result.print() env.execute() } //创建窗口的全量函数 in out key windom class AllWindom extends ProcessWindowFunction[SensorReading,(String,Double,Double,Long),String,TimeWindow]{ override def process(key: String, context: Context, elements: Iterable[SensorReading], out: Collector[(String, Double, Double,Long)]): Unit = { var doubles: Iterable[Double] = elements.map(_.temperature) out.collect( (key,doubles.max,doubles.min,context.window.getEnd)) } } } //设置一秒钟一次的触发器 class OneSecondIntervalTrigger extends Trigger[SensorReading , TimeWindow]{ //回调函数 override def onEventTime(l: Long, //触发定时器的时间,即前文设置的定时时间,默认窗口结束时会调用一次 w: TimeWindow, //窗口 triggerContext: Trigger.TriggerContext): TriggerResult = { //判断l是否为窗口的结束时间 if(l==w.getEnd){ //触发窗口的计算,并且清空数据 print("=============================") TriggerResult.FIRE_AND_PURGE }else{ val t = triggerContext.getCurrentWatermark+(1000-(triggerContext.getCurrentWatermark%1000)) if(t<w.getEnd){ triggerContext.registerEventTimeTimer(t) } } //支触发计算 TriggerResult.FIRE } //这是系统时间的 ,不执行业务逻辑 override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { TriggerResult.CONTINUE } //每个窗口的结束时调用 override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = { val firstSeen: ValueState[Boolean] = triggerContext .getPartitionedState( new ValueStateDescriptor[Boolean]( "firstSeen", classOf[Boolean] ) ) //注销之前设置的事件 firstSeen.clear() } //每个数据调用一次 override def onElement(t: SensorReading, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { //获取分区的状态变量 var firstSeen: ValueState[Boolean] = triggerContext.getPartitionedState(new ValueStateDescriptor[Boolean]("firstSeen", Types.of[Boolean])) //每个窗口的第一个数据才会进入到里面设置回调事件的事件 if(!firstSeen.value()){ //获取当前水位线 val t = triggerContext.getCurrentWatermark+(1000-(triggerContext.getCurrentWatermark%1000)) //注册事件时间的回调事件,注册下一秒的事件 triggerContext.registerEventTimeTimer(t) //注册窗口结束时的事件 triggerContext.registerEventTimeTimer(w.getEnd) //关闭时间的注册,保证每一秒内的事件不重复注册 firstSeen.update(true) } TriggerResult.CONTINUE }
原文:https://www.cnblogs.com/mn-lily/p/14607941.html