canal [kǝ'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
主从复制原理:
所以canal模拟mysql slave也实现了自己的binlog同步,原理如下:
这里可以参考官网:https://github.com/alibaba/canal
安装完canal服务,然后按照文档介绍去配置相应的参数,就可以启动canal的服务了,由于官网介绍的比较细,大家自行去安装和配置,这里着重介绍canal同步binlog代码实践。
下载canal-go客户端,运行里面samples下的main.go,当然可能报错,将路径替换正确就OK了。(有问题欢迎私信) 部分订阅消费代码如下:
func main() {
// 192.168.199.17 替换成你的canal server的地址
// example 替换成-e canal.destinations=example 你自己定义的名字
connector := client.NewSimpleCanalConnector("127.0.0.1", 11113, "", "", "example", 60000, 60*60*1000)
err := connector.Connect()
if err != nil {
log.Println(err)
os.Exit(1)
}
err = connector.Subscribe(".*\\..*")
if err != nil {
log.Println(err)
os.Exit(1)
}
for {
message, err := connector.Get(100, nil, nil)
if err != nil {
log.Println(err)
os.Exit(1)
}
batchId := message.Id
if batchId == -1 || len(message.Entries) <= 0 {
time.Sleep(300 * time.Millisecond)
fmt.Println("===没有数据了===")
continue
}
printEntry(message.Entries)
}
}
我们可以对canal配置文件中已经配置的数据库表进行增删改,看看我们程序是否能显示相关binlog日志
mysql> insert into canal(`id`, `name`) values(1,"hah");
Query OK, 1 row affected (0.04 sec)
mysql>
打印日志:
===没有数据了===
================> binlog[mysql-bin.000002 : 2069],name[test,canal], eventType: INSERT
id : 1 update= true
name : hah update= true
sex : update= true
age : update= true
amount : update= true
email : update= true
occur_time : 2021-09-15 09:47:09 update= true
mysql> update canal set name="hello" where id=1;
Query OK, 1 row affected (0.04 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql>
打印日志如下:
===没有数据了===
================> binlog[mysql-bin.000002 : 2348],name[test,canal], eventType: UPDATE
-------> before
id : 1 update= false
name : hah update= false
sex : update= false
age : update= false
amount : update= false
email : update= false
occur_time : 2021-09-15 09:47:09 update= false
-------> after
id : 1 update= false
name : hello update= true
sex : update= false
age : update= false
amount : update= false
email : update= false
occur_time : 2021-09-15 09:47:09 update= false
===没有数据了===
mysql> delete from canal where id=1;
Query OK, 1 row affected (0.03 sec)
mysql>
打印日志如下:
================> binlog[mysql-bin.000002 : 2643],name[test,canal], eventType: DELETE
id : 1 update= false
name : hello update= false
sex : update= false
age : update= false
amount : update= false
email : update= false
occur_time : 2021-09-15 09:47:09 update= false
===没有数据了===
通过canal-go客户端可以订阅和消费canal server,那么同样我们可以配置kafka,mysql,es,mq等也可以去订阅和消费数据,项目中比较常见的就是将mysql数据同步到es中提供搜索查询能力,而这最好的方式就是通过canal中间件来做,减少人工的维护成本,提供工作效率。
关注:堆栈future