首页 > 其他 > 详细

Flink实例(六十六): Flink的任务链实操(二)

时间:2020-10-25 22:45:34      阅读:60      评论:0      收藏:0      [点我收藏+]

Operator Chains(操作链)

  • Flink出于分布式执行的目的,将operator的subtask链接在一起形成task(类似spark中的管道)。

  • 每个task在一个线程中执行。

  • 将operators链接成task是非常有效的优化:它可以减少线程与线程间的切换和数据缓冲的开销,并在降低延迟的同时提高整体吞吐量。

  • 链接的行为可以在编程API中进行指定,详情请见代码OperatorChainTest。

  • 开启操作链 和 禁用操作链的对比图(默认开启):

  • Flink默认会将多个operator进行串联,形成任务链(task chain)

  • 注意: task chain 可以理解为就是 operator chain 只是不同场景下,称呼不同。

  • 我们也可以禁用任务链,让每个operator形成一个task。

  • StreamExecutionEnvironment.disableOperatorChaining() 这个方法会禁用整条工作链

  • 操作链其实就是类似spark的pipeline管道模式,一个task可以执行同一个窄依赖中的算子操作。

  • 我们也可以细粒度的控制工作链的形成,比如调用dataStreamSource.map(...).startNewChain(),但不能使用dataStreamSource.startNewChain()

  • dataStreamSource.filter(...).map(...).startNewChain().map(...),需要注意的是,当这样写时相当于source和filter组成一条链,两个map组成一条链。

  • 即在filter和map之间断开,各自形成单独的链。

  • 代码:

package com.ronnie.flink.stream.test;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 *  开启与禁用工作链时,输出的结果不一样。
 *  当开启工作链时(默认启动),operator map1与map2 组成一个task.
 *     此时task运行时,对于hello,flink 这两条数据是:
 *     先打印 hello ---- 1 , hello->1 ---- 2
 *     后打印 flink ---- 1 , flink->1 ---- 2
 *  当禁用工作链时,operator map1与map2 分别在两个task中执行
 *     此时task运行时,对于hello,flink 这两条数据是:
 *     先打印 hello ---- 1 , flink ---- 1
 *     后打印 hello->1 ---- 2  , flink->1 ---- 2
 *
 *  注:操作链类似spark的管道,一个task执行多个的算子.
 */
public class OperatorChainTest {

    public static final String[] WORDS = new String[] {
            "hello",
            "flink",
            "spark",
            "hbase"
    };

    public static void main(String[] args) {
        // 设置执行环境, 类似spark中初始化sparkContext一样
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        // 关闭操作链..
        env.disableOperatorChaining();

        DataStreamSource<String> dataStreamSource = env.fromElements(WORDS);

        SingleOutputStreamOperator<String> pairStream = dataStreamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                System.err.println(value + " ---- 1");
                return value + "->1";
            }
        }).map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                System.err.println(value + " ---- 2");
                return value + "->2";
            }
        });

        // 还可以控制更细粒度的任务链,比如指明从哪个operator开始形成一条新的链
        // someStream.map(...).startNewChain(),但不能使用someStream.startNewChain()。
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

http://www.manongjc.com/detail/13-plgrtuapvoblful.html

 

Flink实例(六十六): Flink的任务链实操(二)

原文:https://www.cnblogs.com/qiu-hua/p/13875067.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!