首页 > 其他 > 详细

Flink Streaming状态处理(Working with State)

时间:2020-02-15 10:07:04      阅读:82      评论:0      收藏:0      [点我收藏+]

参考来源:

https://www.jianshu.com/p/6ed0ef5e2b74

https://blog.csdn.net/Fenggms/article/details/102855159

最近一直在看Flink,现在了解一下Flink的状态管理中的几种状态吧。

一、键State和操作State(Keyed State and Opetator State)

Flink中有两种基本的状态:键状态(Keyed State)和操作状态(Operator State)。 键状态(Keyed State) 键状态(Keyed State)是与键相关的,只能在 KeyedStream的函数和操作符中使用。 可以把键状态(Keyed State)理解成已经分区的操作状态(Operator State)或者分片,每个键只有一个状态分区。每个键状态(Keyed State)在 逻辑上绑定到《parallel
-operator-instance,key》的唯一组合,由于每个键是键操作符的一个并行实例,可以将其简单的理解为<operator,key>。 键状态(keyed state)进一步组合成所谓的键组(key groups),键组(key groups)是Flink重新分配 键状态(keyed state)的原子单元; 键组(key groups)的数量与定义的最大并行度完全相同。在执行过程中,每个键操作符的并行实例都使用一个或多个键组的键。 操作符状态(Operator State) 对于操作符状态(Operator State),每个操作符状态(Operator State)都绑定一个并行操作符实例。 Kafka Connector是在Flink中使用操作符状态(Operator State)一个很好的例子。Kafka消费者的每个并行实例都维护一个 topic分区和偏移量(offset)的映射作为其操作符状态(Operator State)。 当并行度发生改变时,操作符状态(Operator State)接口支持在并行操作符实例之间重新分布状态。

 

二、原始和管理状态(Raw and Managed State)

键状态(Keyed State)和操作符状态(Operator State)有两种形式:管理状态和原始状态。

管理状态(Managed State)表示在Flink运行时约束的数据结构,比如 内部的哈希表或者 RockDB。例如:valueState,ListState。

Flink在运行时对状态进行编码,并将其写入检查点(checkpoint)。

原始状态(Raw State)是状态操作符保存在自己的数据结构中。当触发检查点时,它们只将字节序列写入检查点。

Flink不知道状态的数据结构,只看到原始字节。

Managed State是Flink自动管理的State,而 Raw State 是原生态State,两者的区别如下:

从状态管理方式的方式来说,ManagedState 由Flink Runtime管理,自动存储,自动恢复,在内存管理上有优化,而Raw State需要用户

自己管理,需要自己序列化,Flink不知道State中存入的数据是什么结构,只有用户自己知道,需要最终序列化为可存储的数据结构。

从状态数据结构来说,ManagedState 支持已知的数据结构,如Value、List、Map等。而Raw State只支持字节数组,所有状态都要转化为二进制

字节数组才可以。

从推荐使用场景来说,ManagedState大多数情况下均可使用,而RawState是当ManagedState不够用时,比如需要自定义Operator时,推荐使用

RawState。

所有dataStream函数都可以使用管理状态,但是原始状态接口只能在实现操作符时使用。建议使用管理状态(而不是原始状态),

因为使用管理状态,Flink能够在并行度改变时自动重新分发状态,并且更好的内存管理。

使用管理键状态(Using Managed Keyed State)

管理键状态接口提供不同类型状态的访问,这些状态的作用域都是当前输入元素的键。这意味着这种类型的状态只能在

keyedStream上使用,可以通过 stream.keyBy()创建。

 

技术分享图片

 

 

技术分享图片

 

三、几种Keyed State的差异具体体现在:

ValueState 存储单个值,比如 wordcount,用word当key,state就是它的count。这里面的单个值可能是数值或者是字符串,作为单个值,访问接口可能有

两种,get和set。在state上体现的是 update(T)/T value()

MapState的状态数据类型是Map,在State上有put、remove等。需要注意的是在 MapState中的key和keyed state中的key不是同一个。

ListState状态数据类型是List,访问接口如 add、update等。

ReducingState和AggregatingState与ListState都是同一个父类,但状态数据类型上是单个值,原因在于其中的add方法不是把当前的元素追加到

列表中,而是把当前元素直接更新进了Reducing的结果中。

AggregatingSttate的区别是在访问接口,ReducingState中 add(T)和T get() 进去和出来的元素都是同一个类型,但在 AggregatingState输入

的 IN,输出的是 OUT。

 

容错机制与故障恢复:

技术分享图片

 

 

 

 

