直接看官方文档 https://github.com/alibaba/canal
https://github.com/alibaba/canal/wiki/QuickStart
https://github.com/alibaba/canal/wiki/ClientExample
https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
准备
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
授权 canal 连接 MySQL 的账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
docker 启动
docker pull canal/canal-server:v1.1.8
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh

# canal.instance.dbUsername / dbPassword 要与上面授权的 MySQL 账号、密码保持一致
sh run.sh -e canal.auto.scan=false \
    -e canal.destinations=test \
    -e canal.instance.master.address=192.168.64.1:3306 \
    -e canal.instance.dbUsername=canal \
    -e canal.instance.dbPassword=canal158@ \
    -e canal.instance.connectionCharset=UTF-8 \
    -e canal.instance.tsdb.enable=true \
    -e canal.instance.gtidon=false \
    -e 'canal.instance.filter.regex=a\\..*' \
    -e 'canal.instance.filter.black.regex=mysql\\..*,information_schema\\..*,performance_schema\\..*'
注意上面启动命令里过滤表达式中的反斜杠:表达式用单引号包起来了,所以要写成两个反斜杠(\\.)进行转义。
如果 canal-server 可以连接上,但就是拉取不到监听的消息,请先排查上面的过滤表达式。
1 tail -f /home/admin/canal-server/logs/test/test.log
观察输出:
2025-09-25 08:55:09.550 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-test
2025-09-25 08:55:10.587 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^a\..*$
2025-09-25 08:55:10.588 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^information_schema\..*$|^performance_schema\..*$|^mysql\.slave_.*$
第一行 说明启动了instance test
第二行 白名单 监听a 库下所有表(只有一个反斜杠,如果有两个说明有问题)
第三行 黑名单 排除了information_schema、performance_schema、mysql库下的表
客户端
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.common.utils.AddressUtils;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {public static void main (String args[]) { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress (AddressUtils.getHostIp(), 11111 ), "test" , null , null ); int batchSize = 1000 ; int emptyCount = 0 ; try { connector.connect(); connector.subscribe(".*\\..*" ); connector.rollback(); int totalEmptyCount = 120 ; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0 ) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000 ); } catch (InterruptedException e) { } } else { emptyCount = 0 ; printEntry(message.getEntries()); } connector.ack(batchId); } System.out.println("empty too many times, exit" ); } finally { connector.disconnect(); } }private static void printEntry (List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == 
EntryType.TRANSACTIONEND) { continue ; } RowChange rowChage = null ; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException ("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s" , entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before" ); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after" ); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn (List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
先启动 canal-server,再运行上面的 Java 程序;然后在数据库中执行 SQL 操作,查看程序输出的日志。
Spring Boot 集成
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.common.utils.AddressUtils;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.net.InetSocketAddress;@Configuration public class CanalClientConfig { @Bean public CanalConnector canalConnector () { String hostname = AddressUtils.getHostIp(); return CanalConnectors.newSingleConnector( new InetSocketAddress (hostname, 11111 ), "example" , "" , "" ); } @Bean(initMethod = "start", destroyMethod = "stop") public CanalMessageListener canalMessageListener (CanalConnector canalConnector) { return new CanalMessageListener (canalConnector); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.protocol.CanalEntry;import com.alibaba.otter.canal.protocol.Message;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.util.CollectionUtils;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class CanalMessageListener { private static final Logger logger = LoggerFactory.getLogger(CanalMessageListener.class); private static final int BATCH_SIZE = 1000 ; private final CanalConnector canalConnector; private final ExecutorService executorService; private volatile boolean running = false ; public CanalMessageListener (CanalConnector canalConnector) { this .canalConnector = canalConnector; this .executorService = Executors.newSingleThreadExecutor(r -> { Thread thread = new Thread (r, "canal-message-processor" ); thread.setDaemon(false ); return thread; }); } public void start () { running = true ; canalConnector.connect(); canalConnector.subscribe(".*\\..*" ); canalConnector.rollback(); logger.info("Canal message listener started" ); executorService.execute(this ::processMessages); } private void processMessages () { while (running) { try { Message message = canalConnector.getWithoutAck(BATCH_SIZE); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0 ) { TimeUnit.MILLISECONDS.sleep(100 ); } else { processEntries(message.getEntries()); canalConnector.ack(batchId); } } catch (Exception e) { logger.error("Error 
processing canal messages" , e); try { canalConnector.rollback(); TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); break ; } } } } private void processEntries (List<CanalEntry.Entry> entries) { if (CollectionUtils.isEmpty(entries)) { return ; } for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) { continue ; } try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); CanalEntry.EventType eventType = rowChange.getEventType(); String database = entry.getHeader().getSchemaName(); String table = entry.getHeader().getTableName(); logger.info("数据库: {}, 表: {}, 操作类型: {}" , database, table, eventType); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { logger.info("删除前数据: {}" , rowData.getBeforeColumnsList()); } else if (eventType == CanalEntry.EventType.INSERT) { logger.info("插入后数据: {}" , rowData.getAfterColumnsList()); } else { logger.info("更新前数据: {}" , rowData.getBeforeColumnsList()); logger.info("更新后数据: {}" , rowData.getAfterColumnsList()); } } } catch (Exception e) { logger.error("Error processing entry" , e); } } } public void stop () { running = false ; canalConnector.disconnect(); executorService.shutdown(); try { if (!executorService.awaitTermination(5 , TimeUnit.SECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException e) { executorService.shutdownNow(); } logger.info("Canal message listener stopped" ); } }
根据字段名设置对象属性
/**
 * Copies string values keyed by database column name onto the matching
 * properties of an entity that has a MyBatis-Plus table mapping.
 *
 * <p>Values are converted to the property type through a shared
 * {@link ConversionService} that additionally understands {@code Long},
 * {@code LocalTime}, {@code LocalDate} and {@code LocalDateTime} targets.
 */
public class DbFieldEntitySetter {

    // Formatters are immutable and thread-safe, so they are built once
    // instead of on every conversion.
    private static final DateTimeFormatter TIME_FORMATTER =
            DateTimeFormatter.ofPattern(DateUtils.FORMAT_HOUR_MINUTE_SECOND);
    private static final DateTimeFormatter DATE_FORMATTER =
            DateTimeFormatter.ofPattern(DateUtils.FORMAT_YEAR_MONTH_DAY);
    private static final DateTimeFormatter DATE_TIME_FORMATTER =
            DateTimeFormatter.ofPattern(DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND);

    private static final ConversionService conversionService;

    static {
        DefaultConversionService service = new DefaultConversionService();
        service.addConverter(String.class, Long.class, Long::parseLong);
        service.addConverter(String.class, LocalTime.class,
                source -> LocalTime.parse(source, TIME_FORMATTER));
        service.addConverter(String.class, LocalDate.class,
                source -> LocalDate.parse(source, DATE_FORMATTER));
        service.addConverter(String.class, LocalDateTime.class,
                source -> LocalDateTime.parse(source, DATE_TIME_FORMATTER));
        conversionService = service;
    }

    /**
     * Sets entity properties from a column-name to string-value map.
     *
     * <p>Entries are skipped when the value is {@code null} or blank, when the
     * column has no mapped property, or when the property is not writable.
     *
     * @param entity     target object; {@code null} makes the call a no-op
     * @param dbFieldMap column name to raw value; {@code null} or empty makes the call a no-op
     * @throws IllegalArgumentException if the entity class has no MyBatis-Plus table info
     * @throws RuntimeException         if a value cannot be converted or assigned; the cause is preserved
     */
    public static void setFieldValuesByDbColumn(Object entity, Map<String, String> dbFieldMap) {
        if (entity == null || dbFieldMap == null || dbFieldMap.isEmpty()) {
            return;
        }

        Class<?> entityClass = entity.getClass();
        TableInfo tableInfo = TableInfoHelper.getTableInfo(entityClass);
        if (tableInfo == null) {
            throw new IllegalArgumentException("实体类" + entityClass.getName() + "未配置MyBatis-Plus映射");
        }

        Map<String, String> dbColumnToPropertyMap = buildColumnToPropertyMap(tableInfo);
        BeanWrapper beanWrapper = new BeanWrapperImpl(entity);

        for (Map.Entry<String, String> entry : dbFieldMap.entrySet()) {
            String dbColumnName = entry.getKey();
            String stringValue = entry.getValue();
            if (stringValue == null || stringValue.trim().isEmpty()) {
                continue;
            }

            String propertyName = dbColumnToPropertyMap.get(dbColumnName);
            if (propertyName == null || !beanWrapper.isWritableProperty(propertyName)) {
                continue;
            }

            Class<?> targetType = null;
            try {
                targetType = beanWrapper.getPropertyType(propertyName);
                Object convertedValue = conversionService.convert(stringValue, targetType);
                beanWrapper.setPropertyValue(propertyName, convertedValue);
            } catch (Exception e) {
                // Reuse the type resolved above: looking it up again here could throw
                // and replace the real cause with an unrelated exception.
                String typeName = targetType == null ? "unknown" : targetType.getSimpleName();
                throw new RuntimeException("转换字段值失败,字段名:" + dbColumnName + ",值:" + stringValue
                        + ",目标类型:" + typeName, e);
            }
        }
    }

    /** Builds the column-name to property-name lookup from the table's fields and its key column. */
    private static Map<String, String> buildColumnToPropertyMap(TableInfo tableInfo) {
        Map<String, String> columnToProperty = new HashMap<>();
        List<TableFieldInfo> fieldList = tableInfo.getFieldList();
        for (TableFieldInfo fieldInfo : fieldList) {
            columnToProperty.put(fieldInfo.getColumn(), fieldInfo.getProperty());
        }
        // The key column is registered last; skip it when no key column is reported.
        if (tableInfo.getKeyColumn() != null) {
            columnToProperty.put(tableInfo.getKeyColumn(), tableInfo.getKeyProperty());
        }
        return columnToProperty;
    }
}