Flink CDC 原理及生产实践

MySQL CDC连接器允许从MySQL数据库读取快照数据和增量数据。
本文档根据ververica官网翻译了如何设置MySQL CDC连接器以对MySQL数据库运行SQL查询。

一、依赖关系

为了设置MySQL CDC连接器,下表提供了使用构建自动化工具(例如Maven或SBT)和带有SQL JAR捆绑包的SQL Client的两个项目的依赖项信息。

1、Maven依赖

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.1.0</version>
</dependency>

2、SQL客户端JAR

下载flink-sql-connector-mysql-cdc-1.1.0.jar并将其放在下<FLINK_HOME>/lib/。

二、设置MySQL服务器

您必须定义一个对Debezium MySQL连接器监视的所有数据库具有适当权限的MySQL用户。

1、创建MySQL用户:

mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

2、向用户授予所需的权限:

mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

3、最终确定用户的权限:

mysql> FLUSH PRIVILEGES;

查看有关权限说明的更多信息:https://debezium.io/documentation/reference/1.2/connectors/mysql.html#_permissions_explained

三、注意

1、MySQL CDC源代码如何工作

启动MySQL CDC源时,它将获取一个全局读取锁(FLUSH TABLES WITH READ LOCK),该锁将阻止其他数据库的写入。然后,它读取当前binlog位置以及数据库和表的schema。之后,将释放 全局读取锁。然后,它扫描数据库表并从先前记录的位置读取binlog。Flink将定期执行checkpoints以记录binlog位置。如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复。因此,它保证了仅一次的语义。

2、向MySQL用户授予RELOAD权限

如果未授予MySQL用户RELOAD权限,则MySQL CDC源将改为使用表级锁,并使用此方法执行快照。这会阻止写入更长的时间。

3、全局读取锁(FLUSH TABLES WITH READ LOCK)

全局读取锁 在读取binlog位置和schema期间保持。这可能需要几秒钟,具体取决于表的数量。全局读取锁定会阻止写入,因此它仍然可能影响在线业务。
如果要跳过读取锁,并且可以容忍至少一次语义,则可以添加'debezium.snapshot.locking.mode' = 'none'选项以跳过锁。

4、为每个作业设置一个differnet SERVER ID

每个用于读取binlog的MySQL数据库客户端都应具有唯一的ID,称为server id。MySQL服务器将使用此ID维护网络连接和binlog位置。如果不同的作业共享相同的server id,则可能导致从错误的binlog位置进行读取。
提示:默认情况下,启动TaskManager时,server id是随机的。如果TaskManager失败,则再次启动时,它可能具有不同的server id。但这不应该经常发生(作业异常不会重新启动TaskManager),也不会对MySQL服务器造成太大影响。

因此,建议为每个作业设置不同的server id ,例如:

  • 通过SQL Hints: SELECT * FROM source_table /+ OPTIONS('server-id'='123456') / ;

  • 通过Stream ApI的 创建source时设置: ** MySQLSource.builder().xxxxxx.serverId(123456);**

重点:Mysq的binlog 可以说是针对库级别,所以相同的server id去拉一个库里的不同表或者相同表可能会造成数据丢失。所以建议设置server id。(我也有在社区邮件中提问Jark大佬:传送地址

5、扫描数据库表期间无法执行检查点

在扫描表期间,由于没有可恢复的位置,因此我们无法执行checkpoints。为了不执行检查点,MySQL CDC源将保持检查点等待超时。超时检查点将被识别为失败的检查点,默认情况下,这将触发Flink作业的故障转移。因此,如果数据库表很大,则建议添加以下Flink配置,以避免由于超时检查点而导致故障转移:

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

6、设置MySQL会话超时

为大型数据库创建初始一致的快照时,在读取表时,您建立的连接可能会超时。您可以通过在MySQL配置文件中配置Interactive_timeout和wait_timeout来防止此行为。

  • interactive_timeout:服务器在关闭交互式连接之前等待活动的秒数。参见MySQL文档

  • wait_timeout:服务器在关闭非交互式连接之前等待其活动的秒数。参见MySQL文档

四、如何创建MySQL CDC表

1、Sql的方式:

(1)定义表如下:

-- register a MySQL table 'orders' in Flink SQL
CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

-- read snapshot and binlogs from orders table
SELECT * FROM orders;

(2)连接器选项

      待补充。。。

2、Stream API

MySQL CDC连接器也可以是DataStream源。您可以创建SourceFunction,如下所示:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;

public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
      .hostname("localhost")
      .port(3306)
      .databaseList("inventory") // monitor all tables under inventory database
      .username("flinkuser")
      .password("flinkpw")
      .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute();
  }
}

五、特征

1、Exactly-Once Processing 一次处理

MySQL CDC连接器是Flink Source连接器,它将首先读取数据库快照,然后即使发生故障,也将以完全一次的处理继续读取二进制日志。请阅读连接器如何执行数据库快照。

2、Single Thread Reading 单线程阅读

MySQL CDC源无法并行读取,因为只有一个任务可以接收Binlog事件。

六、数据类型映射。

待补充。。。

七、常问问题

1、如何跳过快照并仅从binlog中读取?

可以通过选项进行控制debezium.snapshot.mode,您可以将其设置为:

  • never:指定连接永远不要使用快照,并且在第一次使用逻辑服务器名称启动时,连接器应该从binlog的开头读取;请谨慎使用,因为只有在binlog保证包含数据库的整个历史记录时才有效。

  • schema_only:如果自连接器启动以来不需要数据的连续快照,而只需要它们进行更改,则可以使用该schema_only选项,其中连接器仅对模式(而不是数据)进行快照。

2、如何读取包含多个表(例如user_00,user_01,...,user_99)的共享数据库?

该table-name选项支持正则表达式以监视多个与正则表达式匹配的表。因此,您可以设置table-name为user_.*监视所有user_前缀表。database-name选项相同。请注意,共享表应该在相同的架构中。

3、ConnectException:收到用于处理的DML'...',binlog可能包含使用语句或基于混合的复制格式生成的事件

如果有上述异常,请检查是否binlog_format为ROW,您可以通过show variables like '%binlog_format%'在MySQL客户端中运行来进行检查。请注意,即使binlog_format您的数据库配置为ROW,也可以通过其他会话更改此配置,例如SET SESSION binlog_format='MIXED'; SET SESSION tx_isolation='REPEATABLE-READ'; COMMIT;。还请确保没有其他会话正在更改此配置

八、实践中遇到的问题

待补充。。。

1、不同的kafka版本依赖冲突会造成cdc报错:http://apache-flink.147419.n8.nabble.com/cdc-td8357.html#a8393
2、超时问题: 根据上面提到设置wait_timeout解决。

cdc超时bug图.png

3、

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容