基于CDC做全链路数据审计系统-canal改造(四)

我们都知道canal是CDC的一个实现,用来监控db数据变更的。

前言:

在默认的情况下,mysql的binlog是不会记录执行的sql的,即使你设置了binlog_format=row(要开启binlog记录数据变更必须设置该值)情况。所以我们需要开启另一个参数binlog_rows_query_log_events,该值默认情况下是关闭的,我们需要自己手动开启。启动这个参数,则可在row格式下查看到执行的sql语句 。

开启命令 :

set  binlog_rows_query_log_events = 'on';  //开启的是当前session
set global binlog_rows_query_log_events = 'on'; //开启的是全局配置
Note :

开启了该参数后,对于mysql的写入性能会有所影 响,主要是IO这块,毕竟多写了一些数据到磁盘中,但是性能影响不会是很大。

查看binlog事件

开启参数后,执行一些写入操作,我们就可以查看Mysql里的binlog里的事件(因为canal也是基于这些事件来做的)及内容了。
具体操作步骤:
1.获取binlog文件列表 show binary logs;

2.查看指定binlog文件的内容 show binlog events in 'mysql-bin.xxx'; 比如mysql-bin-log.000078,这个值就是从第一步获取到的。

3.查看event的里面的sql(Rows_query 事件):


图片.png

从上图中们可以看到Rows_query这个事件里其实有我们想要的sql了。

mysql事务:

由于Mysql的事件是独立分开的,Rows_query事件,ROWDATA事件(具体的数据变更)。初看我们是没有办法将他们两个进行关联起来的,当时我在做这块的时候其实也懵逼了半天。后来我仔细看了一下他的事件流,我发现其实每次修改都是一个事务。他都会有事务的开启和事务的结束,我们可以在里面做文章。
这个是我对insert,update,delete做的一些测试时获取到的一些数据。

update:
2020-01-19 21:42:50.973 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONBEGIN,sql:null
2020-01-19 21:42:54.118 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :QUERY,entryType:ROWDATA,sql:UPDATE `rocket`.`user` SET `name` = 'ff' WHERE `id` = 8
2020-01-19 21:42:54.623 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:ROWDATA,sql:null
2020-01-19 21:42:54.824 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONEND,sql:null

insert:
2020-01-19 21:43:54.298 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONBEGIN,sql:null
2020-01-19 21:43:55.921 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :QUERY,entryType:ROWDATA,sql:INSERT INTO `rocket`.`test`(`id`, `user_add_rating1`, `user_add_rating`, `name`) VALUES (9, 'sbdaf', 'daf', 'afds')
2020-01-19 21:43:56.167 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONEND,sql:null

delete:
2020-01-19 21:49:29.105 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONBEGIN,sql:null
2020-01-19 21:49:29.652 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :QUERY,entryType:ROWDATA,sql:DELETE FROM `rocket`.`test` WHERE `id` = 10
2020-01-19 21:49:29.795 [pool-4-thread-1] WARN  c.a.otter.canal.store.memory.MemoryEventStoreWithBuffer - eventType :UPDATE,entryType:TRANSACTIONEND,sql:null

canal :

简介

canal是阿里用来做数据同步用的工具,但是官方的版本,只会记录数据的变更前和之后的数据(与maxwell等工具一样),他不会记录这条数据是哪条sql改变的。为什么不记录呢,因为canal的设计是记录数据的变更过程,并不关心他是哪个sql的引起。但是我们看了他们的源码,其实他们也监控了Rows_query 事件的,他们发到mq也是有这个事件的,但是,数据变更事件和Rows_query 事件两个事件是独立的事件,他们之间没有任何关联(mysql本身也没有做关联)。

由于他们之间没能关联,我们拿到这两个事件也没办法做任何处理。所以,我们在想我们能不能通过某种方法将他们关联起来呢。正如前面讲到的mysql事务,他是有开始有结束的事件的(这些事件是有序的),我们可以在收到sql的事件时将该sql存在一个地方,然后再收到ROWDATA事件时从刚才存的sql地方取出来并设置到该事件中(因为query事件先后ROWDATA事件),然后在收到TRANSACTIONBEGIN或者TRANSACTIONEND事件的时候清空sql,这不就ok了么。

按照这个思路,我们公司的一个小伙伴改造了一版,他是在 MemoryEventStoreWithBuffer(后面大体会讲canal流程)里进行处理的(不推荐),后面发现只要数据量一大(像我们公司老项目一个sql 20M...)一下就内存溢出了,canal就挂了,后面我看了一下源码,发现换一种处理的方式可以更好,换在发Mq消息的地方做处理(到目前还没出现内存溢出问题)。

调用关系(以前记录的笔记很简单):

