springboot整合canal实现对MySQL数据库监控

介绍

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

开始

MySQL环境准备

使用MySQL5.1.x以上即可

创建canal用户

创建一个canal用户,用来监控:

1
2
3
4
5
-- 使用命令登录:mysql -u root -p
-- 创建用户 用户名:canal 密码:canal
create user 'canal'@'%' identified by 'canal';
-- 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';
配置my.ini

MySQL配置文件my.ini,添加以下内容:

1
2
3
4
5
6
7
[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1
重启MySQL并查看配置

查看是否打开binlog模式

1
2
-- 查看是否打开binlog模式
show VARIABLES like '%log_bin%'

binlog模式

安装canal

资源下载

下载地址:https://github.com/alibaba/canal/releases

下载

解压资源

解压下载好的压缩包到任意目录

解压

配置instance.properties文件

打开conf/example/instance.properties文件

配置

其中:

canal.instance.naster.address填写需要监听的数据库的服务器地址

canal.instance.dbUsername填写需要监听的数据库用户名

canal.instance.dbPassword填写需要监听的数据库密码

canal.instance.defaultDatabaseName填写需要监听的数据库名

canal.instance.filter.regex填写需要监听的表名,格式为正则表达式。常用的表达式如下:

  1. 所有表:.*.*\..*
  2. canal schema下所有表:canal\..*
  3. canal下的以canal打头的表:canal\.canal.*
  4. canal schema下的一张表:canal.test1
  5. 多个规则组合使用:canal\..*,mysql.test1,mysql.test2(逗号分隔)

Windows环境下,在bin目录下找到startup.bat启动

启动界面

然后打开logs/canal/canal.log文件,看到内容没有异常信息,说明成功

日志信息

SpringBoot整合canal

引入依赖
1
2
3
4
5
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>
新建CanalClient.java

用来监听MySQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package com.shaangu.dist.common.flow.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.shaangu.dist.common.flow.service.FlowMessageService;
import com.shaangu.dist.common.flow.service.FlowTaskCommentService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

@Component
public class CanalClient implements InitializingBean {

@Autowired
private FlowMessageService flowMessageService;
@Autowired
private FlowTaskCommentService flowTaskCommentService;

public static CanalClient canalClient;

@PostConstruct
public void afterPropertiesSet() {
canalClient = this;
canalClient.flowMessageService = this.flowMessageService;
canalClient.flowTaskCommentService = this.flowTaskCommentService;
}

/**
* 打印canal server解析binlog获得的实体类信息
*/
public static void printEntry(List<CanalEntry.Entry> entrys) throws InterruptedException {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
//开启/关闭事务的实体类型,跳过
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
//获取操作类型:insert/update/delete类型
CanalEntry.EventType eventType = rowChange.getEventType();

//判断是否是DDL语句
if (rowChange.getIsDdl()) {
System.out.println("是DDL语句" + rowChange.getSql());
}

// 根据不同的语句类型,处理不同的业务
if (eventType == CanalEntry.EventType.INSERT) {
// 是新增语句,业务处理。如果新增的时候数据没有发生变化的情况下,是不会被执行
System.out.println("--------------INSERT---------------");
System.out.println("库名:" + entry.getHeader().getSchemaName() + "-- 表名:" + entry.getHeader().getTableName() + "新增数据");
// 获取RowChange对象里面的每一行信息
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
printColumn(rowData.getAfterColumnsList(), eventType);
Thread.sleep(500);
}
} else if (eventType == CanalEntry.EventType.DELETE) {
// 是删除语句,业务处理。如果删除的时候是没有数据的情况下,是不会被执行
System.out.println("--------------DELETE---------------");
System.out.println("库名:" + entry.getHeader().getSchemaName() + "-- 表名:" + entry.getHeader().getTableName() + "删除数据");
// 获取RowChange对象里面的每一行信息
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
printColumn(rowData.getBeforeColumnsList(), eventType);
}
} else if (eventType == CanalEntry.EventType.UPDATE) {
// 是修改语句,业务处理。如果修改的时候是没有修改任何数据的情况下,是不会被执行
System.out.println("--------------UPDATE---------------");
System.out.println("库名:" + entry.getHeader().getSchemaName() + "-- 表名:" + entry.getHeader().getTableName() + "修改数据");
// 获取RowChange对象里面的每一行信息
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
printColumn(rowData.getBeforeColumnsList(), eventType);
printColumn(rowData.getAfterColumnsList(), eventType);
}
}
}
}

private static void printColumn(List<CanalEntry.Column> columns, CanalEntry.EventType eventType) {
if (eventType == CanalEntry.EventType.INSERT) {
String title = null;
for (CanalEntry.Column column : columns) {
// System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
if (column.getName().equals("PROC_INST_ID_")) {
title = canalClient.flowTaskCommentService.getTitleById(Long.valueOf(canalClient.flowMessageService.getBusinessKeyByPID(column.getValue())));
}
if (column.getName().equals("ASSIGNEE_")) {
if (column.getValue().equals("admin")) return;
canalClient.flowMessageService.sendRabbitMessage(column.getValue(), title);
}
}
} else {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
}

然后在WebAdminApplication.java中,添加以下代码并调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static void messageMonitor() {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("host", 11111), "example", "root", "password");
try {
//打开连接
connector.connect();
//订阅数据库表,全部表
connector.subscribe("regex");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(1000);
//获取批量ID
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0) {
try {
//线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//如果有数据,处理数据
CanalClient.printEntry(message.getEntries());
List<CanalEntry.Entry> entries = message.getEntries();
}
//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}

其中,需要注意修改的参数:

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("host", 11111), "example", "root", "password");中:

host是你canal所在的服务器地址,11111和example和下面配置保持一致:

11111

example

rootpassword是需要监听的数据库的用户名和密码

regex是需要监听的规则,为正则表达式

效果

至此,配置成功。监听成功的效果如下:

监听成功