1.场景是 通过flink sql 对表的操作,通过添加多个算子 对结果的操作如图:
例子不是很恰当主要是 我们在创建数仓表时,通过先将一个函数的结果集放到一个视图中, 在通过一个一个函数将结果放入到下一个视图中, 然后在通过视图View2 的结果放入到 数仓新表中。
比如 对一张表中的一列进行函数的结果放入到一个视图中 在对视图进行一个函数 结果集放入到视图中,最终将视图中的结果放入到表中。
2. 版本
mysql | flink |
---|---|
5.7.20-log | fink 14.5 |
3.先创建mysql 表
CREATE DATABASE ;
USE `test`;
DROP TABLE IF EXISTS `Flink_cdc`;
CREATE TABLE `Flink_cdc` (
`id` bigint(64) NOT NULL AUTO_INCREMENT,
`name` varchar(64) DEFAULT NULL,
`age` int(20) DEFAULT NULL,
`birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=72 DEFAULT CHARSET=utf8mb4;
insert into `Flink_cdc`(`id`,`name`,`age`,`birthday`,`ts`) values
(69,'flink',21,'2022-12-09 22:57:15','2022-12-09 22:57:17'),
(70,'flink sql',22,'2022-12-09 23:01:43','2022-12-09 23:01:46'),
(71,'flk sql',23,'2022-12-09 23:43:04','2022-12-09 23:43:07');
3.创建flink mysql cdc 表
CREATE TABLE source_mysql (
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
age INT,
birthday TIMESTAMP(3),
ts TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.180',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial',
'database-name' = 'test',
'table-name' = 'Flink_cdc'
);
4.创建视图
Flink SQL> create view v1 as select * from source_mysql;
[INFO] Execute statement succeed.
5.创建算子
先说下这个函数的意思
INSTR(string1, string2) --返回string2在string1 中第一次出现的位置
--select INSTR('flinksql','k'); -- 返回5
create view v2 as select id,instr(name,'k'),age,birthday from v1;
6.将结果写入到新表中 (比如创建表V3)
create view v3 as select * from v2 where name in (select min(name) from v2);
7. 也可以往视图中添加数据
create view v6 as values ('1','flink','hbase','hive');