Canal 测试

Canal

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

https://github.com/alibaba/canal

https://github.com/alibaba/canal/releases/tag/canal-1.1.4

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

Canal Admin

canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide

https://github.com/alibaba/canal/wiki/Canal-Admin-Guide

https://github.com/alibaba/canal/wiki/Canal-Admin-QuickStart

Canal java 客户端

https://github.com/alibaba/canal/wiki/ClientExample

使用

Mysql 开启binlog日志

先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

# 唯一标识 不要和 canal 的 slaveId 重复
server-id=1

# binlog文件名
log-bin=mysql-binlog

# 选择 ROW 模式
binlog-format=ROW 

# 要忽略的数据库
binlog-ignore-db=mysql

启动mysql后可以在data目录中看到新增的文件

  • mysql-binlog.000001
  • mysql-binlog.index

查看一下状态:

SHOW MASTER STATUS;

"File"  "Position"  "Binlog_Do_DB"  "Binlog_Ignore_DB"  "Executed_Gtid_Set"
"mysql-binlog.000001"   "154"   ""  "mysql" ""

新建账号

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

查看一下:
SELECT USER,HOST FROM mysql.user;

Canal 配置

下载地址:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

解压缩下载的canal-deployer压缩包

conf/canal.properties

默认配置,注意:

# 服务端口配置
canal.port = 11111

# 客户端连接账号配置(目前未设置)
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# 客户端连接端点名
canal.destinations = example

conf/example/instance.properties

默认配置,注意:

## canal伪装的slaveId(只要和已有的节点ID不冲突即可)
canal.instance.mysql.slaveId=99

# 数据库连接配置
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8

# 监听对象(数据库/数据表) 逗号分隔
#canal.instance.filter.regex=test.table1,test.table2
canal.instance.filter.regex=.*\\..*

# 监听黑名单(当需要监听很多表, 只有少量表不需要监听时配置)
canal.instance.filter.black.regex=

运行

  • 启动 - ${canal}/bin/start.sh
  • 停止 - ${canal}/bin/stop.sh
  • 重启 - ${canal}/bin/restart.sh

如果启动报错,如:

ch.qos.logback.core.LogbackException: Unexpected filename extension of file [file:/D:/dev/canal/canal.deployer-1.1.4/conf/]. Should be either .groovy or .xml

修改启动脚本:

@rem set logback_configurationFile=%conf_dir%\logback.xml

去掉 @rem

或者去掉

-Dlogback.configurationFile="%logback_configurationFile%" 

查看日志

server日志 - ${canal}/logs/canal/canal.log

......
start the canal server[10.10.20.112(10.10.20.112):11111]
Start prometheus HTTPServer on port 11112.
 ## the canal server is running now ......

instance日志 - ${canal}/logs/example/example.log

RegisterSlaveCommandPacket[reportHost=127.0.0.1,reportPort=59599,reportUser=canal,reportPasswd=canal,serverId=99,command=21]
position:BinlogDumpCommandPacket[binlogPosition=973,slaveServerId=99,binlogFileName=mysql-binlog.000001,command=18]

完成后就可以用下面的java代码进行测试了

Canal Admin 配置

下载地址:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

解压canal.admin-1.1.4.tar.gz

conf/application.yml

默认配置文件

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin

注意:这里使用的是上面新建canal 账号,此时需要赋权

GRANT SELECT, INSERT, UPDATE, DELETE ON `canal_manager`.* TO 'canal'@'%';

FLUSH PRIVILEGES;

导入初始化SQL

mysql -h127.0.0.1 -uroot -p

# 导入初始化SQL
> source conf/canal_manager.sql

初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化

启动

sh bin/startup.sh

查看 admin 日志

vi logs/admin.log

访问

http://127.0.0.1:8089/

默认密码:admin/123456


Canal 与 Canal admin 集成

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

canal-admin的核心模型主要有:

  1. instance,对应canal-server里的instance,一个最小的订阅mysql的队列
  2. server,对应canal-server,一个server里可以包含多个instance
  3. 集群,对应一组canal-server,组合在一起面向高可用HA的运维

修改canal配置文件

修改canal配置文件让其注册到 canal-admin

使用canal_local.properties的配置覆盖canal.properties

# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =

原本canal-server运行所需要的canal.properties/instance.properties配置文件就需要在web ui上进行统一运维,每个server只需要以最基本的启动配置 (比如知道一下canal-admin的manager地址,以及访问配置的账号、密码即可)

canal-admin 使用

canal-amdin 》 server管理 》 新建Server

  • 所属集群:单机
  • server名称:测试
  • Server IP:192.168.31.111
  • admin端口:11110

注意IP,如果canal启动不了,就需要去看看日志,是不是ip配置错误了
managerAddress:127.0.0.1:8089 can't not found config for [10.10.20.112:11110]

操作 》 配置 可以修改canal server的配置

# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

注释掉canal的用户名密码,不然会报错,不知道为啥
这里默认的用户名密码是 canal/canal

canal-amdin 》 Instance管理 》 新建Instance

  • Instance名称:demo
  • 所属集群/主机:测试

载入模板

修改配置

canal.instance.mysql.slaveId=99

canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=

这里只改slaveId=99
要注意数据库连接信息

重新启动canal

查看日志

此时可以看到 server 和 instance 都是启动状态

使用java客户端进行测试

单机模式 和 用canal-admin 配置两种方式都用下面的代码测试

新建一个maven项目

导入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

canal server 的用户名密码配置在 canal.properties 文件中,使用canal-amdin时配置在网页中
默认的用户名密码为:canal/canal

package com.wwh.canal.test;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class CanalTest {

    public static void main(String args[]) {
        System.out.println(AddressUtils.getHostIp());

        // 创建链接
        CanalConnector connector = CanalConnectors
                .newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "demo", "", "");
        // destination 配置为 example 或者 demo
        
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容