我们可以用window函数来创建一个窗口,需要提供窗口分配器.
但有些提供了更方便的方法来创建窗口.
开窗之前必须要先使用keyby函数进行分区.
timeWindow(Time.seconds(5))
timeWindow(Time.seconds(15),Time.seconds(5))
window(EventTimeSessionWindo.withGap(Time.minutes(10)))
countWindow(5)
countWindow(10,2)
ReduceFunction 和 AggregateFunction
每来一条数据就进行计算,保持一个简单的状态
package test3
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import test2.SensorSource
object AvgTempPerWindow {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env
.addSource(new SensorSource)
stream
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
.aggregate(new AvgTempFunction)
.print()
env.execute()
}
// 平均温度值 = 总的温度值 / 温度的条数
class AvgTempFunction extends AggregateFunction[(String, Double), (String, Double, Long), (String, Double)] {
// 创建累加器
override def createAccumulator(): (String, Double, Long) = ("", 0.0, 0L)
// 每来一条数据,如何累加?
override def add(value: (String, Double), accumulator: (String, Double, Long)): (String, Double, Long) = {
(value._1, accumulator._2 + value._2, accumulator._3 + 1)
}
override def getResult(accumulator: (String, Double, Long)): (String, Double) = {
(accumulator._1, accumulator._2 / accumulator._3)
}
override def merge(a: (String, Double, Long), b: (String, Double, Long)): (String, Double, Long) = {
(a._1, a._2 + b._2, a._3 + b._3)
}
}
}
先把窗口所有数据收集起来,等到计算的时候会遍历所有数据
ProcessWindowFunction
package test3
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import test2.SensorSource
object AvgTempPerWindowByProcessWindowFunction {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env
.addSource(new SensorSource)
stream
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
.process(new AvgTempFunc)
.print()
env.execute()
}
class AvgTempFunc extends ProcessWindowFunction[(String, Double), (String, Double), String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[(String, Double)], out: Collector[(String, Double)]): Unit = {
val size = elements.size
var sum: Double = 0.0
for (r <- elements) {
sum += r._2
}
out.collect((key, sum / size))
}
}
}
来一条数据先增量聚合,等到窗口关闭的时候在用全窗口聚合
package test3
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import test2.{SensorReading, SensorSource}
object MinMaxTempByAggregateAndProcess {
case class MinMaxTemp(id: String,
min: Double,
max: Double,
endTs: Long)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.addSource(new SensorSource)
stream
.keyBy(_.id)
.timeWindow(Time.seconds(5))
// 第一个参数:增量聚合,第二个参数:全窗口聚合
.aggregate(new Agg, new WindowResult)
.print()
env.execute()
}
class WindowResult extends ProcessWindowFunction[(String, Double, Double),
MinMaxTemp, String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[(String, Double, Double)], out: Collector[MinMaxTemp]): Unit = {
// 迭代器中只有一个值,就是增量聚合函数发送过来的聚合结果
val minMax = elements.head
out.collect(MinMaxTemp(key, minMax._2, minMax._3, context.window.getEnd))
}
}
class Agg extends AggregateFunction[SensorReading, (String, Double, Double), (String, Double, Double)] {
override def createAccumulator(): (String, Double, Double) = {
("", Double.MaxValue, Double.MinValue)
}
override def add(value: SensorReading, accumulator: (String, Double, Double)): (String, Double, Double) = {
(value.id, value.temperature.min(accumulator._2), value.temperature.max(accumulator._3))
}
override def getResult(accumulator: (String, Double, Double)): (String, Double, Double) = accumulator
override def merge(a: (String, Double, Double), b: (String, Double, Double)): (String, Double, Double) = {
(a._1, a._2.min(b._2), a._3.max(b._3))
}
}
}
原文:https://www.cnblogs.com/andyonline/p/13363291.html