AWS 搭建 Debezium 抽取 RDS 数据到 Kafka

debezium 实时同步 kafka

本文档介绍了如何从 mysql 和 oracle 中实时抽取数据到 kafka。

前期准备

  1. 2台 linux 服务器用于部署 debezium 高可用。

  2. java 11环境

  3. 搭建 msk 集群(3可用区,3节点),记录 msk 私有终端节点(b-1.xxxx)和 zookeeper(z-1.xxxx) 连接地址

  4. mysql 和 oracle 开启 binlog,format 为 row,且 binglog 保留日期不低于2天,如果 binlog 保留时间很短,debezium 中断重启后会找不到断点记录的 binlog 文件而报错

    rds mysql 设置 binlog 保留时长,连接到数据库,执行以下 sql 语句

    CALL mysql.rds_set_configuration('binlog retention hours', 168); # 设置 binlog 保存时长为168个小时
    
  5. sqlserver 要开启 cdc 才能进行同步,且只有企业版 sqlserver 才能开启 cdc

    使用 RDS 提供的函数启用 SQL Server 的 CDC(Change Data Capture,数据变更捕捉)功能

    EXEC msdb.dbo.rds_cdc_enable_db <数据库名称>
    

    再启用同步表的 CDC 功能,每一张同步的表都需要此操作

    EXEC sys.sp_cdc_enable_table @source_schema = N'<schema 名称>', @source_name = N'<需要同步的表名>', @role_name = NULL
    

    此时,我们可以用下列命令来查看 CDC 情况

    EXEC sys.sp_cdc_help_change_data_capture
    
  6. 进入 msk 控制台,创建参数组,并且修改 auto.create.topics.enable=true,记录下 default.replication.factors 的值

  7. 修改 msk 集群的参数组为新创建的参数组,等待其自动更新配置完毕

搭建 debezium 环境

  1. 登录到用于部署 debezium 的服务器上,2台都需要搭建

  2. 下载 kafka 安装包:kafka download 复制对应版本的安装包的下载链接

    sudo wget https://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz
    sudo tar -zxvf kafka_2.12-2.5.1.tgz
    sudo mv kafka_2.12-2.5.1 /opt/kafka
    
  3. 下载 debezium connector:debezium connector 复制 mysql, sqlserver, oracle connector 的下载链接

    sudo mkdir -p /opt/kafka-connectors
    cd /opt/kafka-connectors
    sudo wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.2.Final/debezium-connector-mysql-1.9.2.Final-plugin.tar.gz
    sudo wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/1.9.2.Final/debezium-connector-oracle-1.9.2.Final-plugin.tar.gz
    sudo wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.9.2.Final/debezium-connector-sqlserver-1.9.2.Final-plugin.tar.gz
    
    sudo tar -zxvf debezium-connector-mysql-1.9.2.Final-plugin.tar.gz
    sudo tar -zxvf debezium-connector-oracle-1.9.2.Final-plugin.tar.gz
    sudo tar -zxvf debezium-connector-sqlserver-1.9.2.Final-plugin.tar.gz
    
  4. 下载 ojdbc8.jar 到 debezium-connector-oracle 中

    cd debezium-connector-oracle
    sudo wget https://download.oracle.com/otn-pub/otn_software/jdbc/215/ojdbc8.jar
    ll oj* # 查看确保ojdbc8.jar 存在
    
  5. 修改 kafka 配置

    cd /opt/kafka
    sudo vim config/connect-distributed.properties
    # 修改
    bootstrap.servers=b-1.xxxxx:9092,b-2.xxxxxx:9092,b-3.xxxxxx:9092
    group.id=debezium1 # 相同 group.id 会组成一个 debezium 集群
    config.storage.replication.factor=<msk 中 default.replication.factors 的值>
    status.storage.replication.factor=<msk 中 default.replication.factors 的值>
    offsets.storage.replication.factor=<msk 中 default.replication.factors 的值>
    
    # 新增
    plugin.path=/opt/kafka-connectors
    connector.client.config.override.policy=All
    # 保存退出
    
  6. 创建 /usr/lib/systemd/system/kafka-connector.service 文件,并填入以下内容

    [Unit]
    Description=Kafka Connector
    After=network.target
    
    [Service]
    User=root
    Type=simple
    WorkingDirectory=/opt/kafka
    Environment="LOG_DIR=/var/log/opt/kafka"
    ExecStart=/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect-distributed.properties
    RestartSec=1s
    Restart=on-failure
    
    [Install]
    WantedBy=multi-user.target
    
  7. 启动服务

    sudo systemctl daemon-reload
    sudo systemctl enable kafka-connector.service
    sudo systemctl start kafka-connector.service
    
  8. 查看任务状态

    sudo systemctl status kafka-connector.service
    tail -f /var/log/opt/kafka/connect.log
    

