Apache Flink features two relational APIs - the Table API and SQL - for unified stream and batch processing.
Flink’s SQL support is based on Apache Calcite, which implements the SQL standard.
Note: The Table API and SQL are not yet feature complete and are still under active development. Not all operations are supported by every combination of [Table API, SQL] and [stream, batch] input.
Concepts & Common API
Streaming Table API & SQL
Relational Queries on Data Streams
Differences
| Relational Algebra / SQL | Stream Processing |
|---|---|
| Relations (or tables) are bounded (multi-)sets of tuples. | A stream is an infinite sequence of tuples. |
| A query that is executed on batch data (e.g., a table in a relational database) has access to the complete input data. | A streaming query cannot access all data when it is started and has to "wait" for data to be streamed in. |
| A batch query terminates after it has produced a fixed-size result. | A streaming query continuously updates its result based on the received records and never completes. |
Connections
- A database table is the result of a stream of INSERT, UPDATE, and DELETE DML statements, often called a changelog stream.
- A materialized view is defined as a SQL query. In order to update the view, the query continuously processes the changelog streams of the view’s base relations.
- The materialized view is the result of the streaming SQL query.
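The duality between tables and changelog streams can be illustrated with a minimal plain-Python sketch (not Flink code): replaying a changelog of INSERT, UPDATE, and DELETE messages materializes the table. The `(operation, key, row)` message format and the function name `apply_changelog` are hypothetical, chosen only for this illustration.

```python
from typing import Dict, Iterable, Tuple

# A change message: (operation, key, row). Hypothetical format, used only to
# illustrate how a changelog stream defines the contents of a table.
Change = Tuple[str, str, dict]


def apply_changelog(changes: Iterable[Change]) -> Dict[str, dict]:
    """Materialize a table (keyed by a primary key) from a changelog stream."""
    table: Dict[str, dict] = {}
    for op, key, row in changes:
        if op in ("INSERT", "UPDATE"):
            table[key] = row        # insert a new row or overwrite the old one
        elif op == "DELETE":
            table.pop(key, None)    # remove the row if it exists
        else:
            raise ValueError(f"unknown operation: {op}")
    return table


changelog = [
    ("INSERT", "Mary", {"user": "Mary", "cnt": 1}),
    ("INSERT", "Bob", {"user": "Bob", "cnt": 1}),
    ("UPDATE", "Mary", {"user": "Mary", "cnt": 2}),
    ("DELETE", "Bob", {}),
]
print(apply_changelog(changelog))  # {'Mary': {'user': 'Mary', 'cnt': 2}}
```

Replaying only a prefix of the changelog yields the table as of that point in the stream, which is the state a continuously maintained materialized view would hold at that moment.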
Dynamic Tables & Continuous Queries
Dynamic tables are the core concept of Flink’s Table API and SQL support for streaming data. In contrast to the static tables that represent batch data, dynamic tables change over time.
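[Figure: a stream is converted into a dynamic table, a continuous query produces a new dynamic table, and that table is converted back into a stream (image not available)]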
- A stream is converted into a dynamic table.
- A continuous query is evaluated on the dynamic table yielding a new dynamic table.
- The resulting dynamic table is converted back into a stream.
Defining a Table on a Stream
Essentially, we are building a table from an INSERT-only changelog stream.
The following figure visualizes how the stream of click events (left-hand side) is converted into a table (right-hand side). The resulting table grows continuously as more records of the click stream are inserted.
[Figure: stream of click events converted into a continuously growing table (image not available)]
Note: A table that is defined on a stream is not materialized internally.
Continuous Queries
A continuous query is evaluated on a dynamic table and produces a new dynamic table as result. In contrast to a batch query, a continuous query never terminates and updates its result table according to the updates on its input tables. At any point in time, the result of a continuous query is semantically equivalent to the result of the same query being executed in batch mode on a snapshot of the input tables; in effect, it is the same as running the SQL query on a batch table.
[Figure: continuous query counting clicks per user on the clicks table (image not available)]
When the query is started, the clicks table (left-hand side) is empty. The query starts to compute the result table when the first row is inserted into the clicks table. After the first row [Mary, ./home] is inserted, the result table (right-hand side, top) consists of a single row [Mary, 1]. When the second row [Bob, ./cart] is inserted into the clicks table, the query updates the result table and inserts a new row [Bob, 1]. The third row [Mary, ./prod?id=1] yields an update of an already computed result row such that [Mary, 1] is updated to [Mary, 2]. Finally, the query inserts a third row [Liz, 1] into the result table when the fourth row is appended to the clicks table.
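The update behavior described above can be simulated with a short plain-Python sketch (it does not use Flink). It assumes the query in the missing figure is a per-user count along the lines of `SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user`. The URL of the fourth click is a hypothetical placeholder because the text does not give it. After every input row, the sketch also checks the semantic equivalence stated earlier: the incrementally maintained result equals a batch evaluation of the same query on the snapshot of rows seen so far.

```python
from collections import Counter
from typing import Dict, Iterator, List, Tuple

Click = Tuple[str, str]  # (user, url)


def continuous_count(clicks: List[Click]) -> Iterator[Tuple[str, str, int]]:
    """Incrementally maintain COUNT(url) ... GROUP BY user.

    Yields (change, user, cnt): "INSERT" for a new result row,
    "UPDATE" when an existing result row changes.
    """
    counts: Dict[str, int] = {}  # operator state: one counter per user
    for user, _url in clicks:
        change = "UPDATE" if user in counts else "INSERT"
        counts[user] = counts.get(user, 0) + 1
        yield change, user, counts[user]


def batch_count(snapshot: List[Click]) -> Dict[str, int]:
    """Batch evaluation of the same query on a static snapshot."""
    return dict(Counter(user for user, _url in snapshot))


clicks = [
    ("Mary", "./home"),
    ("Bob", "./cart"),
    ("Mary", "./prod?id=1"),
    ("Liz", "./home"),  # hypothetical URL; the text does not give it
]

result: Dict[str, int] = {}
for i, (change, user, cnt) in enumerate(continuous_count(clicks), start=1):
    result[user] = cnt
    print(change, user, cnt)
    # Continuous result == batch result on the snapshot of the first i rows.
    assert result == batch_count(clicks[:i])
```

The single UPDATE (Mary going from 1 to 2) is what makes this an updating result rather than an append-only one.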
[Figure: continuous query computing hourly results on the clicks table (image not available)]
As before, the input table clicks is shown on the left. The query continuously computes results every hour and updates the result table. The clicks table contains four rows with timestamps (cTime) between 12:00:00 and 12:59:59. The query computes two result rows from this input (one for each user) and appends them to the result table. For the next window between 13:00:00 and 13:59:59, the clicks table contains three rows, which results in another two rows being appended to the result table. The result table is updated as more rows are appended to clicks over time.
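The windowed query can be simulated the same way. This plain-Python sketch (not Flink code) assumes a query along the lines of `SELECT user, TUMBLE_END(cTime, INTERVAL '1' HOUR) AS endT, COUNT(url) AS cnt FROM clicks GROUP BY user, TUMBLE(cTime, INTERVAL '1' HOUR)`. Its click rows are hypothetical, chosen only to match the counts in the text: four rows from two users between 12:00 and 12:59:59, and three rows from two users between 13:00 and 13:59:59. It also assumes rows arrive ordered by cTime, so a window is final as soon as a row from a later window shows up. Flink itself relies on watermarks to decide when a window is complete.

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import List, Tuple

Click = Tuple[str, datetime, str]  # (user, cTime, url)
HOUR = timedelta(hours=1)


def window_end(ts: datetime) -> datetime:
    """Exclusive end of the one-hour tumbling window that contains ts."""
    return ts.replace(minute=0, second=0, microsecond=0) + HOUR


def hourly_counts(clicks: List[Click]) -> List[Tuple[str, datetime, int]]:
    """Append-only result: one (user, endT, cnt) row per user and window.

    Assumes clicks are ordered by cTime, so a window is final once a row
    from a later window arrives (a simplification of watermark handling).
    """
    result: List[Tuple[str, datetime, int]] = []
    current_end = None
    counts: Counter = Counter()
    for user, ctime, _url in clicks:
        end = window_end(ctime)
        if current_end is not None and end != current_end:
            # Previous window is complete: append its rows; they never change.
            result.extend((u, current_end, c) for u, c in sorted(counts.items()))
            counts.clear()
        current_end = end
        counts[user] += 1
    if counts:  # flush the last window at the end of the input
        result.extend((u, current_end, c) for u, c in sorted(counts.items()))
    return result


def t(hh: int, mm: int) -> datetime:
    return datetime(2018, 1, 1, hh, mm)  # hypothetical date


clicks = [
    ("Mary", t(12, 0), "./home"),
    ("Bob", t(12, 0), "./cart"),
    ("Mary", t(12, 2), "./prod?id=2"),
    ("Mary", t(12, 55), "./home"),
    ("Bob", t(13, 1), "./prod?id=4"),
    ("Bob", t(13, 30), "./prod?id=3"),
    ("Liz", t(13, 30), "./home"),
]
for row in hourly_counts(clicks):
    print(row)
```

Because each window's rows are emitted once and never changed, this result is append-only, unlike the per-user count in the previous example, which updates [Mary, 1] to [Mary, 2].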
Query Restrictions
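State Size: Some continuous queries run for weeks or months, and all of their intermediate state is retained. If the amount of state grows too large, the query can fail.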
Computing Updates:
Table to Stream Conversion
A dynamic table can be continuously modified by INSERT, UPDATE, and DELETE changes just like a regular database table. It might be a table with a single row, which is constantly updated, an insert-only table without UPDATE and DELETE modifications, or anything in between.
- Append-only stream: A dynamic table that is only modified by INSERT changes can be converted into a stream by emitting the inserted rows.
- Retract stream: A retract stream is a stream with two types of messages, add messages and retract messages. A dynamic table is converted into a retract stream by encoding an INSERT change as an add message, a DELETE change as a retract message, and an UPDATE change as a retract message for the updated (previous) row and an add message for the updating (new) row. The following figure visualizes the conversion of a dynamic table into a retract stream.
- Upsert stream: An upsert stream is a stream with two types of messages, upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with a unique key is converted into a stream by encoding INSERT and UPDATE changes as upsert messages and DELETE changes as delete messages. The stream-consuming operator needs to be aware of the unique key attribute in order to apply messages correctly. The main difference from a retract stream is that UPDATE changes are encoded with a single message, which makes upsert streams more efficient. The following figure visualizes the conversion of a dynamic table into an upsert stream.
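The three encodings can be compared with a small plain-Python sketch (not Flink's API). Each table change is modeled as a hypothetical `(kind, key, old_row, new_row)` tuple. Retract messages use a Boolean flag (`True` = add, `False` = retract), mirroring the flag in the `Tuple2<Boolean, Row>` records of Flink's retract streams. The `"UPSERT"`/`"DELETE"` tags of the upsert stream are an assumption for illustration.

```python
from typing import List, Optional, Tuple

# A table change: (kind, key, old_row, new_row). Hypothetical format.
Change = Tuple[str, str, Optional[tuple], Optional[tuple]]


def to_append(changes: List[Change]) -> List[tuple]:
    """Append-only stream: valid only if every change is an INSERT."""
    if any(kind != "INSERT" for kind, *_ in changes):
        raise ValueError("table is updated; it cannot be an append-only stream")
    return [new for _kind, _key, _old, new in changes]


def to_retract(changes: List[Change]) -> List[Tuple[bool, tuple]]:
    """Retract stream: True = add message, False = retract message."""
    out: List[Tuple[bool, tuple]] = []
    for kind, _key, old, new in changes:
        if kind == "INSERT":
            out.append((True, new))
        elif kind == "DELETE":
            out.append((False, old))
        elif kind == "UPDATE":
            out.append((False, old))  # retract the previous row
            out.append((True, new))   # add the updated row
        else:
            raise ValueError(f"unknown change: {kind}")
    return out


def to_upsert(changes: List[Change]) -> List[Tuple[str, str, Optional[tuple]]]:
    """Upsert stream keyed by a unique key: exactly one message per change."""
    out: List[Tuple[str, str, Optional[tuple]]] = []
    for kind, key, _old, new in changes:
        if kind in ("INSERT", "UPDATE"):
            out.append(("UPSERT", key, new))
        elif kind == "DELETE":
            out.append(("DELETE", key, None))
        else:
            raise ValueError(f"unknown change: {kind}")
    return out


changes = [
    ("INSERT", "Mary", None, ("Mary", 1)),
    ("INSERT", "Bob", None, ("Bob", 1)),
    ("UPDATE", "Mary", ("Mary", 1), ("Mary", 2)),
    ("DELETE", "Bob", ("Bob", 1), None),
]
print(to_retract(changes))  # 5 messages: the UPDATE needs two
print(to_upsert(changes))   # 4 messages: one per change
```

The UPDATE of Mary costs two messages in the retract stream but only one in the upsert stream. This is the efficiency difference described above. In exchange, the consumer of the upsert stream must know that `key` is the unique key.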
Time Attributes
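Flink mainly provides three notions of time:
1. Processing time: the system time of the machine that processes the data, often called wall-clock time.
2. Event time: the time at which an event occurred, usually based on a timestamp carried by each row itself.
3. Ingestion time: the time at which an event enters Flink; internally, it is treated similarly to event time.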
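A toy plain-Python comparison (not Flink code) shows why the choice of time matters. The same events, counted in one-minute windows, land in different windows depending on whether event time or processing time is used, because one event is processed several seconds after it happened. All timestamps are hypothetical seconds, and ingestion time is not modeled.

```python
from collections import Counter
from typing import List, Tuple

Event = Tuple[int, int]  # (event_time_s, arrival_time_s), hypothetical values
WINDOW_S = 60  # one-minute tumbling windows


def count_per_minute(events: List[Event], use_event_time: bool) -> dict:
    """Count events per one-minute window, keyed by window start (seconds)."""
    counts: Counter = Counter()
    for event_time, arrival_time in events:
        # Event time reads the timestamp carried by the event; processing time
        # uses the (simulated) wall-clock time at which it was processed.
        ts = event_time if use_event_time else arrival_time
        counts[ts - ts % WINDOW_S] += 1
    return dict(counts)


# The third event happened at 58 s but was only processed at 65 s.
events = [(10, 12), (50, 55), (58, 65), (70, 71)]
print(count_per_minute(events, use_event_time=True))   # {0: 3, 60: 1}
print(count_per_minute(events, use_event_time=False))  # {0: 2, 60: 2}
```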
Processing Time
During DataStream-to-Table Conversion
Using a TableSource
Event time
During DataStream-to-Table Conversion
There are two ways of defining the time attribute when converting a DataStream into a Table:
- Extending the physical schema by an additional logical field
- Replacing a physical field by a logical field (e.g. because it is no longer needed after timestamp extraction).
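The difference between the two options can be pictured with a toy plain-Python model (not the Flink API). Each stream record is a hypothetical `(timestamp_ms, fields)` pair in which the timestamp stands for the one already assigned by a timestamp extractor. The helper names are illustrative only.

```python
from typing import List, Tuple

# A stream record: (timestamp_ms, fields). The timestamp models the one
# already assigned by a timestamp extractor (hypothetical, not Flink code).
Record = Tuple[int, tuple]


def extend_with_rowtime(records: List[Record]) -> List[tuple]:
    """Option 1: keep every physical field and append a logical rowtime field."""
    return [fields + (ts,) for ts, fields in records]


def replace_with_rowtime(records: List[Record], index: int) -> List[tuple]:
    """Option 2: replace the physical field at `index` by the rowtime field."""
    return [fields[:index] + (ts,) + fields[index + 1:] for ts, fields in records]


# The first physical field held the raw timestamp used for extraction.
records = [(1_000, (1_000, "Mary", "./home")), (2_000, (2_000, "Bob", "./cart"))]
print(extend_with_rowtime(records))      # timestamp appears twice per row
print(replace_with_rowtime(records, 0))  # raw field replaced, no duplicate
```

Replacing the field avoids carrying the same timestamp twice once it has been extracted, which is the situation the second option describes.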
Using a TableSource
The event time attribute is defined by a TableSource that implements the DefinedRowtimeAttribute interface. The logical time attribute is appended to the physical schema defined by the return type of the TableSource.
Timestamps and watermarks must be assigned in the stream that is returned by the getDataStream() method.
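As a rough illustration of what assigning timestamps and watermarks involves, the plain-Python sketch below (not Flink code) reads an event-time timestamp from each record and derives a watermark that trails the largest timestamp seen so far by a fixed bound. This is similar in spirit to Flink's `BoundedOutOfOrdernessTimestampExtractor`. The `ts` field and the 5-second bound are assumptions. The sketch also emits a watermark after each record whenever it advances, whereas Flink emits such watermarks periodically.

```python
from typing import Iterable, Iterator, Tuple, Union

MAX_OUT_OF_ORDERNESS_MS = 5_000  # assumed bound on how late events can arrive


def assign_timestamps_and_watermarks(
    events: Iterable[dict],
) -> Iterator[Tuple[str, Union[dict, int]]]:
    """Yield ("record", event) per event and ("watermark", ms) when it advances.

    The event-time timestamp is read from a hypothetical "ts" field (ms).
    A watermark W asserts that no more events with timestamp <= W are
    expected; here it trails the largest timestamp seen by a fixed bound.
    """
    max_ts = None
    last_wm = None
    for event in events:
        ts = event["ts"]
        max_ts = ts if max_ts is None else max(max_ts, ts)
        yield "record", event
        wm = max_ts - MAX_OUT_OF_ORDERNESS_MS
        if last_wm is None or wm > last_wm:  # only emit when the watermark advances
            last_wm = wm
            yield "watermark", wm


events = [{"ts": 10_000}, {"ts": 12_000}, {"ts": 9_000}, {"ts": 20_000}]
for kind, value in assign_timestamps_and_watermarks(events):
    print(kind, value)
```

The event with ts = 9000 arrives out of order, but it is still above the current watermark (7000), so an event-time operator would not treat it as late.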
Query Configuration
Table API
Update and Append Queries
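The first example query (the per-user click count) updates previously emitted results: the changelog stream that defines its result table contains INSERT and UPDATE changes. The second example query (the hourly window count) only appends to the result table: its changelog stream consists of INSERT changes only.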
SQL
SQL queries are specified with the sql() method of the TableEnvironment. The method returns the result of the SQL query as a Table. A Table can be used in subsequent SQL and Table API queries, be converted into a DataSet or DataStream, or be written to a TableSink. SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single program.
In order to access a table in a SQL query, it must be registered in the TableEnvironment. A table can be registered from a TableSource, Table, DataStream, or DataSet. Alternatively, users can also register external catalogs in a TableEnvironment to specify the location of the data sources.
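Note: Flink's SQL support is not yet feature complete. Queries that include unsupported SQL features cause a TableException. The SQL features supported on batch and streaming tables are listed in the following sections.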
Supported Syntax
Flink parses SQL using Apache Calcite, which supports standard ANSI SQL. DML and DDL statements are not supported by Flink.
Scan, Projection, and Filter
Aggregations
GroupBy Aggregation
GroupBy Window Aggregation
Over Window aggregation
Group Windows
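| Group Window Function | Description |
|---|---|
| TUMBLE(time_attr, interval) | Tumbling window: fixed-size, non-overlapping windows whose length is given by interval. |
| HOP(time_attr, interval, interval) | Hopping (sliding) window. It takes two interval parameters: the first defines the hop (slide) interval, the second defines the window size. |
| SESSION(time_attr, interval) | Session window. A session window has no fixed duration; its bounds are defined by an interval of inactivity. If no new event arrives within the gap interval, the session window closes; otherwise the row is added to the existing window. |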
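The semantics of the three group window functions can be sketched in plain Python (not Flink code). The sketch assumes timestamps in integer milliseconds, half-open `[start, end)` windows, and tumbling and hopping windows aligned to epoch 0. `hop_windows` takes the slide before the size, matching the argument order of HOP described above. For sessions, an event joins the current session when it arrives less than `gap` after the previous event; how ties at exactly `gap` are handled is a simplifying assumption here.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # half-open [start, end) in milliseconds


def tumble_window(ts: int, size: int) -> Window:
    """TUMBLE: the single fixed-size window that contains ts."""
    start = ts - ts % size
    return start, start + size


def hop_windows(ts: int, slide: int, size: int) -> List[Window]:
    """HOP: all windows of length `size`, starting every `slide` ms, containing ts."""
    windows = []
    start = ts - ts % slide      # latest window start at or before ts
    while start > ts - size:     # [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def session_windows(timestamps: List[int], gap: int) -> List[Window]:
    """SESSION: a session ends `gap` ms after its last event (exclusive end)."""
    sessions: List[Window] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            start, _ = sessions[-1]
            sessions[-1] = (start, ts + gap)  # extend the open session
        else:
            sessions.append((ts, ts + gap))   # inactivity gap exceeded: new session
    return sessions


print(tumble_window(12, size=10))                  # (10, 20)
print(hop_windows(12, slide=5, size=10))           # [(5, 15), (10, 20)]
print(session_windows([1, 3, 20, 22, 40], gap=5))  # [(1, 8), (20, 27), (40, 45)]
```

A hopping window whose slide is smaller than its size assigns each row to several windows (two in this example), while tumbling and session windows assign each row to exactly one.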
Time Attributes
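- Processing time: the system time of the machine at the moment a record is processed.
- Event time: the time carried by the message itself, e.g., a timestamp encoded in the record.
- Ingestion time: the time at which an event arrives in Flink. Like processing time, its value comes from the system clock rather than from the event.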
| Auxiliary Function | Description |
|---|---|
| TUMBLE_START(time_attr, interval), HOP_START(time_attr, interval, interval), SESSION_START(time_attr, interval) | Returns the start timestamp of the corresponding tumbling, hopping, or session window. |
| TUMBLE_END(time_attr, interval), HOP_END(time_attr, interval, interval), SESSION_END(time_attr, interval) | Returns the end timestamp of the corresponding tumbling, hopping, or session window. |
These functions are commonly used to compute the start and end times of windows.
Table Sources & Sinks
User-Defined Functions