canal入门

直接看官方文档

https://github.com/alibaba/canal

https://github.com/alibaba/canal/wiki/QuickStart

https://github.com/alibaba/canal/wiki/ClientExample

https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

准备

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    1
    2
    3
    4
    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复
    • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
  • 授权 canal 连接 MySQL 的账号具有作为 MySQL slave 的权限,如果已有账户可直接 grant

    1
    2
    3
    4
    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

docker 启动

1
docker pull canal/canal-server:v1.1.8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 下载脚本
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh

# 构建一个destination name为test的队列
sh run.sh -e canal.auto.scan=false \
-e canal.destinations=test \
-e canal.instance.master.address=192.168.64.1:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal158@ \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e 'canal.instance.filter.regex=a\\..*' \
-e 'canal.instance.filter.black.regex=mysql\\..*,information_schema\\..*,performance_schema\\..*'

注意

上面启动命令里的过滤正则用单引号包了起来,shell 不会再处理其中的反斜杠,所以点号要写成两个反斜杠(\\.)来转义;最终生效的正则可以在下面的日志中确认。

如果 canal-server 可以连接上,但拉取不到监听消息,请先排查这个过滤表达式。

1
tail -f /home/admin/canal-server/logs/test/test.log

观察输出:

1
2
3
2025-09-25 08:55:09.550 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-test 
2025-09-25 08:55:10.587 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^a\..*$
2025-09-25 08:55:10.588 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^information_schema\..*$|^performance_schema\..*$|^mysql\.slave_.*$

第一行 说明启动了instance test

第二行 白名单 监听a 库下所有表(只有一个反斜杠,如果有两个说明有问题)

第三行 黑名单 排除了information_schema、performance_schema、mysql库下的表

客户端

1
2
3
4
5
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


/**
 * Minimal stand-alone canal client.
 *
 * <p>Connects to a canal server on this host's IP (port 11111, destination {@code test}),
 * subscribes with the filter {@code .*\..*} and prints every received row change to standard
 * output. The program exits after 120 consecutive empty polls or when the thread is interrupted.
 */
public class SimpleCanalClientExample {

    /**
     * Runs the polling loop until too many empty batches were seen in a row.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Create the connection.
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(AddressUtils.getHostIp(), 11111), "test", null, null);
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                // Fetch up to batchSize entries without acknowledging them yet.
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Do not swallow the interrupt: restore the flag and stop polling.
                        // The finally block below still disconnects.
                        Thread.currentThread().interrupt();
                        return;
                    }
                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }

                // Acknowledge the batch. On a processing failure,
                // connector.rollback(batchId) would be called here instead.
                connector.ack(batchId);
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Prints the binlog position, schema/table name, event type and column values of every
     * entry, skipping transaction begin/end markers.
     *
     * @param entrys entries of one fetched batch
     * @throws RuntimeException if an entry's store value cannot be parsed as a {@code RowChange}
     */
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChange.getEventType();
            // The arrow was previously emitted as the HTML entity "&gt;"; print a real '>'.
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // Every other event type: show both the before and the after image.
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints one line per column: its name, value and updated flag.
     *
     * @param columns columns of a row image
     */
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}

启动 canal-server,然后执行 Java 程序,再执行数据库 SQL 操作,查看日志输出。

spring boot集成

1
2
3
4
5
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

/**
 * Spring configuration that exposes the canal connector and the message listener as beans.
 */
@Configuration
public class CanalClientConfig {

    /** Port of the canal server; the original notes describe 11111 as the server default. */
    private static final int CANAL_SERVER_PORT = 11111;

    /**
     * Destination, i.e. the instance name on the canal server.
     * NOTE(review): the docker command earlier in this document creates destination "test" —
     * confirm that an "example" instance actually exists on the target server.
     */
    private static final String DESTINATION = "example";

    /**
     * Builds a single-node connector pointing at this host's IP.
     * Single-node mode is used here; a production setup may use cluster mode instead.
     *
     * @return connector for the configured destination, with empty user name and password
     */
    @Bean
    public CanalConnector canalConnector() {
        InetSocketAddress serverAddress =
                new InetSocketAddress(AddressUtils.getHostIp(), CANAL_SERVER_PORT);
        // User name and password are passed as empty strings.
        return CanalConnectors.newSingleConnector(serverAddress, DESTINATION, "", "");
    }

