介绍
canal
是阿里巴巴旗下的一款开源项目,纯Java
开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL
(也支持mariaDB
)。
开始
MySQL
环境准备
使用MySQL5.1.x
以上即可
创建canal用户
创建一个canal用户,用来监控:
1 2 3 4 5
| -- 使用命令登录:mysql -u root -p -- 创建用户 用户名:canal 密码:canal create user 'canal'@'%' identified by 'canal'; -- 授权 *.*表示所有库 grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';
|
配置my.ini
在MySQL
配置文件my.ini
,添加以下内容:
1 2 3 4 5 6 7
| [mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
|
重启MySQL
并查看配置
查看是否打开binlog
模式
1 2
| -- 查看是否打开binlog模式 show VARIABLES like '%log_bin%'
|
安装canal
资源下载
下载地址:https://github.com/alibaba/canal/releases
解压资源
解压下载好的压缩包到任意目录
配置instance.properties
文件
打开conf/example/instance.properties
文件
其中:
canal.instance.naster.address
填写需要监听的数据库的服务器地址
canal.instance.dbUsername
填写需要监听的数据库用户名
canal.instance.dbPassword
填写需要监听的数据库密码
canal.instance.defaultDatabaseName
填写需要监听的数据库名
canal.instance.filter.regex
填写需要监听的表名,格式为正则表达式。常用的表达式如下:
- 所有表:
.*
或 .*\..*
- canal schema下所有表:
canal\..*
- canal下的以canal打头的表:
canal\.canal.*
- canal schema下的一张表:
canal.test1
- 多个规则组合使用:
canal\..*,mysql.test1,mysql.test2
(逗号分隔)
在Windows
环境下,在bin
目录下找到startup.bat
启动
然后打开logs/canal/canal.log
文件,看到内容没有异常信息,说明成功
SpringBoot
整合canal
引入依赖
1 2 3 4 5
| <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>
|
新建CanalClient.java
用来监听MySQL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| package com.shaangu.dist.common.flow.listener;
import com.alibaba.otter.canal.protocol.CanalEntry; import com.shaangu.dist.common.flow.service.FlowMessageService; import com.shaangu.dist.common.flow.service.FlowTaskCommentService; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import java.util.List;
@Component public class CanalClient implements InitializingBean {
@Autowired private FlowMessageService flowMessageService; @Autowired private FlowTaskCommentService flowTaskCommentService;
public static CanalClient canalClient; @PostConstruct public void afterPropertiesSet() { canalClient = this; canalClient.flowMessageService = this.flowMessageService; canalClient.flowTaskCommentService = this.flowTaskCommentService; }
public static void printEntry(List<CanalEntry.Entry> entrys) throws InterruptedException { for (CanalEntry.Entry entry : entrys) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChange; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } CanalEntry.EventType eventType = rowChange.getEventType();
if (rowChange.getIsDdl()) { System.out.println("是DDL语句" + rowChange.getSql()); }
if (eventType == CanalEntry.EventType.INSERT) { System.out.println("--------------INSERT---------------"); System.out.println("库名:" + entry.getHeader().getSchemaName() + "-- 表名:" + entry.getHeader().getTableName() + "新增数据"); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { printColumn(rowData.getAfterColumnsList(), eventType); Thread.sleep(500); } } else if (eventType == CanalEntry.EventType.DELETE) { System.out.println("--------------DELETE---------------"); System.out.println("库名:" + entry.getHeader().getSchemaName() + "-- 表名:" + entry.getHeader().getTableName() + "删除数据"); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { printColumn(rowData.getBeforeColumnsList(), eventType); } } else if (eventType == CanalEntry.EventType.UPDATE) { System.out.println("--------------UPDATE---------------"); System.out.println("库名:" + entry.getHeader().getSchemaName() + "-- 表名:" + entry.getHeader().getTableName() + "修改数据"); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { printColumn(rowData.getBeforeColumnsList(), eventType); printColumn(rowData.getAfterColumnsList(), eventType); } } } }
private static void printColumn(List<CanalEntry.Column> columns, CanalEntry.EventType eventType) { if (eventType == CanalEntry.EventType.INSERT) { String title = null; for (CanalEntry.Column column : columns) {
if (column.getName().equals("PROC_INST_ID_")) { title = canalClient.flowTaskCommentService.getTitleById(Long.valueOf(canalClient.flowMessageService.getBusinessKeyByPID(column.getValue()))); } if (column.getName().equals("ASSIGNEE_")) { if (column.getValue().equals("admin")) return; canalClient.flowMessageService.sendRabbitMessage(column.getValue(), title); } } } else { for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } } }
|
然后在WebAdminApplication.java
中,添加以下代码并调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public static void messageMonitor() { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("host", 11111), "example", "root", "password"); try { connector.connect(); connector.subscribe("regex"); connector.rollback(); while (true) { Message message = connector.getWithoutAck(1000); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } else { CanalClient.printEntry(message.getEntries()); List<CanalEntry.Entry> entries = message.getEntries(); } connector.ack(batchId); } } catch (Exception e) { e.printStackTrace(); } finally { connector.disconnect(); } }
|
其中,需要注意修改的参数:
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("host", 11111), "example", "root", "password");
中:
host
是你canal所在的服务器地址,11111和example和下面配置保持一致:
root
和password
是需要监听的数据库的用户名和密码
regex
是需要监听的规则,为正则表达式
效果
至此,配置成功。监听成功的效果如下: