首页 > 数据库技术 > 详细

flink table sql两种写法

时间:2020-01-07 12:59:58      阅读:129      评论:0      收藏:0      [点我收藏+]

table & sql api

maven依赖
     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>1.8.0</version>
        <scope>provided</scope>
     </dependency>
     
     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.8.0</version>
        <scope>provided</scope>
     </dependency>

flink-table-api-java-bridge_2.11-1.8.0.jar:使用java编程语言对于DataStream/DataSet table & sql API的支持
flink-table-planner_2.11-1.8.0.jar:封装了关于table和sql的操作,将算子转换成可执行的flink job。pom文件中默认引入了flink-table-api-java-bridge_2.11-1.8.0.jar以及flink基础的jar。

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>1.8.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.8.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.8.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>1.8.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>1.8.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>1.8.0</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <artifactId>scala-reflect</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
        <exclusion>
          <artifactId>scala-compiler</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.8.0</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <artifactId>scala-reflect</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
        <exclusion>
          <artifactId>scala-compiler</artifactId>
          <groupId>org.scala-lang</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-jackson</artifactId>
      <version>2.7.9-6.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-cep_2.11</artifactId>
      <version>1.8.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>1.8.0</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils_2.11</artifactId>
      <version>1.8.0</version>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <artifactId>flink-test-utils-junit</artifactId>
          <groupId>org.apache.flink</groupId>
        </exclusion>
        <exclusion>
          <artifactId>curator-test</artifactId>
          <groupId>org.apache.curator</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-tests_2.11</artifactId>
      <version>1.8.0</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.8.0</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
      <version>1.8.0</version>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <artifactId>frocksdbjni</artifactId>
          <groupId>com.data-artisans</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.8.0</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime_2.11</artifactId>
      <version>1.8.0</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
代码示例
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.Tumble;

public class StreamTableDemo {
?????public static void main(String[] args) throws Exception {
?????????StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
???????? StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
???????? 
         env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
???????? 
???????? DataStreamSource<Tuple5<String, String, Integer, Integer, Long>>? dataStreamSource = env.addSource(new SourceFunction<Tuple5<String, String, Integer,Integer,Long>>() {
???????? ??     Random random = new Random();
???????? ??     int i=0;
????????????????@Override
????????????????public void run(SourceContext<Tuple5<String, String, Integer, Integer, Long>> ctx) throws Exception {
?????????????????????while (true) {
???????????????????????????i++;
???????????????????????????String name = "name-" + i;
???????????????????????????String sex = random.nextBoolean() == true ? "male" : "female";
???????????????????????????int age = random.nextInt(3) + 15;
???????????????????????????int score = random.nextInt(100) + 1;
???????????????????????????Long currentTime = System.currentTimeMillis();
???????????????????????????ctx.collect(new Tuple5(name, sex, age,score,currentTime));
???????????????????????????TimeUnit.SECONDS.sleep(1);
?????????????????????}
????????????????}
????????????????@Override
????????????????public void cancel() {???????????????????
????????????????}
???????? }, "source");
???????? tEnv.registerFunction("utc2local", new ScalarFunctionImpl());

???????? tEnv.registerDataStream("students", dataStreamSource, "name, sex, age, score, createTime, processingTime.proctime");

//?????? tEnv.registerTableSource("students", new StreamSourceTableTest());

//   Table table = tEnv.sqlQuery("select sex," +
                         "utc2local(TUMBLE_START(processingTime, INTERVAL '10' MINUTE)) as wStart," +
                         "utc2local(TUMBLE_END(processingTime, INTERVAL '10' MINUTE)) as wEnd," + 
                         "count(sex) FROM students " + 
                         "GROUP BY TUMBLE(processingTime, INTERVAL '10' MINUTE) , sex");


???????? Table table = tEnv.scan("students")
???????? ??????????????????.where("score > 90")
?????????????????????????? .window(Tumble.over("10.minutes").on("processingTme").as("w"))
????????????????    ?      .window(Over.partitionBy("school").orderBy("proctime").preceding("1.minutes").following("current_range").as("w"))
???????? ??????????????????.groupBy("w, sex")
???????? ??????????????????.select("sex, utc2local(w.start), utc2local(w.end), sex.count");
//?????? table.writeToSink(new CsvTableSink(sinkPath, ",", 1, WriteMode.NO_OVERWRITE));

???????? TypeInformation[] types = new TypeInformation[] {Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG};
???????? tEnv.toAppendStream(table, Types.ROW(types)).print();
???????? env.execute("table-stream-window-demo");
?????}

}
sql查询语言

//TODO 未完待续

调用table api

//TODO 未完待续

flink table sql两种写法

原文:https://www.cnblogs.com/guogai1949/p/12160540.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!