知识点:
env.setStateBackend( new FsStateBackend("hdfs://uat-datacenter1:8020/flink/checkpoint")); env.enableCheckpointing(5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
1、主类
package com.example.demo.flink; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.concurrent.TimeUnit; /** * @program: demo * @description: * @author: yang * @create: 2020-12-29 14:14 */ public class TestCheckpoint { public static void main(String[] args) throws Exception { // ParameterTool parameterTool = ParameterTool.fromArgs(args); // String hostname = parameterTool.get("hostname"); // int port = parameterTool.getInt("port"); String hostname = "uat-datacenter2"; int port = 5000; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend( new FsStateBackend("hdfs://uat-datacenter1:8020/flink/checkpoint")); env.enableCheckpointing(5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); DataStreamSource<String> dataStreamSource = env.socketTextStream(hostname, port); SingleOutputStreamOperator<String> result = dataStreamSource.map(new MapFunction<String, String>() { @Override public String map(String s) throws Exception { return "hs_" + s; } }).uid("split-map"); result.print().uid("print-operator"); env.execute("test"); } }
原文:https://www.cnblogs.com/ywjfx/p/14228615.html