首页 > 数据库技术 > 详细

mysql执行拉链表操作

时间:2020-06-01 22:59:23      阅读:103      评论:0      收藏:0      [点我收藏+]

拉链表需求:

  1.数据量比较大

  2.变化的比例和频率比较小,例如客户的住址信息,联系方式等,比如有1千万的用户数据,每天全量存储会存储很多不变的信息,对存储也是浪费,因此可以使用拉链表的算法来节省存储空间

       3.拉链历史表,既能反映每个客户不同时间的不同状态,也可查看某个时间点的全量快照信息

拉链表设计

  技术分享图片

设计的拉链历史表:

技术分享图片

反映A客户的状态信息

select * from ods_account where cst_id=A‘;

技术分享图片

 

 

反映20190601历史数据:

 select * from ods_account where eff_date<=20190601and end_date>20190601‘;

技术分享图片

 

反映20190602历史全量数据:

 select * from ods_account where eff_date<=20190602and end_date>20190602‘;

技术分享图片

 

 建表:

技术分享图片
use edw;

drop table if exists src_account;
create table if not exists src_account(
cst_id varchar(64) comment 客户唯一编号,
bal    float       comment 余额,
date_id varchar(16) comment 日期
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
alter table src_account add primary key(cst_id,date_id);

drop table if exists delta_account;
create table if not exists delta_account(
cst_id varchar(64) comment 客户唯一编号,
bal    float       comment 余额,
etl_flag varchar(16) comment ETL标记
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
alter table delta_account add primary key(cst_id,etl_flag);

drop table if exists odshis_account;
create table if not exists odshis_account(
cst_id varchar(64) comment 客户唯一编号,
bal    float       comment 余额,
eff_date varchar(16) comment 生效日期,
end_date varchar(16) comment 失效日期,
job_seq_id varchar(16) comment 批次号,
new_job_seq_id varchar(16) comment 最新批次号
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
alter table odshis_account add primary key(cst_id,new_job_seq_id);

drop table if exists ods_account;
create table if not exists ods_account(
cst_id varchar(64) comment 客户唯一编号,
bal    float       comment 余额,
eff_date varchar(16) comment 生效日期,
end_date varchar(16) comment 失效日期,
job_seq_id varchar(16) comment 批次号
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
alter table ods_account add primary key(cst_id,eff_date,end_date);
技术分享图片

 

加载原始数据:

  

技术分享图片
delete from src_account;
insert into src_account values(A‘,20‘,20190601);
insert into src_account values(B‘,30‘,20190601);
insert into src_account values(C‘,50‘,20190601);
insert into src_account values(D‘,35‘,20190601);

insert into src_account values(A‘,20‘,20190602);
insert into src_account values(B‘,50‘,20190602);
insert into src_account values(D‘,20‘,20190602);
insert into src_account values(E‘,50‘,20190602);


insert into src_account values(A‘,30‘,20190603);
insert into src_account values(B‘,50‘,20190603);
insert into src_account values(C‘,50‘,20190603);
insert into src_account values(D‘,20‘,20190603);


insert into src_account values(A‘,30‘,20190604);
insert into src_account values(B‘,40‘,20190604);
insert into src_account values(C‘,30‘,20190604);
insert into src_account values(D‘,20‘,20190604);
insert into src_account values(E‘,20‘,20190604);
insert into src_account values(F‘,20‘,20190604);
insert into src_account values(G‘,20‘,20190604‘);
技术分享图片

 

开始拉链过程:

 

技术分享图片
 #清空增量数据
   truncate delta_account;
   #加载增量数据(新增)
   insert into delta_account
select t1.cst_id,t1.bal,Ias etl_flag from 
(select * from src_account where date_id = ${job_date_id}) t1 
left join 
(select * from src_account where date_id = ${before_job_date_id}) t2
on t1.cst_id = t2.cst_id where t2.cst_id is null;

#加载增量数据(删除)
insert into delta_account
select t1.cst_id,t1.bal,Das etl_flag from 
(select * from src_account where date_id = ${before_job_date_id}) t1 
left join 
(select * from src_account where date_id = ${job_date_id}) t2
on t1.cst_id = t2.cst_id where t2.cst_id is null;

#加载增量数据(变更前)
insert into delta_account
select t1.cst_id,t1.bal,Aas etl_flag from 
(select * from src_account where date_id = ${job_date_id}) t1 
left join
(select * from src_account where date_id = ${before_job_date_id}) t2
on t1.cst_id = t2.cst_id where t2.cst_id is not null
and t1.bal <> t2.bal;

#加载增量数据(变更后)
insert into delta_account
select t1.cst_id,t2.bal,Bas etl_flag from 
(select * from src_account where date_id = ${job_date_id}) t1 
left join
(select * from src_account where date_id = ${before_job_date_id}) t2
on t1.cst_id = t2.cst_id where t2.cst_id is not null
and t1.bal <> t2.bal;

#1.重跑:删除已跑入数据
delete from ods_account where job_seq_id = ${job_date_id};

#2.重跑:从历史表恢复数据
insert into ods_account(cst_id,bal,eff_date,end_date,job_seq_id)
select  cst_id,bal,eff_date,end_date,job_seq_id from odshis_account
where new_job_seq_id = ${job_date_id};

#3.重跑:删除已跑入历史数据
delete from odshis_account where new_job_seq_id = ${job_date_id};

#4.备份数据到历史表
insert into odshis_account(cst_id,bal,eff_date,end_date,job_seq_id,new_job_seq_id)
select cst_id,bal,eff_date,end_date,job_seq_id,${job_date_id}from ods_account t
where t.end_date=99991231and exists ( select 1 from delta_account s
where t.cst_id=s.cst_id );

#5.断链
update ods_account t set end_date=${job_date_id}‘,job_seq_id = ${job_date_id}where t.end_date=99991231and exists ( select 1 from delta_account s where etl_flag in (I‘,D‘,A‘) and t.cst_id=s.cst_id );

#6.加链
insert into ods_account(cst_id,bal,eff_date,end_date,job_seq_id)
select cst_id,bal,${job_date_id}as eff_date,99991231as end_date,${job_date_id}as job_seq_id from delta_account where etl_flag in (A‘,I);

#7.保持数据完整性
insert into ods_account (cst_id,bal,eff_date,end_date,job_seq_id)
select t.cst_id,t.bal,${job_date_id}‘,${job_date_id}as end_date,${job_date_id}as job_seq_id from delta_account t  where etl_flag = Dand not exists (select 1 from ods_account s
where t.cst_id=s.cst_id)
技术分享图片
 
COME FROM:https://www.cnblogs.com/Jims2016/p/11045969.html

mysql执行拉链表操作

原文:https://www.cnblogs.com/seasonzone/p/13027531.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!