默认情况下, flink禁用检查点。
开启 checkpoint 的方式: 调用env.enableCheckpointing(n), 其中 N 是以毫秒为单位的检查点间隔。
Checkpoint 的相关参数:
流计算中再以下场景中需要保存状态:
当检查点(checkpoint) 机制启动时, 状态将在检查点中持久化来应对数据丢失以及恢复。
而状态在内部是如何表示的、状态是如何持久化到检查点中以及持久化到哪里都取决于选定的 State Backend。
Flink 在保存状态时, 支持3中存储方式:
如果没有配置其他任何内容, 系统默认将使用 MemoryStateBackend。
MemoryStateBackend
此种存储策略将数据保存在java的堆里,比如:kv的状态或者窗口操作用hash table来保存value等等。
当进行checkpoints的时候,这种策略会对状态做快照,然后将快照作为checkpoint中的一部分发送给JobManager,JM也将其保存在堆中。
Memory StateBackend可以使用异步的方式进行快照,官方也鼓励使用异步的方式,避免阻塞,现在默认就是异步。
注意点:
如果不希望异步,可以在构造的时候传入false,如下:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
此策略的限制:
适合的场景:
FsStateBackend
通过文件系统的URL来设置,如下:
当选择FsStateBackend时,会先将数据保存在任务管理器( Task Manager)的内存中。
当做checkpointing的时候,会将状态快照写入文件,保存在文件系统。
少量的元数据会保存在JM的内存中。
默认情况下,FsStateBackend配置为提供异步快照,以避免在写入状态检查点时阻塞处理管道(processing pipeline)。
可以通过将构造函数中相应的boolean标志设置为false来禁用该功能
new FsStateBackend(path, false);
适用场景:
RockDBStateBackend
通过文件系统的URL来设置, 例如:
此种方式kv state需要由rockdb数据库来管理,这是和内存或file backend最大的不同。
RocksDBStateBackend使用RocksDB数据库保存数据,这个数据库保存在TaskManager的数据目录中。
注意:RocksDB,它是一个高性能的Key-Value数据库。数据会放到先内存当中,在一定条件下触发写到磁盘文件上
在 checkpoint时, 整个 RocksDB数据库的数据会快照一份, 然后存到配置的文件系统中(一般是 hdfs)。
同时, Apache Flink将一些最小的元数据存储在 JobManager 的内存或者 Zookeeper 中(对于高可用性情况)。
RocksDB默认配置为执行异步快照
适合场景:
代码:
// 默认使用内存的方式存储状态值, 单词快照的状态上限为10MB, 使用同步方式进行快照。
env.setStateBackend(new MemeoryStateBackend(10*1024*1024, false));
// 使用 FsStateBackend的方式进行存储, 并且是同步方式进行快照
env.setStateBackend(new FsStateBackend("hdfs://namenode....", false));
try{
// 使用 RocksDBStateBackend方式存储, 并采用增量的快照方式进行存储。
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode....", true));
} catch(IOException e){
e.printStackTrace();
}
程序的运行过程中会每隔env.enableCheckpointing(5000)时间, 产生一个checkpoint快照点。
当使用 hdfs 来存储checkpoint 的快照点状态数据时,
如果程序失败, 我们重启程序时, 可以指明从哪个快照点进行恢复。
flink-1.9.1/bin/flink run -s hdfs://ronnie01:8020/data/flink-checkpoint/xxxxxxxxxxxxxxx(哈希码)/chk-xxx/ metadata -c com.ronnie.flink.test.checkPointTest flink-test.jar
代码:
package com.ronnie.flink.stream.test;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.IOException;
public class CheckPointTest {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启动checkpoint, 并且设置多久进行一次checkpoint, 即两次checkpoint的时间间隔
env.enableCheckpointing(5000);
env.setParallelism(1);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
env.setRestartStrategy(RestartStrategies.fallBackRestart());
/* 设置 checkpoint 语义, 一般使用 exactly_once 语义。
at_least_once 一般在那里非常低的延迟场景使用。*/
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
/*
设置检查点之间的2最短时间
检查点之间的最短时间: 为确保流应用程序在检查点之间取得一些进展, 可以定义检查点之间需要经过多长时间。
如果将此值设置为例如500, 则无论检查点持续时间和检查点间隔如何, 下一个检查点将在上一个检查点完成后的500ms内启动
请注意, 这意味检查点间隔永远不会小于此参数。
*/
checkpointConfig.setMinPauseBetweenCheckpoints(500);
// 设置超时时间, 若本次checkpoint时间超时, 则放弃本次checkpoint操作
checkpointConfig.setCheckpointTimeout(60000);
/*
同一时间最多可以进行多少个checkpoint
默认情况下, 当一个检查点仍处于运行状态时, 系统不会触发另一个检查点
*/
checkpointConfig.setMaxConcurrentCheckpoints(1);
/*开启checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state
DELETE_ON_CANCELLATION:在job canceled的时候会自动删除外部的状态数据,但是如果是FAILED的状态则会保留;
RETAIN_ON_CANCELLATION:在job canceled的时候会保留状态数据
*/
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 默认使用内存的方式存储状态值。单次快照的状态上限内存为10MB, 使用同步方式进行快照。
env.setStateBackend(new MemoryStateBackend(10*1024*1024, false));
// 使用 FsStateBackend的方式进行存储, 并且是同步方式进行快照
env.setStateBackend(new FsStateBackend("hdfs://ronnie01:8020/data/flink-checkpoint",false));
try {
env.setStateBackend(new RocksDBStateBackend("hdfs://ronnie:8020/data/flink-checkpoint", true));
} catch (IOException e) {
e.printStackTrace();
}
// DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01",9999);
//
// SingleOutputStreamOperator<Tuple2<String, Integer>> pairStream = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
// @Override
// public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
// String[] split = value.split(" ");
// for (String word : split) {
// System.out.println("--------- lala --------");
// out.collect(new Tuple2<String, Integer>(word, 1));
// }
// }
// });
//
//
// KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = pairStream.keyBy(0);
//
//
// SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyedStream.sum(1);
//
//
// sum.print();
//
//
// try {
// //转换算子都是懒执行的,最后要显示调用 执行程序,
// env.execute("checkpoint-test");
// } catch (Exception e) {
// e.printStackTrace();
// }
}
}
Flink的Savepoints与Checkpoints的不同之处在于备份与传统数据库系统中的恢复日志不同。
检查点的主要目的是在job意外失败时提供恢复机制。
Checkpoint的生命周期由Flink管理,即Flink创建,拥有和发布Checkpoint - 无需用户交互。
作为一种恢复和定期触发的方法,Checkpoint主要的设计目标是:
与此相反,Savepoints由用户创建,拥有和删除。
他们一般是有计划的进行手动备份和恢复。
例如,在Flink版本需要更新的时候,或者更改你的流处理逻辑,更改并行性等等。
在这种情况下,我们往往关闭一下流,这就需要我们将流中的状态进行存储,后面重新部署job的时候进行会。
从概念上讲,Savepoints的生成和恢复成本可能更高,并且更多地关注可移植性和对前面提到的作业更改的支持。
使用:
命令:
flink savepoint jobID target_directory
保存当前流的状态到指定目录:
bin/flink savepoint xxxxxxxx(哈希码) hdfs://ronnie01:8020/data/flink/savepoint
重启, 恢复数据流:
flink-1.9.1/bin/flink run -s hdfs://ronnie01:8020/data/flink/savepoint/savepoint-xxxxx-xxxxxxxxx -c com.ronnie.flink.stream.test.CheckPointTest flink-test.jar
原文:https://www.cnblogs.com/ronnieyuan/p/11852116.html