<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.8.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.8.0</version>
<scope>provided</scope>
</dependency>
flink-table-api-java-bridge_2.11-1.8.0.jar:为使用 Java 语言、基于 DataStream/DataSet API 开发的程序提供 Table & SQL API 支持。
flink-table-planner_2.11-1.8.0.jar:封装了 Table 和 SQL 的相关操作,负责将算子转换成可执行的 Flink job。它的 pom 文件中默认引入了 flink-table-api-java-bridge_2.11-1.8.0.jar 以及 Flink 基础的 jar,如下所示:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.8.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-reflect</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>scala-compiler</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.8.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-reflect</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>scala-compiler</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
<version>2.7.9-6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.8.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.8.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>1.8.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>flink-test-utils-junit</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>curator-test</artifactId>
<groupId>org.apache.curator</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests_2.11</artifactId>
<version>1.8.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.8.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.8.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>frocksdbjni</artifactId>
<groupId>com.data-artisans</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.8.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>1.8.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.Tumble;
public class StreamTableDemo {
?????public static void main(String[] args) throws Exception {
?????????StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
???????? StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
????????
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
????????
???????? DataStreamSource<Tuple5<String, String, Integer, Integer, Long>>? dataStreamSource = env.addSource(new SourceFunction<Tuple5<String, String, Integer,Integer,Long>>() {
???????? ?? Random random = new Random();
???????? ?? int i=0;
????????????????@Override
????????????????public void run(SourceContext<Tuple5<String, String, Integer, Integer, Long>> ctx) throws Exception {
?????????????????????while (true) {
???????????????????????????i++;
???????????????????????????String name = "name-" + i;
???????????????????????????String sex = random.nextBoolean() == true ? "male" : "female";
???????????????????????????int age = random.nextInt(3) + 15;
???????????????????????????int score = random.nextInt(100) + 1;
???????????????????????????Long currentTime = System.currentTimeMillis();
???????????????????????????ctx.collect(new Tuple5(name, sex, age,score,currentTime));
???????????????????????????TimeUnit.SECONDS.sleep(1);
?????????????????????}
????????????????}
????????????????@Override
????????????????public void cancel() {???????????????????
????????????????}
???????? }, "source");
???????? tEnv.registerFunction("utc2local", new ScalarFunctionImpl());
???????? tEnv.registerDataStream("students", dataStreamSource, "name, sex, age, score, createTime, processingTime.proctime");
//?????? tEnv.registerTableSource("students", new StreamSourceTableTest());
// Table table = tEnv.sqlQuery("select sex," +
"utc2local(TUMBLE_START(processingTime, INTERVAL '10' MINUTE)) as wStart," +
"utc2local(TUMBLE_END(processingTime, INTERVAL '10' MINUTE)) as wEnd," +
"count(sex) FROM students " +
"GROUP BY TUMBLE(processingTime, INTERVAL '10' MINUTE) , sex");
???????? Table table = tEnv.scan("students")
???????? ??????????????????.where("score > 90")
?????????????????????????? .window(Tumble.over("10.minutes").on("processingTme").as("w"))
???????????????? ? .window(Over.partitionBy("school").orderBy("proctime").preceding("1.minutes").following("current_range").as("w"))
???????? ??????????????????.groupBy("w, sex")
???????? ??????????????????.select("sex, utc2local(w.start), utc2local(w.end), sex.count");
//?????? table.writeToSink(new CsvTableSink(sinkPath, ",", 1, WriteMode.NO_OVERWRITE));
???????? TypeInformation[] types = new TypeInformation[] {Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG};
???????? tEnv.toAppendStream(table, Types.ROW(types)).print();
???????? env.execute("table-stream-window-demo");
?????}
}
//TODO 未完待续
//TODO 未完待续
原文:https://www.cnblogs.com/guogai1949/p/12160540.html