1. 动态表
SQL | 流计算 |
---|---|
表是有界数据集,计算过程中,一旦确认了就不会改变 | 流是无界数据集,数据持续产生,计算过程中,数据集持续在变化 |
对批处理数据执行的查询结果可以访问完整的输入数据 | 流式查询在启动时无法访问所有数据,必须等待数据流入 |
对有界数据集的计算最终会结束,得到计算结果 | 流式会持续收到数据,不断更新其计算结果,不会结束 |
传统SQL比较成熟,且应用广泛,如果能够在流上使用SQL进行开发,那么会带来极大的便利,将SQL应用于流计算,需要将流和表两个差异比较大的概念进行融合.如果把有界数据集当成表,那么无界数据集就是一个随着时间变化持续写入数据的表.
2. ⼀个 SQL\Table API 任务的代码结构
public class tableDemo1 {
public static void main(String[] args) {
EnvironmentSettings setting = EnvironmentSettings.newInstance()
.inStreamingMode() //声明成流任务
// .inBatchMode() // 声明成批任务
.build();
TableEnvironment tableEnv = TableEnvironment.create(setting);
//创建一个输入表
tableEnv.executeSql("CREATE TABLE input_table(user_name STRING ,url STRING, ts BIGINT) WITH ( 'connector' = 'filesystem', 'path' = 'src/main/resources/c.txt', 'format' = 'csv' )");
////创建一个输出表
tableEnv.executeSql("CREATE TABLE output_table(user_name STRING ,url STRING) WITH ( 'connector' = 'filesystem', 'path' = 'output/2022-06-21', 'format' = 'csv' )");
//使用TABLE-API
Table table_API = tableEnv.from("input_table").where($("user_name").isEqual("小米")).select($("user_name"), $("url"));
//使用SQL
Table table_SQL = tableEnv.sqlQuery("SELECT user_name, url FROM input_table WHERE user_name = '小米' ");
//将数据写入输出表中
table_API.executeInsert("output_table");
table_SQL.executeInsert("output_table");
}
}
SQL上下文:TableEnvironment
- 在内部catalog中注册表,catalog可以理解为flink的metastore,类似Hive中的metastore对Hive的地位
- 注册外部catalog
- 执行SQL查询
- 注册用户定义(标量,表或聚合)函数
- 将DataStream或DataSet(废弃)转换为表
- 持有对ExecutionEnvironment或StreamExecutionEnvironment的引用
SQL中的表概念
一个表的全名(标识)会由三个部分组成:catalog 名称.数据库名称.表名称,如果catalog名称或者数据库名称没有指名,就会使用当前默认值default,下面这个SQL创建的table的全名就是default.nono.output_table
tableEnv.executeSql("CREATE TABLE nono.output_table(user_name STRING ,url STRING) WITH ( 'connector' = 'filesystem', 'path' = 'output/2022-06-21', 'format' = 'csv' )");
表可以是常规的外部表,也可以是虚拟的视图
- 外部表:描述的是外部数据,例如是文件,消息队列等
- 视图view:从已经存在的表中创建,视图一般是一个SQL逻辑的查询结果,对比到离线的hive SQL 中,在离线的场景中的view也都是从已经有的表中去创建
一个SQL查询案例
聊聊flink 1.11 中的随机数据生成器-DataGen connector
public class tableDemo2 {
public static void main(String[] args) {
EnvironmentSettings setting = EnvironmentSettings.newInstance()
.inStreamingMode() //声明成流任务
// .inBatchMode() // 声明成批任务
.build();
TableEnvironment tableEnv = TableEnvironment.create(setting);
//创建随机mock数据源
tableEnv.executeSql("CREATE TABLE sku_price_table( tag STRING ,sku_id INT ,price INT,ts AS localtimestamp, WATERMARK FOR ts AS ts ) " +
" WITH( 'connector' = 'datagen', " +
" 'rows-per-second'='1', " +
" 'fields.sku_id.kind'='sequence', " +
" 'fields.sku_id.start'='1'," +
" 'fields.sku_id.end'='1000'," +
" 'fields.tag.length'='1'," +
" 'fields.price.min'='1'," +
" 'fields.price.max'='1000')");
//创建到kafka的输入表
tableEnv.executeSql("CREATE TABLE sink_tag_agg_Table (" +
" tag STRING, " +
" count_tag BIGINT, " +
" sum_tag BIGINT , " +
" max_tag BIGINT, " +
" PRIMARY KEY (`tag`) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'upsert-kafka', " +
" 'topic' = 'topic05', " +
" 'properties.bootstrap.servers' = '43.142.80.86:9093', " +
" 'key.format' = 'json' ," +
" 'value.format' = 'json'" +
")");
tableEnv.executeSql("INSERT INTO sink_tag_agg_Table SELECT tag, count(1) as count_tag, sum(price) as sum_tag,max(price) as max_tag FROM sku_price_table GROUP BY tag");
}
}
bug:Could not find any factory for identifier 'json' that implements
缺少依赖,加入json依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
SQL 与 DataStream API 的转换
SQL实现不了的功能DataStream API来实现喽,如下实现一个log告警功能
public class tableDemo3 {
public static void main(String[] args) throws Exception {
LocalStreamEnvironment env= StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//创建随机mock数据源
tableEnv.executeSql("CREATE TABLE tag_money_table( tag STRING ,id INT ,money INT,ts AS CAST(CURRENT_TIMESTAMP AS timestamp_LTZ(3)), WATERMARK FOR ts AS ts - INTERVAL '1' SECOND) " +
" WITH( 'connector' = 'datagen', " +
" 'rows-per-second'='1', " +
" 'fields.id.kind'='sequence', " +
" 'fields.id.start'='1'," +
" 'fields.id.end'='1000'," +
" 'fields.tag.length'='1'," +
" 'fields.money.min'='100'," +
" 'fields.money.max'='10000')");
//使用窗口TVF实现累计窗口
Table sqlQuery = tableEnv.sqlQuery("SELECT UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 ,window_end , COUNT(DISTINCT id ) ,SUM(money) as money FROM TABLE(CUMULATE(TABLE tag_money_table ,DESCRIPTOR(ts) ,INTERVAL '5' SECOND ,INTERVAL '30' SECOND )) GROUP BY window_start ,window_end ");
//创建Logger对象
Logger logger = LoggerFactory.getLogger(tableDemo3.class);
//将表转换成流
tableEnv.toDataStream(sqlQuery,Row.class).flatMap(new FlatMapFunction<Row, String>() {
@Override
public void flatMap(Row value, Collector<String> out) throws Exception {
long l = Long.parseLong(String.valueOf(value.getField("money")));
if ( l >= 10000L){
logger.info("......告警.....");
logger.info("-------");
}
out.collect(value.toString());
}
}).print();
env.execute();
}
}
要配置log4j.properties,把这个文件夹放在src/main/resources下
### 设置###
log4j.rootLogger = debug,stdout,D,E
### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
### 输出DEBUG 级别以上的日志到=/home/hao/Desktop/error.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = /home/hao/Desktop/error.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
### 输出ERROR 级别以上的日志到=/home/hao/Desktop/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File =/home/hao/Desktop/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
日期.时间类型
- DATE:由年-月-日 组成的不带时区含义的日期函数类型,取值范围[0000-01-01 , 9999-12-31]
- TIME,TIME(p):不带时区的时间数据类型,由 hour:minute:second[.fractional] 组成,精度达到纳秒,范围从 00:00:00.000000000 到 23:59:59.999999999。与 SQL 标准相比,不支持 leap seconds(23:59:60 和 23:59:61),语义上更接近于 java.time.LocalTime。没有提供带有时区的时间。其中 p 是秒的小数部分的位数(精度)。p 的值必须介于 0 和 9 之间(含边界值)。如果未指定精度,则 p 等于 0
- TIMESTAMP,TIMESTAMP(p):不带时区的时间戳数据类型,由 year-month-day hour:minute:second[.fractional] 组成,精度达到纳秒,范围从 0000-01-01 00:00:00.000000000 到 9999-12-31 23:59:59.999999999。与 SQL 标准相比,不支持 leap seconds(23:59:60 和 23:59:61),语义上更接近于 java.time.LocalDateTime其中 p 是秒的小数部分的位数(精度)。p 的值必须介于 0 和 9 之间(含边界值)。如果未指定精度,则 p 等于 6。TIMESTAMP(p) WITHOUT TIME ZONE 等价于此类型
- TIMESTAMP WITH TIME ZONE,TIMESTAMP(p) WITH TIME ZONE:带有时区的时间戳数据类型,由 year-month-day hour:minute:second[.fractional] zone 组成,精度达到纳秒,范围从 0000-01-01 00:00:00.000000000 +14:59 到 9999-12-31 23:59:59.999999999 -14:59 其中 p 是秒的小数部分的位数(精度)。p 的值必须介于 0 和 9 之间(含边界值)。如果未指定精度,则 p 等于 6
- TIMESTAMP_LTZ、TIMESTAMP_LTZ(p):由 年-月-日 ⼩时:分钟:秒[.⼩数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中p代表小数秒的位数,取值范围是[0,9],如果不指定p,默认为6
- INTERVAL YEAR TO MONTH:一组由 Year-Month Interval 组成的数据类型,其范围从 -9999-11 到 +9999-11,可以表达:
INTERVAL YEAR
INTERVAL YEAR(p)
INTERVAL YEAR(p) TO MONTH
INTERVAL MONTH
其中 p 是年数(年精度)的位数。p 的值必须介于 1 和 4 之间(含边界值)。如果未指定年精度,p 则等于 2。
时间间隔字面量 | 说明 |
---|---|
INTERVAL '3' YEAR | 时间间隔为3年 |
INTERVAL '3' MONTH | 时间间隔为3个月 |
INTERVAL '3-4' YEAR TO MONTH | 时间间隔为3年4个月 |
- INTERVAL DAY TO SECOND: 一组由 Day-Time Interval 组成的数据类型。时间间隔由 +days hours:minutes:seconds.fractional 组成,其范围从 -999999 23:59:59.999999999 到 +999999 23:59:59.999999999,可以表达:
INTERVAL DAY
INTERVAL DAY(p1)
INTERVAL DAY(p1) TO HOUR
INTERVAL DAY(p1) TO MINUTE
INTERVAL DAY(p1) TO SECOND(p2)
INTERVAL HOUR
INTERVAL HOUR TO MINUTE
INTERVAL HOUR TO SECOND(p2)
INTERVAL MINUTE
INTERVAL MINUTE TO SECOND(p2)
INTERVAL SECOND
INTERVAL SECOND(p2)
其中 p1 是天数(天精度)的位数,p2 是秒的小数部分的位数(小数精度)。p1 的值必须介于 1 和之间 6(含边界值),p2 的值必须介于 0 和之间 9(含边界值)。如果 p1 未指定值,则缺省等于 2,如果 p2 未指定值,则缺省等于 6。
时间间隔字面量 | 说明 |
---|---|
INTERVAL '3' DAY | 时间间隔为3天 |
INTERVAL '2' HOUR | 时间间隔为2小时 |
INTERVAL '25' MINUTE | 时间间隔为25分钟 |
INTERVAL '45' SECOND | 时间间隔为45秒 |
INTERVAL '3 02' DAY TO HOUR | 时间间隔为3天零2小时 |
INTERVAL '3 02:25' DAY TO MINUTE | 时间间隔为3天零2小时25分 |
INTERVAL '3 02:25:45' DAY TO SECOND | 时间间隔为3天零2小时25分45秒 |
INTERVAL '02:25' HOUR TO MINUTE | 时间间隔为2小时25分 |
INTERVAL '02:25:45' HOUR TO SECOND | 时间间隔为2小时25分45秒 |
INTERVAL '25:45' MINUTE TO SECOND | 时间间隔为25分45秒 |
3. 动态输出表转化为输出数据
数据库中表的DML行为有3种: INSERT、UPDATE、DELETE。动态表作为类似于数据库的表的概念,也支持这3种DML操作,但是有些不同
在Flink中,将动态表分为3种类型:
1)只有更新行为,只有一行或多行但被持续更新的表。
2)只有插入行为,没有UPDATE、DELETE更改的只插入表。
3)既有插入行为也有更新行为的表。
当将动态表转化为流或将其写入外部系统时,对动态表的更改(修改)需要被转换为流上的行为,这3种类型的动态表,对应于不同类型的数据流。Flink的Table API & SQL支持3种方式的动态表上的更改(修改)。Flink的Table API & SQL支持3种方式的动态表上的更改(修改)。
(1). Append流
Append流只支持追加写入行为,即只支持INSERT行为的动态表,不支持Update、Delete等改变已存在数据的行为。
(2). Retract流
Retract流包含两种类型消息:add消息和retract消息。动态表的更改行为对应的消息类型如下。
1)INSERT更改转换为流上的add消息。
2)DELETE更改转换为流上的retract消息。
3)UPDATE更改转换为两条消息,即对旧记录的retract消息和新记录的add消息。
(3). Upsert流
Upsert流包含两种类型消息:update消息和delete消息。动态表转化为Upsert流必须有主键(可以是复合主键),具有主键的动态表的更改行为对应的消息类型如下。
1)INSERT、UPDATE转换为UPSERT消息。
2)DELETE转换为delete消息。
4. SQL中指定时间属性的两种方式
如果要满⾜ Flink SQL 时间窗⼝类的聚合操作,SQL 或 Table API 中的 数据源表 就需要提供时间属性(相当于我们把这个时间属性在 数据源表 上⾯进⾏声明),以及⽀持时间相关的操作。FlinkSQL为我们提供的两种指定时间戳的方式
- CREATE TABLE DDL 创建表的时候指定
- 可以在DataStream 中指定,在后续的DataStream转Table中使用
Event Time
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
CREATE TABLE user_actions (
user_name STRING,
data STRING,
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
-- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
// Option 1:
// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
// Option 2:
// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));
// Usage:
WindowedTable windowedTable = table.window(Tumble
.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow"));
Processing Time
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- declare an additional field as a processing time attribute
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
DataStream<Tuple2<String, String>> stream = ...;
// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());
WindowedTable windowedTable = table.window(
Tumble.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow"));