?最近太多事情 工作的事情,以及终身大事等等 耽误更新,由于最近做项目需要同步监听 未来电视 mysql的变更了解到公司会用canal做增量监听,就尝试使用了一下 这里做个demo 简单的记录一下。
?canal:主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费的中间件
?当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
??1.MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
??2.MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
??3.MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
??1.canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
??2.MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
??3.canal 解析 binary log 对象(原始为 byte 流)
?对于自建MySQL ,需要先开启 Binlog写入功能,并且配置binlog-format 为Row模式 在my.cnf中配置
?授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
??canal 下载地址 (下载速度可能很慢)
??下载 canal.deployer-xxx.tar.gz 如 canal.deployer-1.1.4.tar.gz
??解压后 可以看到如下结构
??配置修改:
vim conf/example/instance.properties
??如下:
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 2020
# position info 修改自己的数据库(canal要监听的数据库 地址 )
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password 修改成自己 数据库信息的账号 (单独开一个 准备阶段创建的账号)
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex 表的监听规则
# canal.instance.filter.regex = blogs\.blog_info
canal.instance.filter.regex = .\*\\\\..\*
# table black regex
canal.instance.filter.black.regex =
??启动canal
sh bin/startup.sh
??查看server日志
??看到 the canal server is running now 表示启动成功
vi logs/canal/canal.log
2020-01-08 15:25:33.361 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2020-01-08 15:25:33.468 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
2020-01-08 15:25:34.061 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
??查看instance的日志
vi logs/example/example.log
2020-01-08 15:25:33.864 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-blogs
2020-01-08 15:25:33.998 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
2020-01-08 15:25:33.999 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status
vi conf/canal.properties
??在canal.destinations 处可以配置当前server上部署的instance 列表 默认为 example ,我这里改成了 blogs最好对应数据库名称。一个instance 对应一个 数据库
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
??CanalMessageListener.java
??该类实现InitializingBean 主要是在初始化的时候 执行 init 方法,在init()方法中 创建 CanalConnector对象,连接需要监听的canal,主要提供 canal的 host ,port ,destination ,以及username 和 password
??parse 方法 主要用于将监听的对象 通过反射等转换成对应的实体类
/**
* @author johnny
**/
@Component
@Slf4j
@ConditionalOnProperty(name = "application.canal.accessor", havingValue = "canal")
public class CanalMessageListener implements InitializingBean, ParseCanal {
private CanalConnector connector;
@Autowired
private CanalConfig canalConfig;
@Autowired
private IParseDispatcher configParseDispatcher;
private void init() {
//创建canal 监听 传入host port destination等参数
connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()),
canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
connector.connect();
// .*\..*
connector.subscribe(".*\\..*");
connector.rollback();
new Thread(() -> {
while (true) {
Message message = connector.getWithoutAck(canalConfig.getBatchSize());
long batchId = message.getId();
long size = message.getEntries().size();
//batchId == -1 表示没有数据变更
if (batchId == -1 || size == 0) {
System.out.println("empty data ");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
//解析数据变更
resoleveEntry(message.getEntries());
}
}
}).start();
}
//解析数据变更
private void resoleveEntry(List<CanalEntry.Entry> entries) {
CanalEntry.RowChange rowChange = null;
for (CanalEntry.Entry row : entries) {
//判断是否是 事物开始 和 事物结束
if (row.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || row.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
try {
rowChange = CanalEntry.RowChange.parseFrom(row.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
String tableName = row.getHeader().getTableName();
CanalEntry.EventType eventType = row.getHeader().getEventType();
for (CanalEntry.RowData rowData : rowDataList) {
if (eventType == CanalEntry.EventType.UPDATE) {
List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
Object object = parse(columns, tableName);
log.info("收到的 object:{}", JsonUtils.marshalToString(object));
//根据收到的对象 处理后续业务逻辑
}
}
}
}
@Override
public void afterPropertiesSet() throws Exception {
init();
}
//解析 List<CanalEntry.Column>对象到对应的 实体类
@Override
public Object parse(List<CanalEntry.Column> canalDatas, String tableName) {
//根据配置好的map 从中根据key 表名 获取对应的映射后的 实体类class
String className = configParseDispatcher.dispatch(tableName);
Object entity = null;
Class c = null;
try {
c = Class.forName(className);
entity = c.newInstance();
} catch (ClassNotFoundException e) {
log.error("【未找到对应 {} 的 实体类 】", className);
} catch (Exception e) {
}
for (CanalEntry.Column canalDataColumn : canalDatas) {
String columnName = canalDataColumn.getName();
Field[] fields = c.getDeclaredFields();
for (Field field : fields) {
Object fieldValue = null;
field.setAccessible(true);
String fiedName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.getName());
log.info("【filedName: {}】", fiedName);
if (fiedName.equals(columnName)) {
try {
if (Long.class.equals(field.getType())) {
fieldValue = NumberUtils.toLong(canalDataColumn.getValue());
}else if(Integer.class.equals(field.getType())){
fieldValue = NumberUtils.toInt(canalDataColumn.getValue());
}else if(Double.class.equals(field.getType())){
fieldValue = NumberUtils.toDouble(canalDataColumn.getValue());
}else if(Date.class.equals(field.getType())){
try {
fieldValue = DateUtils.parseDate(canalDataColumn.getValue(), new String[]{"yyyy-MM-dd HH:mm:ss"});
} catch (ParseException e) {
e.printStackTrace();
}
}else{
fieldValue = canalDataColumn.getValue();
}
field.set(entity, fieldValue);
break;
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
}
return entity;
}
}
??application.yml
??配置canal 地址,以及表名和实体的映射规则
server:
port: 8881
application:
canal:
accessor: canal
host: 127.0.0.1
port: 11111
username:
password:
destination: blogs
batchSize: 30
parse: 规则,根据表名获取对应要映射的 实体class
rule:
mapping:
blog_info: com.johnny.canal.canal_test.entity.BlogInfo
??IParseDispatcher.java
??接口:用来根据表名key获取对应的 要映射的实体,这里写成接口是因为可以提供多种获取方式,比如我这里通过yml 配置去获取
/**
* @author johnny
* @create 2020-01-17 上午11:09
**/
public interface IParseDispatcher {
String dispatch(String key);
}
??ConfigParseDispatcher.java
??实现上面的接口,提供一种从 application.yml 获取初始源配置 根据 application.canal.parse.rule进行配置
/**
* @author johnny
* @create 2020-01-17 上午11:07
**/
@Data
@Configuration
@ConfigurationProperties(prefix = "application.canal.parse.rule")
public class ConfigParseDispatcher implements IParseDispatcher {
private Map<String,String> mapping=new HashMap<>();
@Override
public String dispatch(String key) {
return mapping.get(key);
}
}
??启动项目 此时控制台打印 empty data ,无数据变更
??通过执行 在 canal监听的mysql 上执行 更新语句
update blog_info set blog_title = 'SpringBoot配置相关for canal test ' where id = 40
??debug 程序,当执行上面的update语句后 可以看到立即收到
??通过parse方法解析为对应的 实体对象,后续做自己的业务逻辑 即可
?本篇主要介绍了canal是什么,如何下载安装和配置 ,以及提供了自己写的一个简单demo 。后续有机会深入了解一下canal的其他功能,比如 如何同步到Kafka/RocketMQ等等。。
个人博客地址: https://www.askajohnny.com 欢迎访问!
本文由博客一文多发平台 OpenWrite 发布!
原文:https://www.cnblogs.com/askajohnny/p/12206738.html