首页 > 其他 > 详细

FLINK-状态管理-状态使用

时间:2021-05-07 14:41:41      阅读:15      评论:0      收藏:0      [点我收藏+]

1.ValueState:

private var lastTempState: ValueState[Double] = _

override def open(parameters: Configuration): Unit = {
  val lastTempDescriptor = new ValueStateDescriptor[Double]("lastTemp", classOf[Double])
  lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
}

2.ListState:

private var itemState : ListState[ItemViewCount] = _

override def open(parameters: Configuration): Unit = {
  //命名状态变量的名字和类型
  val itemStateDescription: ListStateDescriptor[ItemViewCount] = new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount])
  itemState = getRuntimeContext.getListState(itemStateDescription)
}

Flink提供了三种基于key/value的state接口,ListState接口适用于缓存多个值的计算。具体实现之前,因为state必须是基于key,且必须获取getRuntimeContext,state必须同时满足两个条件:

  • 直接基于keyedStream或者由keyedStream转换来的windowedStream
  • 必须继承RichFunction

实际实现时候,因为windowedStream在scala中不能实现RichWindowFunction,因此在main中使用flatmap间接实现了windowFunction中的功能:

val fromTransactionDataStream = watermarkTransaction
      .keyBy(_.code)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      
val transaction = fromTransactionDataStream
      .apply(new StockTransactionApply)
      .keyBy(_._3)
      .flatMap(new TransactionStateFlatMapFunction)

3.MapState

private var behaviorMapState: MapState[String, Int] = _

override def open(parameters: Configuration): Unit = {
  // 创建StateDescriptor
  val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int])
  // 通过StateDescriptor获取运行时上下文中的状态
  behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor)
}

 

FLINK-状态管理-状态使用

原文:https://www.cnblogs.com/mn-lily/p/14738626.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!