(1)理论基础
canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。canal 就是一个同步增量数据的一个工具。
canal 应用场景1
我们在做mysql与redis的数据同步时,往往采用的是代码层实现,或者通过spring-cache等缓存框架。但是仍然有某些场景,比如说原项目无源码,或者不能进行二开时,就需要独立的第三方来实现数据同步。我们需要一种无代码入侵式的数据同步,完全由第三方组件管理。这就需要借助canal来实现mysql到redis的数据同步
canal 应用场景2
将用户的订单信息传入后台。
后台服务器将订单信息保存到mysql数据库。
又 canal 进行监控mysql中的写操作变化,将发生修改(Insert) 的数据写入到kafka
通过sparkStreaming读取Kafka中的数据,进行计算。
将计算好的结果,重新写入到服务器中,并返回到浏览器。
canal的作用
它可以实现增量同步,拿A商品举例,第一批数据中,A商品有100条,canal便会将这批新增的数据写入Kafka,再交给spark处理。第二次又新增一批数据,于是canal又将监控到新增数据写入到Kafka中。依次类推,最终由spark计算出结果返回出去。
canal 除了写入kafka 还能将数据写入到其他中间件(mq、elasticsearch、hbase等)
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
(2)环境配置
1.【mysql准备】
使用命令查看数据库是否开启binlog模式:
log_bin属性值为ON,则binlog模式开启;为OFF则binlog模式关闭。
show variables like 'log_%';
2.【canal解压】
canal.deployer-1.1.6.tar.gz
3. 【canal配置】
conf\example\instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.filter.regex=.*\\..*
conf\canal.properties
canal.port = 11111
canal.destinations = example
4. 【Mysql读取位置】
show master status;
reset master;
特别注意:读取位置position必须和meta.dat中一致
5.【启动canal】
bat
6. 【查看日志】
logs\example\example.log
(3)项目应用
1. 【pom.xml】
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
2. 【application.yml】
canal:
server: 127.0.0.1:11111
destination: example
3. 【Canal开发】
EntryHandler<T>接口实现
// 说明:监听表中数据的变化【添加、修改、删除】、自动调用对应的方法
@Component
@CanalTable("person")
public class PersonHandler implements EntryHandler<Person> {
@Override
public void insert(Person person) {
System.out.println("表中添加了数据");
System.out.println(person);
}
@Override
public void update(Person before, Person after) {
System.out.println("表中修改了数据");
System.out.println(before);
System.out.println(after);
}
@Override
public void delete(Person person) {
System.out.println("表中删除了数据");
System.out.println(person);
}
}