1.解析流程 Parser ----Slink----EventStore
AbstractEventParser (consumeTheEventAndProfilingIfNecessary()方法) —> EntryEventSink —-> MemoryEventStoreWithBuffer

2.Server 调用关系:
CanalServerWithEmbedded(get(ClientIdentity clientIdentity, int batchSize) )--->MemoryEventStoreWithBuffer( tryGet() )

tryGet() :
获取到的是Event,event里面有CanalEntry.Entry entry; 这个entry的toString方法就是ByteString rawEntry 值(因为每次调用toString序列化会化很多时间,所以将他cache起来效率更高);
如果我们开启行级模式并记录了sql的话,要保证Event里的事件类型是EventType.QUERY,
我们可以通过Event里的 CanalEntry.Entry entry,
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());每获取的rowChange,通过rowChange来获取sql:rowChange.getSql(),

3.发送mq消息:
com.alibaba.otter.canal.deployer.CanalStarter#start -->
com.alibaba.otter.canal.server.CanalMQStarter#start -->
com.alibaba.otter.canal.kafka.CanalKafkaProducer#send(com.alibaba.otter.canal.common.MQProperties.CanalDestination, com.alibaba.otter.canal.protocol.Message, com.alibaba.otter.canal.spi.CanalMQProducer.Callback) (或者rocketMq的send,具体的实现类)

从上面的流程并配合上源码,其实可以发现,MemoryEventStoreWithBuffer是一个环型的内存queue,虽然可以在里面做处理,但是如果将每条数据的变更(一次修改导致几万,几十万的数据变更)都加上一个sql。这个内存无疑就会增长很快,一会儿就溢出了,所以不推荐在此处做修改。

改造

由于我们公司没有用rocketmq,kafka作为处理Log这块效率极高,所以我们主要改造kafka的send这块逻辑,rocketMq修改的原理差不多。

该变量放在CanalKafkaProducer中:

    private Map<String,String> sqlMap = new ConcurrentHashMap<>();

我们看一下CanalKafkaProducer里的send方法

 private void send(MQProperties.CanalDestination canalDestination, String topicName, Message message)
            throws Exception {
        if (!kafkaProperties.getFlatMessage()) {
            List<ProducerRecord> records = new ArrayList<ProducerRecord>();
            if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                Message[] messages = MQMessageUtils.messagePartition(message,
                        canalDestination.getPartitionsNum(),
                        canalDestination.getPartitionHash());
                int length = messages.length;
                for (int i = 0; i < length; i++) {
                    Message messagePartition = messages[i];
                    if (messagePartition != null) {
                        records.add(new ProducerRecord<String, Message>(topicName, i, null, messagePartition));
                    }
                }
            } else {
                final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                records.add(new ProducerRecord<String, Message>(topicName, partition, null, message));
            }

            produce(topicName, records, false);
        } else {
            // 发送扁平数据json

            setSql(canalDestination,message); // 系统默认是扁平数据json,我们只处理这块的逻辑

            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(message);
//            logger.warn("---flatMessages :{}", JSON.toJSONString(flatMessages));

            List<ProducerRecord> records = new ArrayList<>();
            if (flatMessages != null) {
                for (FlatMessage flatMessage : flatMessages) {

                    if("QUERY".equalsIgnoreCase(flatMessage.getType())){ //query event不用发,没用
                        continue;
                    }

                    flatMessage.setSql(sqlMap.get(canalDestination.getCanalDestination()));

                    if (canalDestination.getPartitionHash() != null && !canalDestination.getPartitionHash().isEmpty()) {
                        FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                                canalDestination.getPartitionsNum(),
                                canalDestination.getPartitionHash());
                        int length = partitionFlatMessage.length;
                        for (int i = 0; i < length; i++) {
                            FlatMessage flatMessagePart = partitionFlatMessage[i];
                            if (flatMessagePart != null) {
                                records.add(new ProducerRecord<String, String>(topicName,
                                        i,
                                        null,
                                        JSON.toJSONString(flatMessagePart, SerializerFeature.WriteMapNullValue)));
                            }
                        }
                    } else {
                        final int partition = canalDestination.getPartition() != null ? canalDestination.getPartition() : 0;
                        records.add(new ProducerRecord<String, String>(topicName,
                                partition,
                                null,
                                JSON.toJSONString(flatMessage, SerializerFeature.WriteMapNullValue)));
                    }

                   /* for (ProducerRecord record : records) {
                        logger.warn("-----key:{},mvn clean install -Dmaven.test.skip -Denv=release:{}", record.key(), record.value());
                    }*/

                    // 每条记录需要flush
                    produce(topicName, records, true);
                    records.clear();
                }
                clearSql(canalDestination,message);
            }
        }
    }