创建同步任务

  1. 创建配置文件,只需要在其中一台服务器上操作即可。

    sudo mkdir -p /opt/kafka/connector
    sudo vim /opt/kafka/connector/lims-cdc.json
    
  2. 修改配置文件内容

    {
        "name": "lims-cdc", 
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
            "database.hostname": "xxxxxx.rds.cn-northwest-1.amazonaws.com.cn", 
            "database.port": "3306", 
            "database.user": "xxxxxx", 
            "database.password": "xxxxxx", 
            "database.server.id": "54321", 
            "database.server.name": "xxxx", 
            "table.include.list": "lims1_dev.t_lims_order,lims1_dev.t_lims_folder", 
             "snapshot.mode": "initial", 
             "tombstones.on.delete": "false",
            "database.history.kafka.bootstrap.servers": "b-1.xxxx:9092,b-3.xxxx:9092,b-2.xxxx:9092", 
            "database.history.kafka.topic": "dbhistory", 
            "tombstones.on.delete": "false",
            "transforms": "Reroute", 
            "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", 
            "transforms.Reroute.topic.regex": "(.*)", 
            "transforms.Reroute.topic.replacement": "collect_data_lims", 
            "transforms.Reroute.key.field.name": "shard_id", 
            "include.schema.changes": "false", 
            "key.converter.schemas.enable": "false", 
            "value.converter.schemas.enable": "false", 
            "key.converter": "org.apache.kafka.connect.json.JsonConverter", 
            "value.converter": "org.apache.kafka.connect.json.JsonConverter"
        }
    }
    

    部分参数说明:

    • name:本 connector 的 name,可自定义
    • database.hostname:数据库 host
    • database.server.name: 影响 kafka 中生成 topic 的前缀,可以和 name 一致
    • table.include.list: 需要同步的表的名称:<database-name>.<table-name>,需带数据库名称,可以多个,以英文逗号分隔
    • database.history.kafka.bootstrap.servers:msk 私有终端节点名称
    • database.history.kafka.topic:history 的 topic 的名称,修改成 dbhistory-<connector 的 name>
    • transforms.Reroute.topic.replacement:该任务对应的 msk 中的 topic 的名称,可自定义,不填则默认每张表一个topic

    其余参数保持默认即可

    如果需要在快照时指定条件,在 config 列表中加入以下两个参数,修改值为你的表名和 sql 语句。

    "snapshot.select.statement.overrides": "customer.orders",
    "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC",
    
  3. 启动任务 (官方接口文档)

    curl -i -X POST -H Accept:application/json -H Content-Type:application/json localhost:8083/connectors/ -d @<配置文件.json>
    
  4. 查看任务状态

    curl -X GET -H Accept:application/json localhost:8083/connectors/<你的 connector 的 name>/status | jq
    
  5. 暂停任务

    curl -i -X PUT -H Accept:application/json localhost:8083/connectors/<你的 connector 的 name>/pause
    
  6. 恢复任务

    curl -i -X PUT -H Accept:application/json localhost:8083/connectors/<你的 connector 的 name>/resume
    
  7. 删除任务

    curl -X DELETE -H Accept:application/json localhost:8083/connectors/<你的 connector 的 name>
    
  8. 在线更新配置

    获取当前 connector 的配置

    curl -X GET -H "Accept:application/json" localhost:8083/connectors/<你的 connector 的 name>/config | jq
    

    复制结果,保存到 /opt/kafka/connector/<你的 connector 的 name>-config-当前日期.json 文件中,根据需求修改对应参数。

    增加同步表,只需修改 table.include.list

    {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
            "database.hostname": "xxxxxx.rds.cn-northwest-1.amazonaws.com.cn", 
            "database.port": "3306", 
            "database.user": "xxxxxx", 
            "database.password": "xxxxxx", 
            "database.server.id": "54321", 
            "database.server.name": "xxxx", 
            "table.include.list": "lims1_dev.t_lims_order,lims1_dev.t_lims_folder", 
             "snapshot.mode": "initial", 
             "tombstones.on.delete": "false", 
            "database.history.kafka.bootstrap.servers": "b-1.xxxx:9092,b-3.xxxx:9092,b-2.xxxx:9092", 
            "database.history.kafka.topic": "dbhistory",
         "tombstones.on.delete": "false",
            "transforms": "Reroute", 
            "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", 
            "transforms.Reroute.topic.regex": "(.*)", 
            "transforms.Reroute.topic.replacement": "collect_data_lims", 
            "transforms.Reroute.key.field.name": "shard_id", 
            "include.schema.changes": "false", 
            "key.converter.schemas.enable": "false", 
            "value.converter.schemas.enable": "false", 
            "key.converter": "org.apache.kafka.connect.json.JsonConverter", 
            "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
    
    curl -i -X PUT -H Accept:application/json -H Content-Type:application/json localhost:8083/connectors/<你的 connector 的 name>/config -d @<修改后的json名称>
    
  9. oracle 配置有区别外,其余操作相同。

    decimal.handling.mode:默认参数值是 precise,用 java 的 java.math.BigDecimal 来表示数字,可根据需求修改为 double/string

    {
        "name": "srm-cdc", 
        "config": {
            "connector.class": "io.debezium.connector.oracle.OracleConnector", 
            "tasks.max": "1", 
            "database.hostname": "xxxxxx.rds.cn-north-1.amazonaws.com.cn", 
            "database.port": "1521", 
            "database.user": "xxxx", 
            "database.password": "xxxx", 
            "database.dbname": "SRMDEV", 
            "database.server.name": "srm",   
            "decimal.handling.mode": "string",
            "table.include.list":"SRMTEST.base_supplier,SRMTEST.base_supplier_manufacturer", 
             "database.history.kafka.bootstrap.servers": "b-1.xxxx:9092,b-3.xxxx:9092,b-2.xxxx:9092", 
            "database.history.kafka.topic": "dbhistory-oracle", 
            "tombstones.on.delete": "false",
            "transforms": "Reroute", 
            "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", 
            "transforms.Reroute.topic.regex": "(.*)", 
            "transforms.Reroute.topic.replacement": "collect_data_srm", 
            "transforms.Reroute.key.field.name": "shard_id", 
            "include.schema.changes": "false", 
            "key.converter.schemas.enable": "false", 
            "value.converter.schemas.enable": "false", 
            "key.converter": "org.apache.kafka.connect.json.JsonConverter", 
            "value.converter": "org.apache.kafka.connect.json.JsonConverter"
        }
    }
    
    1. sqlserver 配置有区别外,其余操作相同。
    {
      "name": "sqlserver-cdc",
      "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.hostname": "xxxxxxx",
        "database.port": "1433",
        "database.user": "xxxx",
        "database.password": "xxxx",
        "database.server.id": "54321",
        "database.server.name": "mssql",
        "database.dbname": "xxxxx",
        "schema.include.list": "xxxxx",
        "table.include.list": "xxx.xxxx",
        "database.history.kafka.bootstrap.servers": "b-1.xxxx:9092,b-2.xxxx:9092",
        "database.history.kafka.topic": "dbhistory-mssql",
        "tombstones.on.delete": "false",
        "transforms": "Reroute", 
        "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", 
        "transforms.Reroute.topic.regex": "(.*)", 
        "transforms.Reroute.topic.replacement": "collect_data_mssql", 
        "transforms.Reroute.key.field.name": "shard_id", 
        "include.schema.changes": "false",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
      }
    }
    
    
    1. kafka 同步到 mongodb

    在 kafka-connectors 中下载 mongo connector

    cd /opt/kafka-connectors
    sudo wget https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.7.0/mongo-kafka-connect-1.7.0-all.jar
    

    在 /opt/kafka/connector 中新增 mongo-sink.json(可自定义) 配置文件,修改内容:

    {
        "name": "mongo-sink", 
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", 
            "connection.uri": "mongodb://<username>:<password>@<host>:<port>/?retryWrites=false", 
            "database": "xxxx", 
            "collection": "xxxx", 
            "topics": "mysql.test_flink_cdc.table1", 
             "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler",
        "key.converter.schemas.enable": "false",
         "value.converter.schemas.enable": "false",
         "key.converter": "org.apache.kafka.connect.json.JsonConverter",
         "value.converter": "org.apache.kafka.connect.json.JsonConverter"
        }
    }
    

    部分参数说明:

    • connection.uri:mongo 连接的 uri,根据实际情况填写

    • change.data.capture.handler:根据 topic 中来源的数据库类型,选择对一个的 handler

      Change Data Capture Handlers — MongoDB Kafka Connector

      • com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler
      • com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.postgres.PostgresHandler
      • com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler

    使用第3步中的 curl 请求,并指定 mongo-sink.json 启动 connector

