Returns a simple Cartesian product restricted by the join condition and a time constraint. An interval join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >), a BETWEEN predicate, or a single equality predicate that compares time attributes of the same type (i.e., either processing time or event time) of both input tables.
For example, this query joins all orders with their corresponding shipments if the order was shipped within four hours after it was received.
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
  AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
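Flink evaluates this query continuously, but its result is easiest to reason about as a filtered Cartesian product. The following is a minimal pure-Python sketch, not Flink's API: the Order and Shipment classes, the interval_join helper, and the sample rows are all hypothetical and only model the same condition on small in-memory lists.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Order:
    id: int
    order_time: datetime


@dataclass
class Shipment:
    order_id: int
    ship_time: datetime


def interval_join(orders, shipments, bound=timedelta(hours=4)):
    """Return (order, shipment) pairs satisfying
    o.id = s.order_id AND o.order_time BETWEEN s.ship_time - bound AND s.ship_time."""
    result = []
    for o in orders:            # Cartesian product of both inputs...
        for s in shipments:
            if o.id != s.order_id:
                continue        # ...restricted by the equi-join predicate
            # ...and by the time constraint (BETWEEN is inclusive on both ends).
            if s.ship_time - bound <= o.order_time <= s.ship_time:
                result.append((o, s))
    return result


t0 = datetime(2021, 8, 28, 8, 0)  # hypothetical sample data
orders = [Order(1, t0), Order(2, t0), Order(3, t0)]
shipments = [
    Shipment(1, t0 + timedelta(hours=2)),  # shipped after 2 hours: joins
    Shipment(2, t0 + timedelta(hours=5)),  # shipped after 5 hours: no match
    Shipment(3, t0 + timedelta(hours=4)),  # exactly 4 hours: joins (inclusive bound)
]

for o, s in interval_join(orders, shipments):
    print(o.id, s.ship_time)
```

Only orders 1 and 3 are matched: order 2 was shipped five hours later, and BETWEEN includes the exact four-hour boundary.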
The following predicates are examples of valid interval join conditions:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
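The three predicates differ mainly in which edges of the time range are inclusive. A small Python sketch, with hypothetical function names, evaluates each of them for a few offsets (in seconds) of ltime relative to rtime:

```python
from datetime import datetime, timedelta

MINUTE = timedelta(minutes=1)
SECOND = timedelta(seconds=1)


def equal_times(ltime, rtime):
    # ltime = rtime
    return ltime == rtime


def half_open_range(ltime, rtime):
    # ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
    return rtime <= ltime < rtime + 10 * MINUTE


def between_range(ltime, rtime):
    # ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
    return rtime - 10 * SECOND <= ltime <= rtime + 5 * SECOND


r = datetime(2021, 1, 1, 12, 0, 0)  # arbitrary reference time
for offset_s in [0, 600, -10, 5, 6]:
    l = r + timedelta(seconds=offset_s)
    print(offset_s, equal_times(l, r), half_open_range(l, r), between_range(l, r))
```

The half-open range rejects an offset of exactly 10 minutes, while BETWEEN accepts both of its endpoints (-10 s and +5 s).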
For streaming queries, unlike the regular join, the interval join only supports append-only tables with time attributes. Because time attributes are quasi-monotonically increasing, Flink can remove old rows from its state without affecting the correctness of the result.
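Why old rows can be removed safely can be sketched as a toy operator. This is a simplified Python model under stated assumptions, not Flink's implementation: timestamps are integer minutes, the hypothetical IntervalJoinSketch class performs only an inner join, a pair matches when right.ts lies in [left.ts + lower, left.ts + upper], and records that arrive behind the watermark are simply dropped.

```python
from collections import defaultdict


class IntervalJoinSketch:
    """Toy inner interval join: match when right.ts is in [left.ts + lower, left.ts + upper].

    Timestamps are plain integers (e.g. minutes). A buffered row is evicted as
    soon as the watermark proves that no future row can still match it.
    """

    def __init__(self, lower, upper):
        assert lower <= upper
        self.lower, self.upper = lower, upper
        self.left_state = defaultdict(list)   # join key -> [(ts, row), ...]
        self.right_state = defaultdict(list)  # join key -> [(ts, row), ...]
        self.watermark = float("-inf")
        self.output = []

    def on_left(self, key, ts, row):
        if ts < self.watermark:
            return  # late record: dropped in this sketch
        # Probe buffered right rows with the same key, then buffer this row.
        for rts, rrow in self.right_state[key]:
            if ts + self.lower <= rts <= ts + self.upper:
                self.output.append((row, rrow))
        self.left_state[key].append((ts, row))

    def on_right(self, key, ts, row):
        if ts < self.watermark:
            return  # late record: dropped in this sketch
        # Probe buffered left rows with the same key, then buffer this row.
        for lts, lrow in self.left_state[key]:
            if lts + self.lower <= ts <= lts + self.upper:
                self.output.append((lrow, row))
        self.right_state[key].append((ts, row))

    def on_watermark(self, wm):
        self.watermark = max(self.watermark, wm)
        for rows in self.left_state.values():
            # Future right rows have ts >= watermark; a left row can only
            # match right rows with ts <= left.ts + upper.
            rows[:] = [(t, r) for t, r in rows if t + self.upper >= self.watermark]
        for rows in self.right_state.values():
            # Future left rows have ts >= watermark; a right row can only
            # match left rows with ts <= right.ts - lower.
            rows[:] = [(t, r) for t, r in rows if t - self.lower >= self.watermark]

    def state_size(self):
        return sum(map(len, self.left_state.values())) + sum(map(len, self.right_state.values()))


# Orders (left) and shipments (right), bounds in minutes: shipped within 4 hours.
join = IntervalJoinSketch(lower=0, upper=240)
join.on_left(1, 0, "order-1")
join.on_right(1, 120, "ship-1")     # 0 <= 120 <= 240, so the pair joins
join.on_watermark(300)              # both buffered rows are now too old to match anything
join.on_right(1, 100, "late-ship")  # behind the watermark, so it is dropped
print(join.output, join.state_size())
```

After the watermark passes 300, the state is empty while the joined pair has already been emitted, which is the property that lets the operator keep bounded state.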
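Compared with the regular join, an interval join uses a time window to impose a join time bound on both input tables: data that falls outside this time range is invisible to the join and can be cleaned up. This avoids the inaccurate join results and heavy resource usage that the regular join suffers from because it has no policy for evicting data. To use an interval join, however, you must define a time attribute field. It can be Processing Time, i.e., the time at which the computation happens, or Event Time extracted from the data itself. With Processing Time, the Flink framework periodically cleans up data based on time windows derived from the system clock; with Event Time, Flink assigns Event Time windows and cleans up data according to the configured watermark. In the earlier data preparation, we extracted event-time attribute fields from the click stream and the impression stream and set a watermark that tolerates 5 minutes of out-of-order data.
In short, an interval join only needs to cache data within the time bounds, so it uses little storage and produces more accurate real-time join results. Interval joins currently support inner, left outer, right outer, and full outer joins.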
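The 5-minute out-of-order tolerance described above can be modeled as a watermark that trails the largest event time seen so far. This is a simplified sketch under that assumption; the class name is hypothetical, and Flink's own watermark generators may differ in details.

```python
from datetime import datetime, timedelta


class BoundedOutOfOrdernessWatermark:
    """Simplified model: watermark = (max event time seen so far) - max_delay."""

    def __init__(self, max_delay):
        self.max_delay = max_delay
        self.max_ts = None

    def current(self):
        # No watermark until the first event has been observed.
        return None if self.max_ts is None else self.max_ts - self.max_delay

    def observe(self, ts):
        """Record an event time; return True if the event is behind the watermark (late)."""
        wm = self.current()
        late = wm is not None and ts < wm
        if self.max_ts is None or ts > self.max_ts:
            self.max_ts = ts
        return late


gen = BoundedOutOfOrdernessWatermark(timedelta(minutes=5))
base = datetime(2021, 8, 28, 10, 0)  # hypothetical event times
# Events arrive out of order: 10:00, 10:07, 10:03, 10:01.
arrivals = [base, base + timedelta(minutes=7), base + timedelta(minutes=3), base + timedelta(minutes=1)]
flags = [gen.observe(ts) for ts in arrivals]
print(flags, gen.current())
```

After 10:07 arrives, the watermark is 10:02, so the 10:03 event is still accepted while the 10:01 event is late. In a join whose upper bound is 10 minutes, a buffered row at 10:00 can only be cleaned up once the watermark passes 10:10, that is, after an event later than 10:15 has been seen.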
-- Form 1: BETWEEN
SELECT columns
FROM t1 [AS <alias1>]
[INNER | LEFT OUTER | RIGHT OUTER | FULL OUTER] JOIN t2
  ON t1.column1 = t2.key_name1
  AND t1.timestamp BETWEEN t2.timestamp AND t2.timestamp + INTERVAL '10' MINUTE;
-- Form 2: two range predicates
SELECT columns
FROM t1 [AS <alias1>]
[INNER | LEFT OUTER | RIGHT OUTER | FULL OUTER] JOIN t2
  ON t1.column1 = t2.key_name1
  AND t2.timestamp <= t1.timestamp AND t1.timestamp <= t2.timestamp + INTERVAL '10' MINUTE;
How to set the boundary conditions
right.timestamp ∈ [left.timestamp + lowerBound, left.timestamp + upperBound]
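The bounds for a concrete query can be derived by rewriting its predicate into this form. For a predicate ltime BETWEEN rtime - a AND rtime + b, solving for rtime gives rtime ∈ [ltime - b, ltime + a], so lowerBound = -b and upperBound = a. The hypothetical helper between_to_bounds below applies this rewrite to the Orders/Shipments query and to the third example predicate, and brute-force checks the rewrite over a range of offsets:

```python
from datetime import timedelta


def between_to_bounds(a, b):
    """Map `ltime BETWEEN rtime - a AND rtime + b` to (lowerBound, upperBound)
    such that rtime is in [ltime + lowerBound, ltime + upperBound].

    rtime - a <= ltime <= rtime + b  <=>  ltime - b <= rtime <= ltime + a
    """
    return -b, a


# Orders/Shipments query: left = o.order_time, right = s.ship_time,
# o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
lower, upper = between_to_bounds(timedelta(hours=4), timedelta(0))
print(lower, upper)

# Third example predicate:
# ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
a, b = timedelta(seconds=10), timedelta(seconds=5)
lower2, upper2 = between_to_bounds(a, b)

# Brute-force check: with delta = rtime - ltime, both forms accept the same offsets.
for delta_s in range(-20, 21):
    delta = timedelta(seconds=delta_s)
    original = -a <= -delta <= b        # rtime - a <= ltime <= rtime + b
    derived = lower2 <= delta <= upper2  # ltime + lower <= rtime <= ltime + upper
    assert original == derived
```

For the Orders/Shipments query this yields s.ship_time ∈ [o.order_time + 0, o.order_time + 4 hours].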
Flink Basics (125): FLINK-SQL Syntax (19) DQL (11) OPERATIONS (8) Joins (2) Interval Joins
Original article: https://www.cnblogs.com/qiu-hua/p/15195661.html