dbsync 项目的目标是围绕 PostgreSQL、Greenplum 实现易用的数据互迁功能
github地址:https://github.com/aliyun/rds_dbsync
一. 支持的功能
1. MySQL -> PostgreSQL/Greenplum(binlog_miner binlog_loader)
功能:基于 MySQL binlog 解析的增量数据同步
状态:已开放二进制 文档
2. MySQL -> PostgreSQL/Greenplum mysql2pgsql
功能:以表为单位的多线程全量数据迁移
状态:已开源 文档
3. PostgreSQL -> PostgreSQL pgsql2pgsql
功能:pg->pg 全量+增量数据同步
状态:已开源 文档
4. PostgreSQL -> PostgreSQL/Greenplum pgsql2gp
功能:基于 PostgreSQL 逻辑日志的增量数据同步
状态:未开发完成
二. 原理介绍
官方文档说明:https://github.com/aliyun/rds_dbsync/blob/master/doc/mysql2gp.md
1.在客户端主机(也可以部署在其他主机)上启动一个临时 PG 数据库,用于临时存放从 MySQL 拉取到的 binlog 数据
2.binlog_miner 从源 MySQL 的一个 binlog 文件开始,拉取和解析 binlog 并存放到临时 PG 中
3.binlog_loader 从临时 PG 中读取增量数据,并做适当的处理,最终批量写入到目标 PostgreSQL 或 Greenplum 中去
?
三. 配置案例
1. 配置cfg文件-my.cfg.env
[src.mysql]
host = "{USER_HOST}"
port = "3306"
user = "{MYSQL_USERNAME}"
password = "{MYSQL_PASSWORD}"
db = "user"
encodingdir = "share"
encoding = "utf8mb4"
binlogfile = ""
binlogfile_offset = "4"
[src.pgsql]
connect_string = "host={SRC_HOST} dbname={SRC_DATABASE} port={SRC_PORT} user={SRC_USERNAME} password={SRC_PASSWORD}"
[local.pgsql]
connect_string = "host={LOCAL_HOST} dbname={LOCAL_NAME} port={LOCAL_PORT} user={LOCAL_USERNAME} password={LOCAL_PASSWORD}"
[desc.pgsql]
connect_string = "host={DESC_HOST} dbname={DESC_NAME} port={DESC_PORT} user={DESC_USERNAME} password={DESC_PASSWORD}"
target_schema="ods_hjm"
middle_schema="ods_hjm_temp"
ignore_copy_error_count_each_table = "0"
[binlogloader]
loader_table_list = ""
load_batch = 10
load_batch_gap = 10
2. 配置txt文件-table.txt
h_app_device:select id,user_id,kid_id,imei,oaid,idfa,os,version,channel,push_regid,push_user_account,push_apnsid,created_at,updated_at,app_type,push_status from h_app_device where updated_at >= DATE_SUB(curdate(), INTERVAL 1 DAY) and updated_at < CURRENT_DATE
h_app_device_log_202103:select id,user_id,device,os,version,imei,channel,remote_ip,app_type,created_at,updated_at from h_app_device_log_202103 where updated_at >= DATE_SUB(curdate(), INTERVAL 1 DAY) and created_at < CURRENT_DATE
h_app_device_log:select id,user_id,device,os,version,imei,created_at,updated_at,app_type from h_app_device_log where updated_at >= DATE_SUB(curdate(), INTERVAL 1 DAY) and created_at < CURRENT_DATE
h_user_kid:select id,user_id,name,birth,gender,source,status,avatar,created_at,updated_at from h_user_kid where updated_at >= DATE_SUB(curdate(), INTERVAL 1 DAY) and created_at < CURRENT_DATE
3. 配置sql文件-table.sql
-- Rebuild the one-day staging table dwd.device_log_day from the extracted rows in
-- ods_hjm_temp.h_app_device_log_202103 whose created_at lies in
-- [yesterday 00:00, today 00:00).
-- The truncate + insert pair is executed exactly once; repeating it only redoes the
-- same work and yields the same table contents.
-- String literals must use plain ASCII single quotes ('...'); typographic quotes
-- are not string delimiters in SQL.
-- NOTE(review): SELECT * relies on the source and target tables having the same
-- column order -- confirm against the table definitions.
TRUNCATE TABLE dwd.device_log_day;

INSERT INTO dwd.device_log_day
SELECT *
FROM ods_hjm_temp.h_app_device_log_202103
WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
  AND created_at < CURRENT_DATE;
-- Write device_log into the partitioned table. To keep a re-run from inserting
-- duplicates, delete the slice that is about to be reloaded, then insert it again.
-- The delete is keyed on created_at because dwd.device_log_day is populated by a
-- created_at window; keying it on updated_at would also remove older rows that were
-- only updated recently and that the insert below does not bring back.
-- NOTE(review): assumes dwd.device_log_all exposes created_at like the day table
-- it is loaded from -- confirm against the table definition.
DELETE FROM dwd.device_log_all
WHERE created_at >= CURRENT_DATE - INTERVAL '1 day';

