win10 安装使用canal1.1.5

win10 安装使用canal1.1.5

环境 :win10,mysql5.7,canal1.1.5最新

1. 修改mysql配置,位置(D:\soft\mysql-5.7.30-winx64\my.ini)

[mysqld] 服务端配置下面添加三行(其他配置勿动):

server_id=1

log-bin=master   

binlog_format=row

注意:mysql实例id,不能和canal的slaveId重复

或者直接运行如下指令:

mysql> set global binlog_format=ROW;

mysql> set global binlog_row_image=FULL;

配置完后重启mysql 服务:net stop mysql   net start mysql

canal需要建库、建用户和赋权:

CREATE DATABASE `canal_manager`;

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

-- 注意权限问题,否则在后续的一些操作中会提示没有权限

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

2. 下载安装canal服务端

2.1. 解压第一步下载的canal.deployer-1.1.5-SNAPSHOT.tar.gz

https://github.com/alibaba/canal/releases/

安装canal,单机只需安装一个服务(canal.deployer):

2.2. 修改conf/canal.properties配置文件

注册地址为本机IP(canal.register.ip =本机IP),还要配置输出到tcp(java接口作为一个客户端可收到信息)或kafka

canal.port = 11111 #对外提供的socket接口。如端口不冲突,使用默认即可

canal.serverMode = tcp# tcp对应接口;选择kafka的推送模式,发送kafka消息

另外配置目的地,样例是 example,conf下有这个文件夹。

canal.destinations = example

2.3. 修改conf/example/instance.properties配置文件

#配置数据库信息

canal.instance.master.address=127.0.0.1:3306

#show master status 可查看日志文件及其位置,对应这个参数canal.instance.master.journal.name=master.000004

    # username/password

    canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

2.4. 启动canal服务端

 ./bin/startup.bat

3.    启动java客户端接受信息

   一般用kafka接受信息,这里使用java springboot接收信息,启动主方法即可,修改数据库信息可以看到消息。

  package org.demo.springbootshirocasdemo.component;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry;

import com.alibaba.otter.canal.protocol.Message;

import lombok.SneakyThrows;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

import java.util.List;

public class CanalClient {

    private static final  int BATCH_SIZE = 1;

    private static final String HOST_NAME ="127.0.0.1";

    private static final int PORT =11111;

    private static final String DESTINATION ="example";

    private static final String USERNAME = "root";

    private static final String PASSWORD = "root";

    public static void main(String[] args) {

        try {

            startCanal();

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

    public static void startCanal() throws Exception {

        CanalConnector connector = CanalConnectors.newSingleConnector(

                new InetSocketAddress(HOST_NAME,PORT), DESTINATION, USERNAME, PASSWORD);

        try {

            connector.connect();

            //订阅数据库表,全部表

            connector.subscribe(".*\\..*");

            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿

            connector.rollback();

            while (true) {

                // 获取指定数量的数据

                Message message = connector.getWithoutAck(BATCH_SIZE);

                //获取批量ID

                long batchId = message.getId();

                //获取批量的数量

                int size = message.getEntries().size();

                //如果没有数据

                if (batchId == -1 || size == 0) {

                    try {

                        //线程休眠2秒

                        Thread.sleep(2000);

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    }

                } else {

                    //如果有数据,处理数据

                    printEntry(message.getEntries());

                }

                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。

                connector.ack(batchId);

            }

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            connector.disconnect();

        }

    }

    /**

     * 打印canal server解析binlog获得的实体类信息

     */

    private static void printEntry(List<CanalEntry.Entry> entrys) {

        for (CanalEntry.Entry entry : entrys) {

            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {

                //开启/关闭事务的实体类型,跳过

                continue;

            }

            //RowChange对象,包含了一行数据变化的所有特征

            //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等

            CanalEntry.RowChange rowChage;

            try {

                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

            } catch (Exception e) {

                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);

            }

            //获取操作类型:insert/update/delete类型

            CanalEntry.EventType eventType = rowChage.getEventType();

            //打印Header信息

            System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",

                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

                    eventType));

            //判断是否是DDL语句

            if (rowChage.getIsDdl()) {

                System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());

            }

            //获取RowChange对象里的每一行数据,打印出来

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {

                //如果是删除语句

                if (eventType == CanalEntry.EventType.DELETE) {

                    printColumn(rowData.getBeforeColumnsList());

                    //如果是新增语句

                } else if (eventType == CanalEntry.EventType.INSERT) {

                    printColumn(rowData.getAfterColumnsList());

                    //如果是更新的语句

                } else {

                    //变更前的数据

                    System.out.println("------->; before");

                    printColumn(rowData.getBeforeColumnsList());

                    //变更后的数据

                    System.out.println("------->; after");

                    printColumn(rowData.getAfterColumnsList());

                }

            }

        }

    }

    private static void printColumn(List<CanalEntry.Column> columns) {

        for (CanalEntry.Column column : columns) {

            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());

        }

    }

}

4. 可能收到的错误

乱码

vim ../conf/canal.properties 

#传输格式:json

canal.mq.flatMessage = true

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 225,226评论 6 524
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 96,509评论 3 405
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 172,523评论 0 370
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,181评论 1 302
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,189评论 6 401
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,642评论 1 316
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,993评论 3 431
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,977评论 0 280
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,527评论 1 326
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,547评论 3 347
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,661评论 1 355
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,250评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,991评论 3 340
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,422评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,571评论 1 277
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,241评论 3 382
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,737评论 2 366

推荐阅读更多精彩内容