查看同步信息

  1. 查看 kafka topic

    # 定义环境变量
    zk_addr=z-1.xxxx:2181,z-3.xxxx:2181,z-2.xxxx:2181
    bs_addr=b-1.xxxx:9092,b-2.xxxx:9092
    
    bin/kafka-topics.sh --list --zookeeper $zk_addr
    

    topic 中应有以下topic:

    • connect-configs
    • connect-offsets
    • connect-status
    • 你定义的 Reroute.topic
    • 你定义的 history.kafka.topic
  2. 查看你的 Reroute.topic,如果数据量很大,删除 --from-beginning 参数

    bin/kafka-console-consumer.sh --bootstrap-server $bs_addr --from-beginning --topic <你定义的Reroute.topic>
    

    示例数据:

    {
        "before":{
            "id":4,
            "class":"4",
            "teacher":"dddd"
        },
        "after":{
            "id":4,
            "class":"4",
            "teacher":"eeee"
        },
        "source":{
            "version":"1.9.2.Final",
            "connector":"mysql",
            "name":"mysql",
            "ts_ms":1652342716000,
            "snapshot":"false",
            "db":"test_flink_cdc",
            "sequence":null,
            "table":"table2",
            "server_id":488905673,
            "gtid":null,
            "file":"mysql-bin-changelog.197425",
            "pos":605,
            "row":0,
            "thread":0,
            "query":null
        },
        "op":"u",
        "ts_ms":1652342716967,
        "transaction":null
    }
    

    字段说明:

    • before:原数据,新增则是 null
    • after:修改后数据,删除则是 null
    • source:配置信息:
      • file: binlog 文件
      • table: 表名
      • db: 数据库名称
    • op: 操作类型
      • r: 第一次 snapshot 时获取到的行
      • u:修改
      • c:新增
      • d: 删除
    • ts_ms:时间戳
  3. 修改源端数据库,kafka topic 可以实时刷新出 message

  1. 时间戳 +8 小时
    如果你的数据库里是设置了时区的,debezium 会把date类型的数据转成timestamp,但是它不会拿数据库里的时区设置,而是直接把这个字符串按照utc时间转时间戳,所以在kafka中看到的时间戳比实际的时间就多了8个小时。解决方法就是自己写转换器,打成 jar 包并放到对应的mysql/oracle connector 的目录下,然后在 json 里配置这个 convert。
    github 上有一些现成的转换器,但都有些奇奇怪怪的问题,比如cdc的时间是对的,但是快照的时间错误,date 类型是对的,timestamp类型是错的等。我这有改好的 jar 包,懒得找地方上传了,反正也就我自己看。
  2. oracle redo log设置的空间太小,导致 oracle 归档日志死循环,迅速把磁盘空间撑爆。
    debeizum 做 cdc的时候,会产生大量的 redo log,超过 oracle 设置的 redo log 空间时,oracle 就会把多的日志落盘,这个行为又会产生日志,就一直死循环,每小时10G左右的速度消耗磁盘空间,直到磁盘占满,系统挂掉。解决方法就是把 oracle 的 redo log 空间调大。
  3. debezium 锁表
    debezium 打快照的时候默认会锁表,如果快照的数据量大的话,锁几个小时都是可能的,可能会导致其他应用的连接断开,可以通过设置 snapshot.locking.mode:none 参数,让它不锁表。建议生产环境还是在数据库停机的状态下执行快照行为。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容