首页 > 其他 > 详细

【Flink】3、Flink构建流批数仓(二)

时间:2021-05-14 15:27:18      阅读:20      评论:0      收藏:0      [点我收藏+]

一、Hive Catalog

  主要作用是使用Hive MetaStore去管理Flink的元数据,如果不去持久化catalog,那么在每个session中取处理数据,都要去重复地创建元数据对象,这样是非常耗时的。

  HiveCatalog可以处理两种类型的表:一种是Hive兼容的表,另一种是普通表(generic table)。其中Hive兼容表是以兼容Hive的方式来存储的,所以,对于Hive兼容表而言,我们既可以使用Flink去操作该表,又可以使用Hive去操作该表。普通表是对Flink而言的,当使用HiveCatalog创建一张普通表,仅仅是使用Hive MetaStore将其元数据进行了持久化,所以可以通过Hive查看这些表的元数据信息(通过DESCRIBE FORMATTED命令),但是不能通过Hive去处理这些表,因为语法不兼容。对于是否是普通表,Flink使用is_generic属性进行标识。默认情况下,创建的表是普通表,即is_generic=true,如果要创建Hive兼容表,需要在建表属性中指定is_generic=false

  具体配置:

catalogs:
   - name: myhive
     type: hive
     default-database: default
     hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf

  创建兼容表:指定字段,不然就是默认表

CREATE TABLE hive_compatible_tbl ( 
    `user_id` BIGINT, -- 用户id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品类id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT -- 用户行为发生的时间戳
 ) WITH ( 
    ‘connector‘ = ‘kafka‘, -- 使用 kafka connector
    ‘topic‘ = ‘user_behavior‘, -- kafka主题
    ‘scan.startup.mode‘ = ‘earliest-offset‘, -- 偏移量
    ‘properties.group.id‘ = ‘group1‘, -- 消费者组
    ‘properties.bootstrap.servers‘ = ‘kms-2:9092,kms-3:9092,kms-4:9092‘, 
    ‘format‘ = ‘json‘, -- 数据源格式为json
    ‘json.fail-on-missing-field‘ = ‘true‘,
    ‘json.ignore-parse-errors‘ = ‘false‘,
    ‘is_generic‘ = ‘false‘
);

二、Hive Dialect

  只要开启了Hive dialect配置,用户就可以使用HiveQL语法,这样我们就可以在Flink中使用Hive的语法使用一些DDL和DML操作。默认的SQL方言是default,如果要使用Hive的语法,需要将SQL方言切换到hive。在SQL Cli中使用Hive dialect,sql-client-defaults.yaml配置文件中进行配置,一旦切换到了hive dialect,就只能使用Hive的语法建表,如果尝试使用Flink的语法建表,则会报错

execution:
  planner: blink
  type: batch
  result-mode: table

configuration:
  table.sql-dialect: hive

 

【Flink】3、Flink构建流批数仓(二)

原文:https://www.cnblogs.com/fi0108/p/14767782.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!