    /**
     * Registers the listener; Spring calls {@code start} after construction and {@code stop}
     * on shutdown.
     *
     * @param canalConnector connector the listener polls
     * @return the listener bean
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public CanalMessageListener canalMessageListener(CanalConnector canalConnector) {
        CanalMessageListener listener = new CanalMessageListener(canalConnector);
        return listener;
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Polls a canal connector on a dedicated thread and logs every row change it receives.
 *
 * <p>Lifecycle: {@link #start()} connects, subscribes and submits the polling loop to a
 * single-thread executor; {@link #stop()} ends the loop, waits for the worker and only then
 * disconnects the connector.
 */
public class CanalMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(CanalMessageListener.class);
    private static final int BATCH_SIZE = 1000;

    private final CanalConnector canalConnector;
    private final ExecutorService executorService;
    // Written by start()/stop(), read by the polling thread.
    private volatile boolean running = false;

    /**
     * Creates a listener bound to the given connector. Nothing is connected until
     * {@link #start()} is called.
     *
     * @param canalConnector connector used for all fetch/ack/rollback calls
     */
    public CanalMessageListener(CanalConnector canalConnector) {
        this.canalConnector = canalConnector;
        // A single worker thread handles messages; adjust to the workload if needed.
        this.executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r, "canal-message-processor");
            thread.setDaemon(false);
            return thread;
        });
    }

    /**
     * Connects to the canal server, subscribes to all schemas and tables, rolls back and then
     * starts the polling loop on the worker thread so the caller is not blocked.
     */
    public void start() {
        running = true;
        canalConnector.connect();
        // Subscribe to every schema/table; a specific one can be given as "dbName.tableName".
        canalConnector.subscribe(".*\\..*");
        canalConnector.rollback();

        logger.info("Canal message listener started");

        executorService.execute(this::processMessages);
    }

    /**
     * Polling loop: fetches a batch, processes and acknowledges it, and sleeps briefly when the
     * batch is empty. Runs until {@code running} is cleared or the thread is interrupted.
     */
    private void processMessages() {
        while (running) {
            try {
                Message message = canalConnector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    // Nothing to do: back off briefly instead of spinning.
                    TimeUnit.MILLISECONDS.sleep(100);
                } else {
                    processEntries(message.getEntries());
                    canalConnector.ack(batchId);
                }
            } catch (InterruptedException ie) {
                // Interruption is a shutdown request, not a processing error.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                logger.error("Error processing canal messages", e);
                try {
                    canalConnector.rollback();
                } catch (Exception rollbackError) {
                    // A failing rollback must not kill the polling thread; log and retry later.
                    logger.error("Error rolling back canal batch", rollbackError);
                }
                try {
                    // Wait a moment before retrying after a failure.
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    /**
     * Logs schema, table, event type and row images for every ROWDATA entry. An entry that
     * fails to parse or process is logged and skipped; the remaining entries are still handled.
     *
     * @param entries entries of one fetched batch; {@code null} or empty is a no-op
     */
    private void processEntries(List<CanalEntry.Entry> entries) {
        if (CollectionUtils.isEmpty(entries)) {
            return;
        }

        for (CanalEntry.Entry entry : entries) {
            // Only row data changes are of interest.
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }

            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                CanalEntry.EventType eventType = rowChange.getEventType();
                String database = entry.getHeader().getSchemaName();
                String table = entry.getHeader().getTableName();

                logger.info("数据库: {}, 表: {}, 操作类型: {}", database, table, eventType);

                // Only logging here; real applications would run business logic per row.
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.DELETE) {
                        logger.info("删除前数据: {}", rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        logger.info("插入后数据: {}", rowData.getAfterColumnsList());
                    } else {
                        logger.info("更新前数据: {}", rowData.getBeforeColumnsList());
                        logger.info("更新后数据: {}", rowData.getAfterColumnsList());
                    }
                }
            } catch (Exception e) {
                logger.error("Error processing entry", e);
            }
        }
    }

    /**
     * Stops the polling loop and releases resources.
     *
     * <p>The worker is shut down first and awaited for up to five seconds (then interrupted),
     * so the connector is not disconnected underneath an in-flight fetch or ack. The connector
     * is disconnected afterwards, and the caller's interrupt status is restored if the wait was
     * interrupted.
     */
    public void stop() {
        running = false;
        boolean interrupted = false;
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            interrupted = true;
        }
        try {
            canalConnector.disconnect();
            logger.info("Canal message listener stopped");
        } finally {
            if (interrupted) {
                // Restore the flag only after disconnecting, so the disconnect itself is not
                // affected by a pending interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }
}

根据字段名设置对象属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
 * Utility that sets properties of a MyBatis-Plus entity from database column names,
 * converting the string values to each property's declared type.
 */
public class DbFieldEntitySetter {

    // DateTimeFormatter is immutable and thread-safe, so build each one once instead of on
    // every conversion. These must be declared before the static initializer that uses them.
    private static final DateTimeFormatter TIME_FORMATTER =
            DateTimeFormatter.ofPattern(DateUtils.FORMAT_HOUR_MINUTE_SECOND);
    private static final DateTimeFormatter DATE_FORMATTER =
            DateTimeFormatter.ofPattern(DateUtils.FORMAT_YEAR_MONTH_DAY);
    private static final DateTimeFormatter DATE_TIME_FORMATTER =
            DateTimeFormatter.ofPattern(DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND);

    private static final ConversionService conversionService;

    static {
        DefaultConversionService service = new DefaultConversionService();
        service.addConverter(String.class, Long.class, Long::parseLong);
        service.addConverter(String.class, LocalTime.class,
                source -> LocalTime.parse(source, TIME_FORMATTER));
        service.addConverter(String.class, LocalDate.class,
                source -> LocalDate.parse(source, DATE_FORMATTER));
        service.addConverter(String.class, LocalDateTime.class,
                source -> LocalDateTime.parse(source, DATE_TIME_FORMATTER));
        conversionService = service;
    }

    /**
     * Copies the given column-name/string-value pairs into the entity, converting each value
     * to the type of the matching property. Null or blank values, columns without a mapped
     * property and non-writable properties are skipped.
     *
     * @param entity entity object (a MyBatis-Plus mapped class); {@code null} is a no-op
     * @param dbFieldMap database column name (e.g. {@code user_id}) to string value;
     *                   {@code null} or empty is a no-op
     * @throws IllegalArgumentException if no MyBatis-Plus table info exists for the entity class
     * @throws RuntimeException if a value cannot be converted or set; the cause is preserved
     */
    public static void setFieldValuesByDbColumn(Object entity, Map<String, String> dbFieldMap) {
        if (entity == null || dbFieldMap == null || dbFieldMap.isEmpty()) {
            return;
        }

        // 1. Look up the MyBatis-Plus metadata of the entity class.
        Class<?> entityClass = entity.getClass();
        TableInfo tableInfo = TableInfoHelper.getTableInfo(entityClass);
        if (tableInfo == null) {
            throw new IllegalArgumentException("实体类" + entityClass.getName() + "未配置MyBatis-Plus映射");
        }

        // 2. Build the column name -> property name mapping; the key column is added separately.
        Map<String, String> dbColumnToPropertyMap = new HashMap<>();
        List<TableFieldInfo> fieldList = tableInfo.getFieldList();
        for (TableFieldInfo fieldInfo : fieldList) {
            dbColumnToPropertyMap.put(fieldInfo.getColumn(), fieldInfo.getProperty());
        }
        dbColumnToPropertyMap.put(tableInfo.getKeyColumn(), tableInfo.getKeyProperty());

        // 3. Walk the incoming columns and set each mapped property with type conversion.
        BeanWrapper beanWrapper = new BeanWrapperImpl(entity);
        for (Map.Entry<String, String> entry : dbFieldMap.entrySet()) {
            String dbColumnName = entry.getKey();
            String stringValue = entry.getValue();

            // Skip null and blank values.
            if (stringValue == null || stringValue.trim().isEmpty()) {
                continue;
            }

            String propertyName = dbColumnToPropertyMap.get(dbColumnName);
            if (propertyName != null && beanWrapper.isWritableProperty(propertyName)) {
                // Declared outside the try so the error message can report it without a second
                // lookup that could itself fail and hide the original exception.
                Class<?> targetType = null;
                try {
                    targetType = beanWrapper.getPropertyType(propertyName);
                    Object convertedValue = conversionService.convert(stringValue, targetType);
                    beanWrapper.setPropertyValue(propertyName, convertedValue);
                } catch (Exception e) {
                    String targetTypeName = targetType != null ? targetType.getSimpleName() : "unknown";
                    throw new RuntimeException("转换字段值失败,字段名:" + dbColumnName
                            + ",值:" + stringValue + ",目标类型:"
                            + targetTypeName, e);
                }
            }
        }
    }
}

canal入门
http://hanqichuan.com/2025/09/19/mysql/canal入门/
作者
韩启川
发布于
2025年9月19日
许可协议