介绍
canal
是阿里云开发的一个用于将 MySQL 数据同步到其他存储中,如:MySQL、消息队列、elasticsearch等。
原理
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
canal
是把自己伪装成 MySQL slave,通过解析 binlog 获取数据同步到其他存贮中。
开启binlog
修改配置
要开启 MySQL binlog 需要修改 my.cnf(linux系统) 或 my.ini(windows系统) 配置文件。
1
2
3
4
5
6
7
8
9
10
11
[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1
# 要监控的数据库名称,这里只监控test,如果都监控就注释掉
binlog-do-db=test
验证
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.04 sec)
mysql> show binary logs;
+------------------+-----------+
| Log_name | File_size |
+------------------+-----------+
| mysql-bin.000001 | 2851 |
+------------------+-----------+
1 row in set (0.00 sec)
创建账号
canal
同步数据是将自己伪装成 MySQL Slave ,获取数据是需要在 MySQL 开一个供canal
访问的账号。
1
2
3
4
5
6
7
# 创建账号
create user 'canal'@'%' identified by 'canal';
# 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';
flush privileges;
安装部署
下载安装
canal
安装很简单,但是需要 Java 环境,这里不详述 Java 安装,直接官网下载安装即可。
1
2
3
4
5
6
# 下载最新版
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
# 解压,这里要注意,解压后没有文件夹,所有要解压到一个目录里
tar zxvf canal.deployer-1.1.6.tar.gz -C /opt/canal.deployer-1.1.6
# 创建一个软连,方便以后升级版本
ln -s /opt/canal.deployer-1.1.6 /opt/canal.deployer
修改配置文件
1
2
3
4
5
6
7
8
9
10
11
12
# 修改配置文件
vi /opt/canal.deployer/conf/example/instance.properties
canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
启动
1
/opt/canal.deployer/bin/startup.sh
因为版本问题,启动后我没有看到监听端口的提示,但是也是成功了
验证启动
1
2
3
4
jps
54915
59156 Jps
48838 CanalLauncher
测试
创建一个 Java 项目,依赖如下:
1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.6</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.6</version>
</dependency>
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// 注意这里的包名,换成你创建项目的包名
package com.tiangang;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class SimpleCanalClient {
private final CanalConnector connector;
private Thread thread = null;
private final Thread.UncaughtExceptionHandler handler = (t, e) -> e.printStackTrace();
private volatile boolean running = false;
private final static int BATCH_SIZE = 5 * 1024;
public SimpleCanalClient(CanalConnector connector) {
this.connector = connector;
}
public static void main(String[] args) {
// 根据ip,直接创建链接,无HA的功能
String destination = "example";
String ip = AddressUtils.getHostIp();
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, 11111),
destination,
"canal",
"canal@123456");
final SimpleCanalClient simpleCanalClient = new SimpleCanalClient(connector);
simpleCanalClient.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
System.out.println("## stop the canal client");
simpleCanalClient.stop();
} catch (Throwable e) {
System.out.println("##something goes wrong when stopping canal:");
e.printStackTrace();
} finally {
System.out.println("## canal client is down.");
}
}));
}
public void start() {
if (this.connector == null) {
System.out.println("connector不能为空,启动失败");
return;
}
thread = new Thread(this::process);
thread.setUncaughtExceptionHandler(handler);
running = true;
thread.start();
System.out.println("canal client started...");
}
public void stop() {
if (!running) {
return;
}
running = false;
if (thread != null) {
try {
thread.join();
} catch (InterruptedException e) {
// ignore
}
}
System.out.println("canal client stopped...");
}
private void process() {
while (running) {
try {
//打开连接
connector.connect();
//订阅数据库表,全部表
connector.subscribe(".*\\..*");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (running) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
//获取批量ID
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0) {
try {
//线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//如果有数据,处理数据
printEntry(message.getEntries());
}
if (batchId != -1) {
// 提交确认
connector.ack(batchId);
}
}
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(1000L);
} catch (InterruptedException e1) {
// ignore
}
connector.rollback(); // 处理失败, 回滚数据
} finally {
connector.disconnect();
}
}
}
/**
* 打印canal server解析binlog获得的实体类信息
*/
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
//开启/关闭事务的实体类型,跳过
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
CanalEntry.RowChange rowChage;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
//获取操作类型:insert/update/delete类型
CanalEntry.EventType eventType = rowChage.getEventType();
//打印Header信息
System.out.println(String.format("================》; binlog[%s:%s] , dbName:%s, tableName:%s , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
//判断是否是DDL语句
if (rowChage.getIsDdl()) {
System.out.println("================》;isDDL: true,sql:" + rowChage.getSql());
}
//获取RowChange对象里的每一行数据,打印出来
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
//如果是删除语句
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
//如果是新增语句
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
//如果是更新的语句
} else {
//变更前的数据
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
//变更后的数据
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
好了,现在可以运行起来看看吧,然后再test
库中创建一个表,在里面添加、修改、删除一些数据,看看 Java 控制台输出什么。
问题
在启动的过程中,遇到了启动失败的现象,经过查看日志,发现日志里面:
1
Unrecognized VM option 'AggressiveOpts'
一眼看去就是 Java 虚拟机的参数问题,经过百度搜索发现,这是因为 JDK 版本问题,原因是-XX:+AggressiveOpts和-XX:+UseBiasedLocking两个参数分别在11、15版本中被废弃,而我用的是 JDK19 ,所有或出现这个问题。处理办法很简单,在启动文件/opt/canal.deployer/bin/startup.sh
,找到这两个参数,删掉即可。