Canal 实现 Mysql 数据库实时数据同步
1.1 canal 介绍
Canal 是一个基于 MySQL 二进制日志的高性能数据同步系统。Canal 广泛用于阿里巴巴集团(包括https://www.taobao.com),以提供可靠的低延迟增量数据管道,github 地址:https://github.com/alibaba/canal
Canal Server 能够解析 MySQL binlog 并订阅数据更改,而 Canal Client 可以实现将更改广播到任何地方,例如数据库和 Apache Kafka。
它具有以下功能:
- 支持所有平台。
- 支持由 Prometheus 提供支持的细粒度系统监控。
- 支持通过不同方式解析和订阅 MySQL binlog,例如通过 GTID。
- 支持高性能,实时数据同步。(详见 Performance)
- Canal Server 和 Canal Client 都支持 HA / Scalability,由 Apache ZooKeeper 提供支持
- Docker 支持。
缺点:
不支持全量更新,只支持增量更新。
完整 wiki 地址:https://github.com/alibaba/canal/wiki
1.2 运作原理
原理很简单:
- Canal 模拟 MySQL 的 slave 的交互协议,伪装成 mysql slave,并将转发协议发送到 MySQL Master 服务器。
- MySQL Master 接收到转储请求并开始将二进制日志推送到 slave(即 canal)。
- Canal 将二进制日志对象解析为自己的数据类型(原始字节流)
如图所示:
准备工作
2.1 下载解压 canal-server
通过 github 下载 canal-server release 版本(本次安装文档使用 v1.1.4)
|
1
2
|
root@locahost:/# wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
|
解压
|
1
2
|
tar -zxvf canal.deployer-1.1.4.tar.gz
|
2.2 下载解压 canal-adapter
通过 github 下载 canal-adapter release 版本(本次安装文档使用 v1.1.4)
|
1
2
|
root@locahost:/# wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
|
解压
|
1
2
|
tar -zxvf canal.adapter-1.1.4.tar.gz
|
配置 canal-server
3.1 canal-server 配置
解压之后进入 conf 文件夹中,修改 canal.properties 根据实际需要来修改(如果不使用 kafka 或 MQ 默认 tcp 即可)
|
1
2
|
canal.destinations = prod # 指定 instance 的名字多个使用逗号分隔
|
保存之后在 conf 目录创建 prod 文件夹并将 example 文件夹中的nstance.properties
copy 到 and_prod 中
|
1
2
3
|
mkdir ant_prod # 创建文件夹
cp example/nstance.properties prod/ # copy 文件
|
修改nstance.properties
配置如下:
|
1
2
3
4
5
6
|
canal.instance.master.address=127.0.0.1:3306 # 源 Mysql 地址
canal.instance.dbUsername=canal # 源 Mysql 账号
canal.instance.dbPassword=canal # 源 Mysql 密码
canal.instance.connectionCharset=UTF-8 # 与源数据库编码格式一致
canal.instance.defaultDatabaseName=test_database # 默认监听源数据库
|
3.2 canal-server 启动
进入 canal-server bin 目录 启动
|
1
2
3
|
cd canal-server/bin # 进入目录
./startup.sh & # 后台启动
|
查看日志,是否启动成功
|
1
2
|
cd canal-server/logs/ant_prod #进入日志目录
|
启动成功:
|
1
2
3
4
5
6
7
8
9
10
11
12
|
2020-06-09 17:13:04.956 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property ‘connectionCharset’ being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]
2020-06-09 17:13:04.990 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-06-09 17:13:04.990 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [ant_prod/instance.properties]
2020-06-09 17:13:05.305 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-ant_prod
2020-06-09 17:13:05.311 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - –> init table filter : ^...$
2020-06-09 17:13:05.311 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - –> init table black filter :
2020-06-09 17:13:05.315 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful….
2020-06-09 17:13:05.422 [destination = ant_prod , address = rm-wz99s5v03gso12521.mysql.rds.aliyuncs.com/192.xxxxxx:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - —> begin to find start position, it will be long time for reset or first position
2020-06-09 17:13:05.423 [destination = ant_prod , address = rm-wz99s5v03gso12521.mysql.rds.aliyuncs.com/192.xxxxxx:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-06-09 17:13:06.483 [destination = ant_prod , address = rm-wz99s5v03gso12521.mysql.rds.aliyuncs.com/192.xxxxxx:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - —> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000234,position=6676924,serverId=184376678,gtid=,timestamp=1591693973000] cost : 1051ms , the next step is binlog dump
|
配置 canal-adapter
4.1 canal-adapter 配置
由于 Mysql 是 8.0 这里需要下载 mysql-connector-java-8.0.20.jar,并将其放入 lib 中
|
1
2
|
cp mysql-connector-java-8.0.20.jar /canal-adapter/lib/
|
解压之后进入 conf 文件夹中,修改 application.yml
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH🇲🇲ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp # kafka rocketMQ
canalServerHost: 127.0.0.1:11111
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
源 Mysql 地址账号密码等
srcDataSources:
defaultDS:
url: jdbc:mysql://localhost:3306/test_database?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
username: canal
password: canal
需要实时同步数据库,如果多个实例进行区分即可
canalAdapters:
- instance: prod # canal instance,在 canal-server 中指定 instance 的名称
groups:
- groupId: g1
outerAdapters:
- name: rdb
key: mysql1 # 唯一标示
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://localhost:3306/test_database_01?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
jdbc.username: canal
jdbc.password: canal
|
编辑 rdb 目录下面表的映射文件,数据库 / 表 (多个表创建多个映射文件,文件名对应表名)以此类推
|
1
2
3
4
5
6
7
8
9
10
11
12
|
dataSourceKey: defaultDS
destination: prod
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: test_database_01
table: test
targetTable: test_database_01.test
targetPk:
id: id
mapAll: true
|
4.1 canal-adapter 启动
进入 canal-adapter/bin 目录 启动
|
1
2
3
|
cd canal-adapter/bin # 进入目录
./startup.sh & # 后台启动
|
查看日志,是否启动成功
|
1
2
3
|
cd canal-adapter/adapter/logs/ #进入日志目录
tail -f adapter.log # 查看日志是否启动成功
|
测试数据库同步
|
1
2
|
更新 / 删除 / 批量插入 / 批量更新 / 批量删除
|
原文链接:https://www.jianshu.com/p/d4c177f0d831
作者:qingwenLi