INSERT INTO dwd.device_log_all
SELECT *
FROM dwd.device_log_day;
-- Load the new table, adding formatted day (YYYYMMDD) and time (HH24MISS)
-- dimensions derived from created_at.
-- Rows from yesterday onwards are deleted first so the step can be re-run.
-- Interval and format strings use plain ASCII single quotes; typographic quotes
-- are not valid string delimiters.
DELETE FROM dwd.device_log_new
WHERE created_at >= CURRENT_DATE - INTERVAL '1 day';

INSERT INTO dwd.device_log_new (
    id,
    user_id,
    device,
    os,
    version,
    imei,
    channel,
    remote_ip,
    app_type,
    new_date,
    new_time,
    created_at
)
SELECT
    id,
    user_id,
    device,
    os,
    version,
    imei,
    channel,
    remote_ip,
    app_type,
    to_char(created_at, 'YYYYMMDD') AS new_date,
    to_char(created_at, 'HH24MISS') AS new_time,
    created_at
FROM dwd.device_log_day;
-- Write into the lightweight table ods.ods_device_bind.
WITH ranked_log AS (
    -- Number each (user_id, imei) pair's rows of the day by created_at.
    SELECT
        pg_catalog.date(created_at) AS date_id,
        user_id,
        device,
        os,
        version,
        imei,
        app_type,
        channel,
        created_at,
        ROW_NUMBER() OVER (PARTITION BY user_id, imei ORDER BY created_at) AS seq
    FROM dwd.device_log_day
),
first_seen AS (
    -- Keep the earliest row of every (user_id, imei) pair.
    SELECT
        date_id,
        user_id,
        device,
        os,
        version,
        imei,
        app_type,
        channel,
        created_at
    FROM ranked_log
    WHERE seq = 1
),
unbound AS (
    -- Anti-join: keep only rows for which the joined ods_device_bind row has no
    -- date_id, then number them per (date_id, user_id) by created_at.
    SELECT
        f.date_id,
        f.user_id,
        f.device,
        f.os,
        f.version,
        f.imei,
        f.app_type,
        f.channel,
        ROW_NUMBER() OVER (PARTITION BY f.date_id, f.user_id ORDER BY f.created_at) AS user_rn
    FROM first_seen AS f
    LEFT JOIN ods.ods_device_bind AS bound
        ON f.user_id = bound.user_id
        AND f.imei = bound.imei
    WHERE bound.date_id IS NULL
)
INSERT INTO ods.ods_device_bind
SELECT
    date_id,
    user_id,
    device,
    os,
    version,
    imei,
    app_type,
    channel,
    user_rn
FROM unbound;
-- Write into dwd.dwd_device_log, the table holding one row per
-- (date, device, user) combination.
-- Rows from yesterday onwards are deleted first so the step can be re-run.
DELETE FROM dwd.dwd_device_log
WHERE date_id >= CURRENT_DATE - INTERVAL '1 day';

WITH temp_device AS (
    -- Earliest row per (date, imei, user_id).
    SELECT
        date_id,
        user_id,
        imei,
        os,
        version,
        app_type,
        created_at
    FROM (
        SELECT
            pg_catalog.date(created_at) AS date_id,
            user_id,
            device,
            os,
            version,
            imei,
            app_type,
            channel,
            created_at,
            -- The window partitions on the date expression itself: date_id is an
            -- output alias of this same SELECT list and cannot be referenced
            -- inside the window definition.
            ROW_NUMBER() OVER (
                PARTITION BY pg_catalog.date(created_at), imei, user_id
                ORDER BY created_at
            ) AS rn
        FROM dwd.device_log_day
    ) AS t
    WHERE rn = 1
),
temp_device_new AS (
    -- user_rn numbers a user's rows within a day by created_at;
    -- imei_rn numbers a device's rows within a day by created_at.
    SELECT
        a.date_id,
        a.user_id,
        a.os,
        a.version,
        a.imei,
        ROW_NUMBER() OVER (PARTITION BY a.date_id, a.user_id ORDER BY a.created_at) AS user_rn,
        ROW_NUMBER() OVER (PARTITION BY a.date_id, a.imei ORDER BY a.created_at) AS imei_rn,
        a.app_type
    FROM temp_device AS a
)
INSERT INTO dwd.dwd_device_log
SELECT
    date_id,
    user_id,
    imei,
    os,
    version,
    user_rn,
    imei_rn,
    app_type
FROM temp_device_new;
? ?
四. 启动同步进程
1. 启动 binlog 拉取进程
cd bin
./binlog_miner
2. 启动 binlog 写入进程
cd bin
./binlog_loader
?
?
?
?
原文:https://blog.51cto.com/u_13285386/3202254