首页 > 编程语言 > 详细

java flink之eventTime和window

时间:2020-05-22 09:46:42      阅读:138      评论:0      收藏:0      [点我收藏+]
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.*;
import org.apache.flink.streaming.api.functions.timestamps.*;
import org.apache.flink.streaming.api.windowing.assigners.*;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.omg.PortableInterceptor.SYSTEM_EXCEPTION;


/**
 * # _*_ coding:utf-8 _*_
 * # Author:xiaoshubiao
 * # Time : 2020/5/21 14:37
 **/
public class event_watermaker {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource<String> localhost = executionEnvironment.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<String> stringSingleOutputStreamOperator = localhost.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(0)) {
                    @Override
                    public long extractTimestamp(String s) {
                        System.out.println("event_time输出" +s);
                        return Integer.valueOf(s.split(" ")[0]);
                    }
                }
        );
        KeyedStream<Tuple2<String, Integer>, Tuple> tuple2TupleKeyedStream = stringSingleOutputStreamOperator.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String s) throws Exception {
                        System.out.println("map转换输出" + s);
                        return new Tuple2<>(s.split(" ")[1], 1);
                    }
                }
        ).keyBy(0);
        tuple2TupleKeyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5))).reduce(
                new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t0, Tuple2<String, Integer> t1) throws Exception {

                        return new Tuple2<String, Integer>(t0.f0,t0.f1+t1.f1);
                    }
                }
        ).print();
        executionEnvironment.execute();
    }
}

 

java flink之eventTime和window

原文:https://www.cnblogs.com/7749ha/p/12935310.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!