首页 > 其他 > 详细

8 基于时间和窗口操作符

时间:2020-07-23 15:38:02      阅读:93      评论:0      收藏:0      [点我收藏+]

一 水位线

水位线 = 到达最大时间 - 延时时间
水位线是真正的事件时间

二 设置事件事件

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

如果要使用 processing time,将 TimeCharacteristic.EventTime 替换为 TimeCharacteristic.ProcessingTIme 就可以了
使用事件时间的流必须要带水位线,而且要指定数据源中的时间戳

三 指定时间戳

assignAscendingTimestamps(_._2) //指定时间戳为_._2,延时时间为0
assignTimestampsAndWatermarks(
        // 最大延迟时间设置为5s
        new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
          override def extractTimestamp(element: (String, Long)): Long = element._2 //设置时间戳为element._2
        }
      )

package test4

import java.lang
import java.sql.Timestamp

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// nc -lk 9999
// 先使用默认的插入频率,200ms插入一次
// 将`env.getConfig.setAutoWatermarkInterval(60000)`注释掉
//a 1
//a 2
//a 15
//a 1
//a 2
//a 17
//a 12
//a 11
//a 12
//a 13
//a 25
// 添加`env.getConfig.setAutoWatermarkInterval(60000)`
//a 1
//a 2
//a 100
//a 5
//a 8
//a 3
//a 50
//a 51
//a 52
object WatermarkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 应用程序使用事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    // 系统每隔一分钟的机器时间插入一次水位线
    env.getConfig.setAutoWatermarkInterval(60000)

    val stream = env
      .socketTextStream("localhost", 9999, ‘\n‘)
      .map(line => {
        val arr = line.split(" ")
        // 第二个元素是时间戳,必须转换成毫秒单位
        (arr(0), arr(1).toLong * 1000)
      })
      // 抽取时间戳和插入水位线
      // 插入水位线的操作一定要紧跟source算子
      .assignTimestampsAndWatermarks(
        // 最大延迟时间设置为5s
        new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
          override def extractTimestamp(element: (String, Long)): Long = element._2
        }
      )
      .keyBy(_._1)
      // 10s的滚动窗口
      .timeWindow(Time.seconds(10))
      .process(new MyProcess)

    stream.print()

    env.execute()
  }

  class MyProcess extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit =
      out.collect("窗口结束时间为:" + new Timestamp(context.window.getEnd) + " 的窗口中共有 " + elements.size + " 条数据")
  }
}

四 设置每隔多少时间插入水位线

env.getConfig.setAutoWatermarkInterval(60000)

package test4

import test2.{SensorReading, SensorSource}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object PeriodicInsertWatermarks {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream = env
      .addSource(new SensorSource)
      .assignTimestampsAndWatermarks(
        new MyAssigner
      )
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .process(new MyProcess)

    stream.print()
    env.execute()
  }

  // `BoundedOutOfOrdernessTimestampExtractor`的底层实现
  class MyAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
    val bound: Long = 1000L // 最大延迟时间
    var maxTs: Long = Long.MinValue + bound // 观察到的最大时间戳

    // 每来一条元素就要调用一次
    override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
      maxTs = maxTs.max(element.timestamp)
      element.timestamp
    }

    // 产生水位线的函数,默认200ms调用一次
    override def getCurrentWatermark: Watermark = {
      // 水位线 = 观察到的最大时间戳 - 最大延迟时间
      new Watermark(maxTs - bound)
    }
  }

  class MyProcess extends ProcessWindowFunction[SensorReading, String, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[SensorReading], out: Collector[String]): Unit = {
      out.collect(elements.size.toString)
    }
  }
}

五 产生不规则水位线

有时候输入流中会包含一些用于指示系统进度的特殊元组或标记。 Flink 为此类情形以及可
根据输入元素生成水位线的情形提供了 AssignerWithPunctuatedWatermarks 接口。该接
口中的 checkAndGetNextWatermark() 方法会在针对每个事件的 extractTimestamp()
法后立即调用。它可以决定是否生成一个新的水位线。如果该方法返回一个非空、且大于之
前值的水位线,算子就会将这个新水位线发出。

class PunctuatedAssigner
 extends AssignerWithPunctuatedWatermarks[SensorReading] {
 val bound: Long = 60 * 1000

// 每来一条数据就调用一次
 override def checkAndGetNextWatermark(r: SensorReading,
 extractedTS: Long): Watermark = {
 if (r.id == "sensor_1") {
 // 抽取的时间戳 - 最大延迟时间
 new Watermark(extractedTS - bound)
 } else {
 null
 }
 }

 // 每来一条数据就调用一次
 override def extractTimestamp(r: SensorReading,
 previousTS: Long): Long = {
 r.timestamp
 }
}

8 基于时间和窗口操作符

原文:https://www.cnblogs.com/andyonline/p/13364203.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!