/**
 * Demo job: reads comma-separated sensor readings from a socket, assigns
 * event-time timestamps and watermarks, and prints both the parsed readings and
 * the alerts produced by [[TempChangeAlert]].
 */
object StateTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Each input line is split on "," into: id, timestamp, temperature.
    val rawLines = env.socketTextStream("hadoop102", 7777)
    val parsedReadings: DataStream[SensorReading] = rawLines.map { line =>
      val fields = line.split(",")
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).toDouble)
    }

    // Watermarks trail the highest timestamp seen so far by one second.
    // NOTE(review): the * 1000 presumably converts epoch seconds to milliseconds — confirm
    // against the producer of the socket data.
    val timestampExtractor =
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(reading: SensorReading): Long = reading.timestamp * 1000
      }
    val dataStream: DataStream[SensorReading] =
      parsedReadings.assignTimestampsAndWatermarks(timestampExtractor)

    // Raise an alert when a sensor's temperature changes by more than 10 degrees.
    val alertFunction = new TempChangeAlert(10.0)
    val processStream = dataStream
      .keyBy(_.id)
      .process(alertFunction)

    dataStream.print("data stream")
    processStream.print("alert stream")

    env.execute("test")
  }
}
/**
 * Emits an alert whenever the temperature of a sensor (one key) differs from the
 * previous reading of the same sensor by more than `threshold`.
 *
 * The alert is the string "id,previousTemperature,currentTemperature".
 * The first reading of a key never produces an alert because there is nothing to
 * compare it with; it only initializes the state.
 *
 * @param threshold absolute temperature change that must be exceeded to raise an alert
 */
class TempChangeAlert(threshold: Double) extends KeyedProcessFunction[String, SensorReading, String] {

  // Keyed state: temperature of the previous reading for the current key.
  lazy val lastTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("tempState", classOf[Double]))

  // Keyed state: whether `lastTemp` has been written for the current key.
  // An unset ValueState yields null, which Scala unboxes to 0.0 (or false), so
  // `lastTemp` alone cannot tell "no previous reading" apart from a real 0.0 reading.
  private lazy val hasLastTemp =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("tempStateInitialized", classOf[Boolean]))

  /**
   * Compares the incoming reading with the stored previous temperature of the
   * same key, emits an alert if the change exceeds the threshold, then stores
   * the incoming temperature as the new previous value.
   *
   * @param value the incoming sensor reading
   * @param ctx   process-function context (unused)
   * @param out   collector receiving the alert strings
   */
  override def processElement(value: SensorReading,
                              ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
                              out: Collector[String]): Unit = {
    if (hasLastTemp.value()) {
      // Fetch the previous temperature and compare.
      val lastTemperature = lastTemp.value()
      val diff = (lastTemperature - value.temperature).abs
      if (diff > threshold) {
        out.collect(s"${value.id},$lastTemperature,${value.temperature}")
      }
    } else {
      // First reading for this key: skip the comparison against the phantom 0.0.
      hasLastTemp.update(true)
    }
    lastTemp.update(value.temperature)
  }
}
如果用不到 ProcessFunction 中的时间服务等功能,可以直接使用富函数(Rich Function)实现同样的功能。关键代码如下:
//温度变动超过10度报警
val processStream = dataStream.keyBy(_.id)
  .flatMap(new TempChangeAlert2(10.0))
自定义类继承富函数类 RichFlatMapFunction:
/**
 * Rich-function variant of the temperature-change alert: emits
 * `(id, previousTemperature, currentTemperature)` whenever the temperature of a
 * sensor (one key) differs from its previous reading by more than `threshold`.
 *
 * The first reading of a key never produces an alert because there is nothing to
 * compare it with; it only initializes the state.
 *
 * @param threshold absolute temperature change that must be exceeded to raise an alert
 */
class TempChangeAlert2(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

  // Keyed state: temperature of the previous reading for the current key.
  private var lastTemp: ValueState[Double] = _

  // Keyed state: whether `lastTemp` has been written for the current key.
  // An unset ValueState yields null, which Scala unboxes to 0.0 (or false), so
  // `lastTemp` alone cannot tell "no previous reading" apart from a real 0.0 reading.
  private var hasLastTemp: ValueState[Boolean] = _

  /** Acquires the keyed state handles from the runtime context. */
  override def open(parameters: Configuration): Unit = {
    lastTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("tempState2", classOf[Double]))
    hasLastTemp = getRuntimeContext.getState(
      new ValueStateDescriptor[Boolean]("tempState2Initialized", classOf[Boolean]))
  }

  /**
   * Compares the incoming reading with the stored previous temperature of the
   * same key, emits an alert tuple if the change exceeds the threshold, then
   * stores the incoming temperature as the new previous value.
   *
   * @param value the incoming sensor reading
   * @param out   collector receiving the alert tuples
   */
  override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
    if (hasLastTemp.value()) {
      // Fetch the previous temperature and compare.
      val lastTemperature = lastTemp.value()
      val diff = (lastTemperature - value.temperature).abs
      if (diff > threshold) {
        out.collect((value.id, lastTemperature, value.temperature))
      }
    } else {
      // First reading for this key: skip the comparison against the phantom 0.0.
      hasLastTemp.update(true)
    }
    lastTemp.update(value.temperature)
  }
}
原文:https://www.cnblogs.com/noyouth/p/12905327.html