Mongo新机制ChangeStream解决数据同步迁移的问题

问题背景

在公司中,不同业务部门之间有时会像一个个孤岛,无法连接起来,各自为战,各自部门之间的工作成果以及数据因为格式问题,完整性问题,很难给到其他部门去复用。当然,正常情况下,应该是有专门的架构组童鞋去做一些数据的整合,各部门业务之间的数据协商统一之类的工作。不过这次面临的问题就是,如何在没有外部资源协助下,如何完整的将X部门存档在mongo并实时写入的数据完整同步到本部门的mongo集群中并使用呢?

问题分析

涉及到数据同步这方面的问题其实主要要考虑的是两个方面,历史数据和实时数据。最朴素的想法其实也很简单,以一个时间节点做为分界点,在这个时间节点截下mongo库的数据快照并dump,这个时间节点后的数据可以通过程序记录游标定时范围查询控制返回。
这是一个比较基础的思路,主要存在以下问题:

  1. 实时性相当差,我们这里的数据的新鲜程度完全依赖于这个“定时”。
  2. 逻辑上比较绕,而且依赖于X部门有对数据做时间戳,那如果下一次要同步的数据没有写时间戳,怎么办呢?
  3. 开发工作比较麻烦,调试上也需要步步谨慎,数据量大情况下又很难看出哪里有问题,只能等待问题来找你,很被动。

解决方案

Mongo3提供了新的特性非常契合我现在的场景,这个新特性的名字叫做ChangeStream。首先我先放上官方文档
MongoDB-changeStreams
然后再开始胡咧咧自己的理解。
这个一个可以在MongoClient(包括Java,python等)去订阅Mongo数据库中某张表最新事件变化的特性,相当于以事件驱动机制对Collection的变化做监听,化主动为被动。是的我没有写错,原先朴素的思路有提到,我们需要主动定时去用时间游标去捞取最新的数据,而现在变成了只需要“被动”等待collection的变化消息过来即可。

举个例子:当我们监控数据库的CollectionA时,每当A被写入一条数据(插入,更新,删除等),我们也能马上收到这条消息,根据消息进行自定义的操作,比如A表的删除数据你可以只做更新字段来表示,灵活度和实时性都非常实用。

实验报告

这里放上对该特性的一些实地测试。
这里是最初的python版测试代码,可以提供参考,但是实际的测试实验是用java-client完成的,逻辑也是类似。

import pymongo

monclient = pymongo.MongoClient("mongodb://user:pass:port/")
db = monclient["col"]

cursor = db['col'].watch()

while True:
    doc = next(cursor)
    print(doc)

然后在监控表中插入一条数据后,可以看到下图:

插入数据.png

这里在程序的运行过程中也说明了一点,监控过程中cursor.hasNext()这个方法是阻塞等待新的消息进来的,可以放心使用

继续做测试,接下来是更新消息:


更新数据.png

很好,基本都可以做到实时响应,上面这两图可以配合下面的官方文档中的Document基本格式食用。

{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection"
   },
   "to" : {
      "db" : "<database>",
      "coll" : "<collection"
   },
   "documentKey" : { "_id" : <value> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   }
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}

在Java的API中可以看到,是有一些变形的,但是不影响使用,还是可以很轻松分辨出来API中对应的字段的。
接下来是一个我比较关心的特性,就是如何进行断点续传
程序的关闭启动是很正常的需求和操作,在程序关闭后如何在再次启动后可以直接延续上次进度而不会丢失这段时间内的数据就是一个非常重要的问题了。在官方文档中提到了resumeToken来解决这一场景。
简单的说,resumeToken是一个记录点,相当于游戏中的存档,我们可以从resumeToken的位置重新开始消费日志。

Use this document as a resumeToken for the resumeAfter parameter when resuming a change stream.

            ChangeStreamDocument<Document> stream = cursor.next();
            resumeToken = stream.getDocumentKey();
            resumeToken = stream.getResumeToken();

可以看到,stream提供了一个获得resumeToken的方法……对JAVA的API中的getResumeToken()获得的resumeToken进行观察,发现有以下特点:

  1. 并不会随着doc的变化而发生变化,而是一直是一个固定值
  2. 也可以从断掉的那一刻获得数据
  3. 时间长了会失效
关闭程序后重启接收到了关闭期间写入的消息

核心代码

        MongoCursor<ChangeStreamDocument<Document>> cursor = null;

        MongoCollection<Document> collection = originMongoDao.getMongoCollection();

        BsonDocument resumeToken = targetMongoDao.getResumeToken();
        if (resumeToken != null){
            cursor = collection.watch().resumeAfter(resumeToken).fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
        }else {
            cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
        }

        long time1 = 0l;
        long time2 = 0l;

        int insert = 0;
        int update = 0;
        int delete = 0;
        int others = 0;


        AtomicInteger count = new AtomicInteger(0);

        while (cursor.hasNext()){
            ChangeStreamDocument<Document> stream = cursor.next();
            resumeToken = stream.getDocumentKey();

            Document doc = stream.getFullDocument();

            MtxxFeature mtxxFeature = new MtxxFeature();
            switch (stream.getOperationType())
            {
                case
                        INSERT:insert += 1;
                        mtxxFeature.setMongo__delete(false);
                break;
                case
                        UPDATE:update += 1;
                        mtxxFeature.setMongo__delete(false);
                break;
                case
                        DELETE:delete += 1;
                        mtxxFeature.setMongo__delete(true);
                break;
                default: others += 1;
                         mtxxFeature.setMongo__delete(false);
                break;
            }

      
            log.info("id:{} end [insert:{} update:{} delete:{} others:{}]", mtxxFeature.getMongo_id(), insert, update, delete, others);


        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,807评论 6 518
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 95,284评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 169,589评论 0 363
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 60,188评论 1 300
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 69,185评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,785评论 1 314
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,220评论 3 423
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,167评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,698评论 1 320
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,767评论 3 343
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,912评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,572评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,254评论 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,746评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,859评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,359评论 3 379
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,922评论 2 361

推荐阅读更多精彩内容