Savepoints 是检查点的一种特殊实现,底层实现其实也是使用 Checkpoints 的机制。
Savepoints 是用户以手工命令的方式触发 Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从而无法实现从端到端的 Excatly-Once 语义保证。
在 flink-conf.yaml 中配置 SavePoint 存储的位置,设置后,如果要创建指定 Job 的SavePoint,可以不用在手动执行命令时指定 SavePoint 的位置。
state.savepoints.dir: hdfs:/node2/savepoints
为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员通过手动给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。而这些自动生成的 ID 依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动设置 ID。
import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object TestSaveByHdfs { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 默认并行度给所有算子适用,并行度<= slot数量 env.setParallelism(1) // 3. 读取数据 sock流中的数据 // DataStream 相当于spark中的Dstream val stream: DataStream[String] = env.socketTextStream("node1", 8888) .uid("stream-001") // 4. 转换和处理数据 val result: DataStream[(String, Int)] = stream.flatMap(_.split(" ")) .uid("flatMap-001") .map((_, 1)).uid("map-001") .keyBy(0) // 分组算子,0或者1 代表下标,0代表单词,1代表单词出现的次数 .sum(1).uid("sum-001") // 聚会累加算子 result.print("结果") env.execute("wordcount") } }
//先启动Job
[root@node1 bin]# ./flink run -c com.it.flink.state.TestSavepoints -d /home/Flink-Demo-1.0-SNAPSHOT.jar
//再取消Job ,触发SavePoint
[root@node1 bin]# ./flink savepoint 6ecb8cfda5a5200016ca6b01260b94ce
[root@node1 bin]# ./flink cancel 6ecb8cfda5a5200016ca6b01260b94ce
[root@hadoop101 bin]# ./flink run -s hdfs://node2/savepoints/savepoint-6ecb8c-e56ccb88576a -c com.it.flink.state.TestSavepoints -d /home/Flink-Demo-1.0-SNAPSHOT.jar
也可以通过Flink webUI启动job
原文:https://www.cnblogs.com/yj2434/p/14057893.html