四、分布式快照 Checkpoint 完成后,当作业发生故障了如何去恢复?
假如作业分布跑在 3 台机器上,其中一台挂了。这个时候需要把进程或者线程移到 active 的 2 台机器上,此时还需要将整个作业的所有 Task 都回滚到最后一次成功 Checkpoint 中的状态,然后从该点开始继续处理。
如果要从 Checkpoint 恢复,必要条件是数据源需要支持数据重新发送。Checkpoint 恢复后, Flink 提供两种一致性语义,一种是恰好一次,一种是至少一次。
在做 Checkpoint 时,可根据 Barries 对齐来判断是恰好一次还是至少一次,如果对齐,则为恰好一次,否则没有对齐即为至少一次。如果作业是单线程处理,
也就是说 Barries 是不需要对齐的;如果只有一个 Checkpoint 在做,不管什么时候从 Checkpoint 恢复,都会恢复到刚才的状态;如果有多个节点,假如一个数据的 Barries 到了,另一个 Barries 还没有来,内存中的状态如果已经存储。那么这
2 个流是不对齐的,恢复的时候其中一个流可能会有重复。 Checkpoint 通过代码的实现方法如下: 首先从作业的运行环境 env.enableCheckpointing 传入 1000,意思是做 2 个 Checkpoint 的事件间隔为 1 秒。Checkpoint 做的越频繁,恢复时追数据就会相对减少,同时 Checkpoint 相应的也会有一些 IO 消耗。 接下来是设置 Checkpoint 的 model,即设置了 Exactly_Once 语义,并且需要 Barries 对齐,这样可以保证消息不会丢失也不会重复。 setMinPauseBetweenCheckpoints 是 2 个 Checkpoint 之间最少是要等 500ms,也就是刚做完一个 Checkpoint。比如某个 Checkpoint 做了 700ms,按照原则过 300ms 应该是做下一个 Checkpoint,因为设置了 1000ms 做一次 Checkpoint 的,但是中间的等待时间比较短,不足 500ms 了,需要多等 200ms,因此以这样的方式防止 Checkpoint 太过于频繁而导致业务处理的速度下降。 setCheckpointTimeout 表示做 Checkpoint 多久超时,如果 Checkpoint 在 1min 之内尚未完成,说明 Checkpoint 超时失败。 setMaxConcurrentCheckpoints 表示同时有多少个 Checkpoint 在做快照,这个可以根据具体需求去做设置。 enableExternalizedCheckpoints 表示下 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint 会在整个作业 Cancel 时被删除。Checkpoint 是作业级别的保存点。 上面讲过,除了故障恢复之外,还需要可以手动去调整并发重新分配这些状态。手动调整并发,必须要重启作业并会提示 Checkpoint 已经不存在,那么作业如何恢复数据? 一方面 Flink 在 Cancel 时允许在外部介质保留 Checkpoint ;另一方面,Flink 还有另外一个机制是 SavePoint。
五、Savepoint 与 Checkpoint 类似,同样是把状态存储到外部介质。当作业失败时,可以从外部恢复。Savepoint 与 Checkpoint 有什么区别呢?

从触发管理方式来讲,Checkpoint 由 Flink 自动触发并管理,而 Savepoint 由用户手动触发并人肉管理;
从用途来讲,Checkpoint 在 Task 发生异常时快速恢复,例如网络抖动或超时异常,而 Savepoint 有计划地进行备份,使作业能停止后再恢复,例如修改代码、调整并发;
从特点来讲,Checkpoint 比较轻量级,作业出现问题会自动从故障中恢复,在作业停止后默认清除;而 Savepoint 比较持久,以标准格式存储,允许代码或配置发生改变,
恢复需要启动作业手动指定一个路径恢复。

 

六、可选的状态存储方式
第一种

 技术分享图片

 

Checkpoint 的存储,第一种是内存存储,即 MemoryStateBackend,构造方法是设置最大的 StateSize,选择是否做异步快照,这种存储状态本身存储在 TaskManager 节点也就是执行节点内存中的,因为内存有容量限制,所以单个 State maxStateSize 默认 5 M,且需要注意 maxStateSize <= akka.framesize 默认 10 M。Checkpoint 存储在 JobManager 内存中,因此总大小不超过 JobManager 的内存。推荐使用的场景为:本地测试、几乎无状态的作业,比如 ETL、JobManager 不容易挂,或挂掉影响不大的情况。不推荐在生产场景使用。

 

第二种
技术分享图片

另一种就是在文件系统上的 FsStateBackend ,构建方法是需要传一个文件路径和是否异步快照。State 依然在 TaskManager 内存中,但不会像 MemoryStateBackend 有 5 M 的设置上限,Checkpoint 存储在外部文件系统(本地或 HDFS),打破了总大小 Jobmanager 内存的限制。容量限制上,单 TaskManager 上 State 总量不超过它的内存,总大小不超过配置的文件系统容量。推荐使用的场景、常规使用状态的作业、例如分钟级窗口聚合或 join、需要开启 HA 的作业。

第三种:
技术分享图片

 

还有一种存储为 RocksDBStateBackend ,RocksDB 是一个 key/value 的内存存储系统,和其他的 key/value 一样,先将状态放到内存中,如果内存快满时,则写入到磁盘中,但需要注意 RocksDB 不支持同步的 Checkpoint,构造方法中没有同步快照这个选项。不过 RocksDB 支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味着每次用户不需要将所有状态都写进去,将增量的改变的状态写进去即可。它的 Checkpoint 存储在外部文件系统(本地或 HDFS),其容量限制只要单个 TaskManager 上 State 总量不超过它的内存 + 磁盘,单 Key 最大 2G,总大小不超过配置的文件系统容量即可。推荐使用的场景为:超大状态的作业,例如天级窗口聚合、需要开启 HA 的作业、最好是对状态读写性能要求不高的作业。

 

 

Flink Streaming状态处理(Working with State)

原文:https://www.cnblogs.com/ssqq5200936/p/12309758.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!