首页 > 其他 > 详细

Flink状态保存CheckPoint

时间:2021-01-04 11:50:43      阅读:19      评论:0      收藏:0      [点我收藏+]

知识点(开启并配置 Checkpoint 的关键代码):

        // Use the file-system state backend; checkpoint data is written under this HDFS URI.
        env.setStateBackend( new FsStateBackend("hdfs://uat-datacenter1:8020/flink/checkpoint"));

        // Turn checkpointing on; the argument is the checkpoint interval in milliseconds.
        env.enableCheckpointing(5000);

        // Request exactly-once checkpointing mode.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Allow at most two checkpoints to be in progress at the same time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

        // On failure, retry the job up to 3 times, waiting 10 seconds between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        // Keep the externalized checkpoint when the job is cancelled so it can be used to
        // restore later; retained checkpoints then have to be cleaned up manually.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

1、主类

package com.example.demo.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

/**
 * Demo Flink streaming job that shows how to enable and configure checkpointing.
 *
 * <p>The job reads text lines from a socket, prefixes every line with {@code "hs_"} and prints
 * the result. Checkpoints are written to HDFS through {@link FsStateBackend}.
 *
 * <p>Optional program arguments (parsed with {@link ParameterTool}):
 * <ul>
 *   <li>{@code --hostname <host>} - socket host to read from (default {@code uat-datacenter2})</li>
 *   <li>{@code --port <port>} - socket port to read from (default {@code 5000})</li>
 * </ul>
 *
 * <p>Part of the {@code demo} project; created 2020-12-29.
 *
 * @author yang
 */
public class TestCheckpoint {

    /** Socket host used when {@code --hostname} is not supplied. */
    private static final String DEFAULT_HOSTNAME = "uat-datacenter2";

    /** Socket port used when {@code --port} is not supplied. */
    private static final int DEFAULT_PORT = 5000;

    /** URI under which the file-system state backend stores checkpoint data. */
    private static final String CHECKPOINT_URI = "hdfs://uat-datacenter1:8020/flink/checkpoint";

    /** Interval between two checkpoint triggers, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 5000L;

    /**
     * Builds and runs the streaming job.
     *
     * @param args optional {@code --hostname} / {@code --port} arguments; when absent the
     *             built-in defaults are used, which matches the previous hard-coded behaviour
     * @throws Exception if argument parsing fails or the job cannot be executed
     */
    public static void main(String[] args) throws Exception {
        // Read the socket endpoint from the command line, falling back to the demo defaults.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname", DEFAULT_HOSTNAME);
        int port = parameterTool.getInt("port", DEFAULT_PORT);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Store checkpoint data on the file system (HDFS here).
        env.setStateBackend(new FsStateBackend(CHECKPOINT_URI));

        // Trigger a checkpoint every CHECKPOINT_INTERVAL_MS milliseconds.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);

        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Allow at most two checkpoints to be in progress at the same time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);

        // On failure, retry the job up to 3 times, waiting 10 seconds between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        // Keep the externalized checkpoint when the job is cancelled so the job can be restored
        // from it later; retained checkpoints must then be cleaned up manually.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        DataStreamSource<String> dataStreamSource = env.socketTextStream(hostname, port);

        // Explicit uids give the operators stable identities; do not rename them, since
        // state written under the old uid could no longer be matched on restore.
        SingleOutputStreamOperator<String> result = dataStreamSource.map(new MapFunction<String, String>() {

            @Override
            public String map(String s) throws Exception {
                return "hs_" + s;
            }
        }).uid("split-map");

        result.print().uid("print-operator");

        env.execute("test");
    }
}

 

Flink状态保存CheckPoint

原文:https://www.cnblogs.com/ywjfx/p/14228615.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!