Canal 实现 Mysql 数据库实时数据同步

1.1 canal 介绍

​ Canal 是一个基于 MySQL 二进制日志的高性能数据同步系统。Canal 广泛用于阿里巴巴集团(包括https://www.taobao.com),以提供可靠的低延迟增量数据管道,github 地址:https://github.com/alibaba/canal

Canal Server 能够解析 MySQL binlog 并订阅数据更改,而 Canal Client 可以实现将更改广播到任何地方,例如数据库和 Apache Kafka。

它具有以下功能:

  • 支持所有平台。
  • 支持由 Prometheus 提供支持的细粒度系统监控。
  • 支持通过不同方式解析和订阅 MySQL binlog,例如通过 GTID。
  • 支持高性能,实时数据同步。(详见 Performance)
  • Canal Server 和 Canal Client 都支持 HA / Scalability,由 Apache ZooKeeper 提供支持
  • Docker 支持。

缺点:

不支持全量更新,只支持增量更新。

完整 wiki 地址:https://github.com/alibaba/canal/wiki

1.2 运作原理

原理很简单:

  1. Canal 模拟 MySQL 的 slave 的交互协议,伪装成 mysql slave,并将转发协议发送到 MySQL Master 服务器。
  2. MySQL Master 接收到转储请求并开始将二进制日志推送到 slave(即 canal)。
  3. Canal 将二进制日志对象解析为自己的数据类型(原始字节流)

如图所示:

准备工作

2.1 下载解压 canal-server

​ 通过 github 下载 canal-server release 版本(本次安装文档使用 v1.1.4)

|

1

2

|

root@locahost:/# wget  https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

|

​ 解压

|

1

2

|

tar -zxvf canal.deployer-1.1.4.tar.gz

|

2.2 下载解压 canal-adapter

​ 通过 github 下载 canal-adapter release 版本(本次安装文档使用 v1.1.4)

|

1

2

|

root@locahost:/# wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz

|

​ 解压

|

1

2

|

tar -zxvf canal.adapter-1.1.4.tar.gz

|

配置 canal-server

3.1 canal-server 配置

​ 解压之后进入 conf 文件夹中,修改 canal.properties 根据实际需要来修改(如果不使用 kafka 或 MQ 默认 tcp 即可)

|

1

2

|

canal.destinations = prod # 指定 instance 的名字多个使用逗号分隔

|

​ 保存之后在 conf 目录创建 prod 文件夹并将 example 文件夹中的nstance.propertiescopy 到 and_prod 中

|

1

2

3

|

mkdir ant_prod  # 创建文件夹

cp example/nstance.properties  prod/ # copy 文件

|

​ 修改nstance.properties配置如下:

|

1

2

3

4

5

6

|

canal.instance.master.address=127.0.0.1:3306      # 源 Mysql 地址

canal.instance.dbUsername=canal # 源 Mysql 账号

canal.instance.dbPassword=canal # 源 Mysql 密码

canal.instance.connectionCharset=UTF-8            # 与源数据库编码格式一致

canal.instance.defaultDatabaseName=test_database  # 默认监听源数据库

|

3.2 canal-server 启动

​ 进入 canal-server bin 目录 启动

|

1

2

3

|

cd canal-server/bin # 进入目录

./startup.sh & # 后台启动

|

​ 查看日志,是否启动成功

|

1

2

|

cd canal-server/logs/ant_prod #进入日志目录

|

​ 启动成功:

|

1

2

3

4

5

6

7

8

9

10

11

12

|

2020-06-09 17:13:04.956 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property ‘connectionCharset’ being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]

2020-06-09 17:13:04.990 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]

2020-06-09 17:13:04.990 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [ant_prod/instance.properties]

2020-06-09 17:13:05.305 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-ant_prod

2020-06-09 17:13:05.311 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - –> init table filter : ^...$

2020-06-09 17:13:05.311 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - –> init table black filter :

2020-06-09 17:13:05.315 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful….

2020-06-09 17:13:05.422 [destination = ant_prod , address = rm-wz99s5v03gso12521.mysql.rds.aliyuncs.com/192.xxxxxx:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - —> begin to find start position, it will be long time for reset or first position

2020-06-09 17:13:05.423 [destination = ant_prod , address = rm-wz99s5v03gso12521.mysql.rds.aliyuncs.com/192.xxxxxx:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status

2020-06-09 17:13:06.483 [destination = ant_prod , address = rm-wz99s5v03gso12521.mysql.rds.aliyuncs.com/192.xxxxxx:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - —> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000234,position=6676924,serverId=184376678,gtid=,timestamp=1591693973000] cost : 1051ms , the next step is binlog dump

|

配置 canal-adapter

4.1 canal-adapter 配置

​ 由于 Mysql 是 8.0 这里需要下载 mysql-connector-java-8.0.20.jar,并将其放入 lib 中

|

1

2

|

cp mysql-connector-java-8.0.20.jar /canal-adapter/lib/

|

​ 解压之后进入 conf 文件夹中,修改 application.yml

|

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

|

server:

  port: 8089

spring:

  jackson:

    date-format: yyyy-MM-dd HH🇲🇲ss

    time-zone: GMT+8

    default-property-inclusion: non_null

canal.conf:

  mode: tcp # kafka rocketMQ

  canalServerHost: 127.0.0.1:11111

  batchSize: 500

  syncBatchSize: 1000

  retries: 0

  timeout:

  accessKey:

  secretKey:

源 Mysql 地址账号密码等

  srcDataSources:

    defaultDS:

      url: jdbc:mysql://localhost:3306/test_database?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai

      username: canal

      password: canal

需要实时同步数据库,如果多个实例进行区分即可

  canalAdapters:

  - instance: prod # canal instance,在 canal-server 中指定 instance 的名称

    groups:

    - groupId: g1

      outerAdapters:

      - name: rdb

        key: mysql1 # 唯一标示

        properties:

          jdbc.driverClassName: com.mysql.jdbc.Driver

          jdbc.url: jdbc:mysql://localhost:3306/test_database_01?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai

          jdbc.username: canal

          jdbc.password: canal

|

​ 编辑 rdb 目录下面表的映射文件,数据库 / 表 (多个表创建多个映射文件,文件名对应表名)以此类推

|

1

2

3

4

5

6

7

8

9

10

11

12

|

dataSourceKey: defaultDS

destination: prod

outerAdapterKey: mysql1

concurrent: true

dbMapping:

  database: test_database_01

  table: test

  targetTable: test_database_01.test

  targetPk:

    id: id

  mapAll: true

|

4.1 canal-adapter 启动

​ 进入 canal-adapter/bin 目录 启动

|

1

2

3

|

cd canal-adapter/bin # 进入目录

./startup.sh & # 后台启动

|

​ 查看日志,是否启动成功

|

1

2

3

|

cd canal-adapter/adapter/logs/ #进入日志目录

tail -f adapter.log # 查看日志是否启动成功

|

测试数据库同步

|

1

2

|

更新 / 删除 / 批量插入 / 批量更新 / 批量删除

|

原文链接:https://www.jianshu.com/p/d4c177f0d831
作者:qingwenLi