获取sql并存储在一个变量中

  /**
     * @param message
     */
    private void setSql(MQProperties.CanalDestination canalDestination,Message message) {
        List<CanalEntry.Entry> entries = message.getEntries();
        if (entries != null) {
            for (CanalEntry.Entry k : entries) {
                boolean isTransActionBegin = k.getEntryType().getNumber()
                        == CanalEntry.EntryType.TRANSACTIONBEGIN_VALUE;

                if (isTransActionBegin) {
                    sqlMap.put(canalDestination.getCanalDestination(),"");
                    continue;
                }

                boolean isRowData = k.getEntryType().getNumber() == CanalEntry.EntryType.ROWDATA_VALUE;
                boolean isQueryEventType =
                        k.getHeader().getEventType().getNumber() == CanalEntry.EventType.QUERY_VALUE;
                if (isRowData && isQueryEventType) { //query event
                    try {
                        String sql;
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(k.getStoreValue());
                        sql = rowChange == null ? null : rowChange.getSql();
                        if(StringUtils.isNotBlank(sql) && sql.length() > 2000){
                            sql = sql.substring(0,2000);
                        }
//                        logger.warn("----sql:{}", sql);
                        sqlMap.put(canalDestination.getCanalDestination(), sql == null ? "" : sql);
                        break;
                    } catch (InvalidProtocolBufferException e) {
                        logger.warn("e,", e);
                    }
                }
            }
        }

    }

清除sql变量里的值

private void clearSql(MQProperties.CanalDestination canalDestination,Message message) {
        List<CanalEntry.Entry> entries = message.getEntries();
        if (entries != null) {
            for (CanalEntry.Entry k : entries) {
                boolean isTransActionEnd = k.getEntryType().getNumber()
                        == CanalEntry.EntryType.TRANSACTIONEND_VALUE;

                if (isTransActionEnd) {
                    sqlMap.put(canalDestination.getCanalDestination(),"");
                    break;
                }
            }
        }
    }

从上面是我们已经修改过后的代码,其实我们主要修改的位置是在
[// 发送扁平数据json] 这块,因为系统默认就是发送的这种数据,其他格式我们不用关心,因为我们收到的数据主要是json形式的。

开始我想的是在for (FlatMessage flatMessage : flatMessages) {xxxxx}里面去做sql.当一条sql修改的数据比较少的情况下没问题,但是一旦修改的数据很多时,这里面是没有sql事件也就是拿不到sql的(canal每次从内存queue里get的数据条数默认是50条)。所以经过仔细阅读源码发现其实我们可以在外层进行处理,还是按事务的开始和结束时做处理。 然后清除sql变量值也是每次我们发送一批的时候都去判断一下要不要做清除处理。

其实canal的整体改造就弄完了,其实不算复杂,但是你要改造,首先还是得对canal的整体设计有所了解,然后具体去找他们的调用逻辑,最后去改造他。

我们来看一下改造后收到的数据格式:

{
    "data": [
        {
            "id": "14",
            "name": "zhangsan",
            "address": "addreds"
        }
    ],
    "database": "test",
    "es": 1611294851000,
    "id": 3,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(20)",
        "name": "varchar(200)",
        "address": "varchar(255)"
    },
    "old": [
        {
            "name": "bbbbbb"
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "/*@123,ff44cb60bd074b659c7c7087712b8f56@*/ UPDATE user  SET name='zhangsan',\naddress='addreds'  WHERE id=14",
    "sqlType": {
        "id": -5,
        "name": 12,
        "address": 12
    },
    "table": "user",
    "ts": 1611294851425,
    "type": "UPDATE"
}

总结一下,要想canal拿到sql你需要做的事:
1.查看binlog开启状态:
show variables like '%log_bin%' ;
如果没开启就需要开启

2.设置mysql: set global binlog_format = "ROW" ; 也可以在配置文件修改(推荐):

[mysqld]
binlog_format=Row

3.开启binlog_rows_query_log_events参数:
set global binlog_rows_query_log_events = 'on';
开启后要查看是否成功时,你修改几条数据,然后通过下面步骤查看:
1>.获取binlog文件列表 show binary logs;
2>.查看指定binlog文件的内容 show binlog events in 'mysql-bin.xxx';

4.定制canal开发,如果自己不想动手可以去我的github直接下载:https://github.com/waterlang/canal-data-sql 如果对你有帮助,可以对项目点一个star,谢谢咯。
canal的具体使用直接参考canal的官方文档。

本节就完了,下一节我们将做入口和db数据变更的解析并存储及整个项目的整合。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,123评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,031评论 2 384
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,723评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,357评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,412评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,760评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,904评论 3 405
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,672评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,118评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,456评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,599评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,264评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,857评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,731评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,956评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,286评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,465评论 2 348

推荐阅读更多精彩内容