1.ValueState:
private var lastTempState: ValueState[Double] = _ override def open(parameters: Configuration): Unit = { val lastTempDescriptor = new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor) }
2.ListState:
private var itemState : ListState[ItemViewCount] = _ override def open(parameters: Configuration): Unit = { //命名状态变量的名字和类型 val itemStateDescription: ListStateDescriptor[ItemViewCount] = new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount]) itemState = getRuntimeContext.getListState(itemStateDescription) }
Flink提供了三种基于key/value的state接口,ListState接口适用于缓存多个值的计算。具体实现之前,因为state必须是基于key,且必须获取getRuntimeContext,state必须同时满足两个条件:
实际实现时候,因为windowedStream在scala中不能实现RichWindowFunction,因此在main中使用flatmap间接实现了windowFunction中的功能:
val fromTransactionDataStream = watermarkTransaction .keyBy(_.code) .window(TumblingEventTimeWindows.of(Time.seconds(10))) val transaction = fromTransactionDataStream .apply(new StockTransactionApply) .keyBy(_._3) .flatMap(new TransactionStateFlatMapFunction)
3.MapState
private var behaviorMapState: MapState[String, Int] = _ override def open(parameters: Configuration): Unit = { // 创建StateDescriptor val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int]) // 通过StateDescriptor获取运行时上下文中的状态 behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor) }
原文:https://www.cnblogs.com/mn-lily/p/14738626.html