环境: Docker: 19 版本 Java: 11 版本 MySQL: 8 版本 Canal: 1.1.+ 版本
修改 my.cnf 文件 正常在 /etc/mysql/my.cnf 添加下面的内容.
# binlog setting log-bin=/root/binlog/mysql-bin server-id=13306注: 如果使用了共享卷需要进行授权
chmod -R 777 <mysql-bin 文件所在的主机路径>重启容器
查看 binlog 的状态
SHOW VARIABLES LIKE 'log_bin%'binlog 有三种格式, 默认是 ROW
SHOW VARIABLES LIKE 'binlog_format'运行容器并进入
docker pull canal/canal-server docker run -d -it -p 11111:11111 --name=canal canal/canal-server docker exec -it canal /bin/bash修改文件
vi canal-server/conf/canal.properties vi canal-server/conf/example/instance.properties.* 代表所有的数据库或者表 .*\\..* 中两个 .* 中间夹了一个 \\. 而 \\. 是用于转义 . 的。 第一个 .* 代表所有的数据库 第二个 .* 代表所有的表 .*\\..* ==> 所有的数据库.所有的表
# 重启 docker restart canal需要建立一个 SpringBoot 形成将其形成 Jar. 方便使用. https://www.bilibili.com/video/BV1GE411G7Hg?p=93 这个视频下面的评论中的连接就有
导包
<dependency> <groupId>com.ykenan.ykenan</groupId> <artifactId>commerce_common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <!-- canal.client --> <!--<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>--> <!-- 项目 canal.client jar 包 --> <dependency> <groupId>com.ykenan.ykenan</groupId> <artifactId>commerce_canal</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>application.yml 配置文件 文件中的 example 就是上边设置 canal.destinations 保持一致, 默认就是 example.
# canal client 配置 canal: client: instances: example: host: 192.168.19.129 port: 11111 userName: canal password: canal启动类 加上 @EnableCanalClient 注解
监听类
package com.commerce.canal.listener; import com.alibaba.otter.canal.protocol.CanalEntry; import com.ykenan.ykenan.annotation.CanalEventListener; import com.ykenan.ykenan.annotation.DeleteListenPoint; import com.ykenan.ykenan.annotation.InsertListenPoint; import com.ykenan.ykenan.annotation.UpdateListenPoint; import java.util.List; /** * 实现 MySQL 数据监听 */ @CanalEventListener public class CanalDataEventListener { /** * 增加监听 * * @param eventType 当前操作的类型 (例如:增加数据) * @param rowData 发生变更的一行数据 */ @InsertListenPoint public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); System.out.println("增加后的数据:"); for (CanalEntry.Column column : afterColumnsList) { // 列名 String name = column.getName(); // 变更后的数据 String value = column.getValue(); System.out.println(name + ":" + value); } } /** * 修改监听 * * @param eventType 当前操作的类型 (例如:增加数据) * @param rowData 发生变更的一行数据 */ @UpdateListenPoint public void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); System.out.println("修改前的数据:"); for (CanalEntry.Column column : beforeColumnsList) { System.out.println(column.getName() + ":" + column.getValue()); } System.out.println("修改后的数据:"); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { System.out.println(column.getName() + ":" + column.getValue()); } } /** * 删除监听 * * @param eventType 当前操作的类型 (例如:增加数据) * @param rowData 发生变更的一行数据 */ @DeleteListenPoint public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); System.out.println("删除前的数据:"); for (CanalEntry.Column column : beforeColumnsList) { System.out.println(column.getName() + ":" + column.getValue()); } } }效果
监听类中将上面的合在一个自定义中
/** * 自定义监听 * * @param eventType 当前操作的类型 (例如:增加数据) * @param rowData 发生变更的一行数据 */ @ListenPoint( // 指定监听的类型 eventType = {CanalEntry.EventType.DELETE, CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT,}, // 指定监听的数据库 schema = {"commerce_ad"}, // 指定监听的表 table = {"tb_ad"}, // 指定实例的地址 destination = "example" ) public void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); System.out.println("自定义操作前的数据:"); for (CanalEntry.Column column : beforeColumnsList) { System.out.println(column.getName() + ":" + column.getValue()); } System.out.println("自定义操作操作后的数据:"); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { System.out.println(column.getName() + ":" + column.getValue()); } }效果是一样的