Canal 数据监控的使用

    技术2025-08-14  12

    Canal 数据监控的使用

    1. MySQL 设置1.1 开启 binlog1.2 使用 root 账号创建用户并授予权限 2. Docker 安装 canal-server3. Java 实现 Canal Client

    环境: Docker: 19 版本 Java: 11 版本 MySQL: 8 版本 Canal: 1.1.+ 版本

    1. MySQL 设置

    1.1 开启 binlog

    修改 my.cnf 文件 正常在 /etc/mysql/my.cnf 添加下面的内容.

    # binlog setting log-bin=/root/binlog/mysql-bin server-id=13306

    注: 如果使用了共享卷需要进行授权

    chmod -R 777 <mysql-bin 文件所在的主机路径>

    重启容器

    查看 binlog 的状态

    SHOW VARIABLES LIKE 'log_bin%'

    binlog 有三种格式, 默认是 ROW

    SHOW VARIABLES LIKE 'binlog_format'

    1.2 使用 root 账号创建用户并授予权限

    create user canal@'%' IDENTIFIED by 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;

    2. Docker 安装 canal-server

    运行容器并进入

    docker pull canal/canal-server docker run -d -it -p 11111:11111 --name=canal canal/canal-server docker exec -it canal /bin/bash

    修改文件

    vi canal-server/conf/canal.properties

    vi canal-server/conf/example/instance.properties

    .* 代表所有的数据库或者表 .*\\..* 中两个 .* 中间夹了一个 \\. 而 \\. 是用于转义 . 的。 第一个 .* 代表所有的数据库 第二个 .* 代表所有的表 .*\\..* ==> 所有的数据库.所有的表

    # 重启 docker restart canal

    3. Java 实现 Canal Client

    需要建立一个 SpringBoot 形成将其形成 Jar. 方便使用. https://www.bilibili.com/video/BV1GE411G7Hg?p=93 这个视频下面的评论中的连接就有

    导包

    <dependency> <groupId>com.ykenan.ykenan</groupId> <artifactId>commerce_common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <!-- canal.client --> <!--<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>--> <!-- 项目 canal.client jar 包 --> <dependency> <groupId>com.ykenan.ykenan</groupId> <artifactId>commerce_canal</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>

    application.yml 配置文件 文件中的 example 就是上边设置 canal.destinations 保持一致, 默认就是 example.

    # canal client 配置 canal: client: instances: example: host: 192.168.19.129 port: 11111 userName: canal password: canal

    启动类 加上 @EnableCanalClient 注解

    监听类

    package com.commerce.canal.listener; import com.alibaba.otter.canal.protocol.CanalEntry; import com.ykenan.ykenan.annotation.CanalEventListener; import com.ykenan.ykenan.annotation.DeleteListenPoint; import com.ykenan.ykenan.annotation.InsertListenPoint; import com.ykenan.ykenan.annotation.UpdateListenPoint; import java.util.List; /** * 实现 MySQL 数据监听 */ @CanalEventListener public class CanalDataEventListener { /** * 增加监听 * * @param eventType 当前操作的类型 (例如:增加数据) * @param rowData 发生变更的一行数据 */ @InsertListenPoint public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); System.out.println("增加后的数据:"); for (CanalEntry.Column column : afterColumnsList) { // 列名 String name = column.getName(); // 变更后的数据 String value = column.getValue(); System.out.println(name + ":" + value); } } /** * 修改监听 * * @param eventType 当前操作的类型 (例如:增加数据) * @param rowData 发生变更的一行数据 */ @UpdateListenPoint public void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); System.out.println("修改前的数据:"); for (CanalEntry.Column column : beforeColumnsList) { System.out.println(column.getName() + ":" + column.getValue()); } System.out.println("修改后的数据:"); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { System.out.println(column.getName() + ":" + column.getValue()); } } /** * 删除监听 * * @param eventType 当前操作的类型 (例如:增加数据) * @param rowData 发生变更的一行数据 */ @DeleteListenPoint public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); System.out.println("删除前的数据:"); for (CanalEntry.Column column : beforeColumnsList) { System.out.println(column.getName() + ":" + column.getValue()); } } }

    效果

    监听类中将上面的合在一个自定义中

    /** * 自定义监听 * * @param eventType 当前操作的类型 (例如:增加数据) * @param rowData 发生变更的一行数据 */ @ListenPoint( // 指定监听的类型 eventType = {CanalEntry.EventType.DELETE, CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT,}, // 指定监听的数据库 schema = {"commerce_ad"}, // 指定监听的表 table = {"tb_ad"}, // 指定实例的地址 destination = "example" ) public void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); System.out.println("自定义操作前的数据:"); for (CanalEntry.Column column : beforeColumnsList) { System.out.println(column.getName() + ":" + column.getValue()); } System.out.println("自定义操作操作后的数据:"); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { System.out.println(column.getName() + ":" + column.getValue()); } }

    效果是一样的

    Processed: 0.011, SQL: 9