概述
随着云基础设施的不断完善,云原生已经成为各行业数字化转型的必选项,越来越多的应用开始进行云原生化架构升级和应用迁移。
而云原生实时数仓的出现,让传统的数据仓库无论是成本、灵活性还是开放性等方面都显露出不足。拥有高性能、高可用性、可伸缩性、高安全性等特征的云原生数据库,正在成为企业的首选。
SelectDB Cloud作为一款运行于多云之上的云原生实时数据仓库,可以为客户提供极简运维和极致性价比的数仓服务,为用户提供开箱即用的能力。
同时,SelectDB Cloud 结合 Flink 流式计算,可以让用户将 Kafka 中的非结构化数据以及 MySQL 等上游业务库中的变更数据,实时同步到 SelectDB Cloud中,同时 SelectDB Cloud 提供亚秒级分析查询的能力,可以有效地满足实时 OLAP、实时数据看板以及实时数据服务等场景的需求。
原理
架构设计
在实时计算中,通过Flink可以将业务数据库(MySQL、SQLServer、Oracle)或Kafka消息队列等其他上游数据作为Source读取出来,经过FlinkSQL或着DataStream加工计算,最后将清洗后的数据写入。
Sink写入时使用Flink SelectDB Connector,将数据先上传到internal stage,最后通过copy into的方式将文件一次性加载到表中。
Exactly-Once
在实时写入场景中,如何确保端到端的数据一致性是经常遇到的问题,Flink SelectDB Connector基于Flink的两阶段提交,实现了端到端的Exactly-Once。
以Kafka为例,接收到上游的数据后,会持续的发送到Sink端
Sink端会将数据以文件的形式周期性的写入到SelectDB Cloud的InternalStage。
当Checkpoint完成后,会将刚写入Internal Stage的文件列表一次性通过Copy Into的方式导入到Table中。
Flink任务意外挂掉后,从上个Checkpoint重启,会自动Commit上次未完成的Copy任务。
使用Flink CDC同步MySQL数据到SelectDB Cloud
下载安装Flink
这里以Flink1.15版本为例
wget https://dlcdn.apache.org/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz
tar -zxvf flink-1.15.3-bin-scala_2.12.tgz
cd flink-1.15.3
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar -P ./lib/
wget https://selectdb.s3.amazonaws.com/connector/flink-selectdb-connector-1.15-1.0.0-SNAPSHOT.jar -P ./lib/
启动Flink Standalone集群
bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host flink.
Starting taskexecutor daemon on host flink.
在MySQL和SelectDBCloud上创建表,并且在MySQL中预先插入数据
-- 创建MySQL表
CREATE TABLE test.employees (
emp_no INT NOT NULL,
birth_date DATE NOT NULL,
first_name VARCHAR(14) NOT NULL,
last_name VARCHAR(16) NOT NULL,
gender ENUM ('M','F') NOT NULL,
hire_date DATE NOT NULL,
PRIMARY KEY (emp_no)
);
-- 插入数据
INSERT INTO test.employees VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12');
-- selectdb cloud中创建表
CREATE TABLE test.employees (
emp_no int NOT NULL,
birth_date date,
first_name varchar(20),
last_name varchar(20),
gender char(2),
hire_date date
)
UNIQUE KEY(`emp_no`)
DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1
在Flink SQL Client上提交Flink任务
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE employees_source (
emp_no INT,
birth_date DATE,
first_name STRING,
last_name STRING,
gender STRING,
hire_date DATE,
PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '',
'database-name' = 'test',
'table-name' = 'employees'
);
CREATE TABLE employees_sink (
emp_no INT ,
birth_date DATE,
first_name STRING,
last_name STRING,
gender STRING,
hire_date DATE
)
WITH (
'connector' = 'selectdb',
'load-url' = '127.0.0.1:36252',
'jdbc-url' = '127.0.0.1:19846',
'cluster-name' = 'cluster1',
'table.identifier' = 'test.employees',
'username' = 'admin',
'password' = '',
'sink.enable-delete' = 'true'
);
INSERT INTO employees_sink select * from employees_source
在MySQL中执行增删改操作
INSERT INTO test.employees VALUES (10006,'1953-04-20','Anneke','Preusig','F','1989-06-02');
UPDATE test.employees set last_name = 'update' where emp_no = 10001;
DELETE FROM test.employees WHERE emp_no = 10002;
在SelectDB Cloud中验证
mysql> select * from employees;
+--------+------------+------------+-------------+--------+------------+
| emp_no | birth_date | first_name | last_name | gender | hire_date |
+--------+------------+------------+-------------+--------+------------+
| 10001 | 1953-09-02 | Georgi | update | M | 1986-06-26 |
| 10003 | 1959-12-03 | Parto | Bamford | M | 1986-08-28 |
| 10004 | 1954-05-01 | Chirstian | Koblick | M | 1986-12-01 |
| 10005 | 1955-01-21 | Kyoichi | Maliniak | M | 1989-09-12 |
| 10006 | 1953-04-20 | Anneke | Preusig | F | 1989-06-02 |