首页 > 其他 > 详细

Flink状态之OperatorState

时间:2021-01-04 11:57:29      阅读:62      评论:0      收藏:0      [点我收藏+]

1、主类

package com.example.demo.flink;

import com.example.demo.flink.impl.CustomSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * Entry point of the operator-state demo (part of the "demo" program).
 *
 * <p>Builds a tiny bounded stream of {@code (name, count)} tuples and sends it
 * to a {@link CustomSink} configured with a threshold of 3.
 *
 * <p>Created 2020-12-28 15:46.
 *
 * @author yang
 */
public class TestOperatorStateMain {

    /**
     * Assembles and runs the job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Local execution environment with the web UI variant. The alternative
        // the original author left commented out is
        // StreamExecutionEnvironment.getExecutionEnvironment().
        Configuration localConfig = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);

        // Default parallelism for the job; the sink below overrides it to 1.
        env.setParallelism(16);

        // Fixed in-memory input: four (name, count) pairs.
        DataStreamSource<Tuple2<String, Integer>> wordCounts = env.fromElements(
                Tuple2.of("Spark", 3),
                Tuple2.of("Flink", 5),
                Tuple2.of("Hadoop", 7),
                Tuple2.of("Spark", 4));

        // Expected console output, assuming CustomSink prints its buffer once it
        // holds 3 elements (NOTE(review): confirm against CustomSink):
        //   自定义格式:[(Spark,3), (Flink,5), (Hadoop,7)]
        // The fourth element would then stay buffered and not be printed.
        wordCounts
                .addSink(new CustomSink(3))
                .setParallelism(1);

        env.execute("TestStatefulApi");
    }
}

2、处理实现类

package com.example.demo.flink.impl;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * Sink that buffers incoming {@code (String, Integer)} tuples and prints the
 * whole buffer to standard output once it holds at least {@code threshold}
 * elements (part of the "demo" program).
 *
 * <p>The buffer is copied into operator list state on every snapshot and read
 * back in {@link #initializeState} when the context reports a restore.
 *
 * <p>Elements still in the buffer when the job ends are not printed.
 *
 * <p>Created 2020-12-29 11:36.
 *
 * @author yang
 */
public class CustomSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    /** Buffer size at which the buffered elements are printed and discarded. */
    private final int threshold;

    /** Elements received since the last flush. */
    private final List<Tuple2<String, Integer>> bufferElements;

    /**
     * Handle to the operator list state, assigned in {@link #initializeState}.
     * Marked transient because it is a runtime handle, not part of the
     * function's own serialized configuration.
     */
    private transient ListState<Tuple2<String, Integer>> checkpointState;

    /**
     * Creates a sink with an empty buffer.
     *
     * @param i number of buffered elements that triggers a flush; for values
     *          less than or equal to 1 every element is flushed immediately
     */
    public CustomSink(int i) {
        this.threshold = i;
        this.bufferElements = new ArrayList<>();
    }

    /**
     * Replaces the contents of the list state with the current buffer.
     *
     * @param context snapshot context; not used
     * @throws Exception if the state backend rejects the update
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Drop the previous snapshot's contents so the state mirrors the buffer exactly.
        checkpointState.clear();
        for (Tuple2<String, Integer> ele : bufferElements) {
            checkpointState.add(ele);
        }
    }

    /**
     * Registers the list state named {@code "Operator"} and, when the context
     * reports a restore, loads its elements into the in-memory buffer.
     *
     * @param context gives access to the operator state store and restore flag
     * @throws Exception if the state cannot be obtained or read
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Fully parameterized descriptor (the raw type made getListState unchecked).
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>(
                        "Operator",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
                        }));
        checkpointState = context.getOperatorStateStore().getListState(descriptor);

        // Load the checkpointed elements back into memory.
        if (context.isRestored()) {
            for (Tuple2<String, Integer> ele : checkpointState.get()) {
                bufferElements.add(ele);
            }
        }
    }

    /**
     * Buffers {@code value} and flushes the buffer once it has reached the
     * threshold.
     *
     * @param value   element to buffer
     * @param context sink context; not used
     * @throws Exception declared by the interface; not thrown by this method
     */
    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        bufferElements.add(value);
        // ">=" rather than "==": the restore loop in initializeState appends
        // whatever the state holds, so the buffer may already contain
        // threshold or more elements (presumably possible when list state is
        // merged on scale-down). An exact-equality check would then never
        // match again and the buffer would grow without bound.
        if (bufferElements.size() >= threshold) {
            System.out.println("自定义格式:" + bufferElements);
            bufferElements.clear();
        }
    }
}

 

Flink状态之OperatorState

原文:https://www.cnblogs.com/ywjfx/p/14228605.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!