基于 Table API 实现实时报表

Apache Flink 提供了一个统一的关系API —— Table API, 用于批处理和实时计算。也就是说,无论是在无界的实时数据流上还是在有界批量数据集上都可以使用相同的语义执行查询,并且得到相同的结果。Flink Table API 通常用于简化数据分析,数据管道和ETL应用的定义。

What Will You Be Building?

在本教程中,你将会学习如何构建实时仪表盘来按账户跟踪财务交易。数据管道将从Kafka读取数据,把结果数据写入MySQL并通过Grafana进行可视化。

Prerequisites

本次演练假设你对Java或者Scala有一定的了解,但是即使你熟悉和使用的是其他语言,也应该可以理解。同时还假设你熟悉基本的关系概念,例如 selectgroup by

Help, I’m Stuck!

如果你陷入困境,可以查看 community support resources。值得一提的是,Apache Flink 的用户邮件列表一直被评为所有Apache项目中最活跃的项目之一,因此通过邮件列表进行求助吗,也不失为一种很棒的快速解决问题途径。

How To Follow Along

如果你想继续跟进本教程,你将需要具有以下环境的一台计算机:

  • Java 8 或者 Java 11
  • Maven
  • Docker

本次演练需要的配置文件在 flink-playgrounds仓库中提供,下载之后,在你的IDE中打开 flink-palyground/table-walkthrough,并导航到文件 SpendReport

 EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE transactions (\n" +
                "    account_id  BIGINT,\n" +
                "    amount      BIGINT,\n" +
                "    transaction_time TIMESTAMP(3),\n" +
                "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'topic'     = 'transactions',\n" +
                "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
                "    'format'    = 'csv'\n" +
                ")");

        tEnv.executeSql("CREATE TABLE spend_report (\n" +
                "    account_id BIGINT,\n" +
                "    log_ts     TIMESTAMP(3),\n" +
                "    amount     BIGINT\n," +
                "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
                ") WITH (\n" +
                "  'connector'  = 'jdbc',\n" +
                "  'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
                "  'table-name' = 'spend_report',\n" +
                "  'driver'     = 'com.mysql.jdbc.Driver',\n" +
                "  'username'   = 'sql-demo',\n" +
                "  'password'   = 'demo-sql'\n" +
                ")");

        Table transactions = tEnv.from("transactions");
        report(transactions).executeInsert("spend_report");

代码分析

执行环境

代码前两行设置了 TableEnvironment. 它用于配置作业的属性,指定是以批的方式还是以流的方式运行程序以及数据源的创建。本次演练使用流式的模式创建了一个标准的 TableEnvironment.

        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

注册表

接下来,在当前的 Catalog中注册表,你可以使用它连接外部系统以读写批量数据集和流式数据。数据源表提供了对存储在外部系统中数据的访问,这些系统包括数据库、键值存储、消息队列、文件系统等。表数据汇向外部系统按表的方式写入数据。不同类型的数据源和数据汇支持不同的存储格式,例如CSV、JSON、Avro或者Parquet等。

tEnv.executeSql("CREATE TABLE transactions (\n" +
     "    account_id  BIGINT,\n" +
     "    amount      BIGINT,\n" +
     "    transaction_time TIMESTAMP(3),\n" +
     "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
     ") WITH (\n" +
     "    'connector' = 'kafka',\n" +
     "    'topic'     = 'transactions',\n" +
     "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
     "    'format'    = 'csv'\n" +
     ")");

需要注册两个表,一个是输入表transaction,另一个是输出表spend report. 表 transaction读取信用卡交易数据,其中包含账户ID(account_id),时间戳和美元金额。该表是kafka 主题 transactions上的一个逻辑表,该主题存储的数据格式为CSV。

tEnv.executeSql("CREATE TABLE spend_report (\n" +
    "    account_id BIGINT,\n" +
    "    log_ts     TIMESTAMP(3),\n" +
    "    amount     BIGINT\n," +
    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
    ") WITH (\n" +
    "    'connector'  = 'jdbc',\n" +
    "    'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
    "    'table-name' = 'spend_report',\n" +
    "    'driver'     = 'com.mysql.jdbc.Driver',\n" +
    "    'username'   = 'sql-demo',\n" +
    "    'password'   = 'demo-sql'\n" +
    ")");

第二个表spend_report存储着最终的聚合结果,它是mysql数据库中的一个表。

查询

在配置完运行环境并注册了表之后,你就可以开始着手构建第一个应用程序了。可以从TableEnvironment中获取输入表以读取数据,并且使用 executeInsert将结果数据写入到输出表。report函数是你实现真正业务逻辑的地方,目前尚未实现。

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings); 

测试

本项目包含了一个辅助测试类SpendReportTest用于验证报表逻辑,它已批处理模式创建TableEnvironment

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings); 

Flink的特性之一是在提供了跨越批处理和流式处理的一致性语义。也就是说你可以在静态数据集上以批处理的模式开发和测试应用程序,并将其作为流应用程序部署到生产环境中。

实现业务逻辑

现在作业的框架已经设置完成,你可以添加一些业务逻辑了。我们的目标是构建一个展示每个账户一天中每个小时总支出的报告。这就意味这时间戳字段的粒度需要从毫秒换算到小时。

Flink支持使用纯SQL或者Table API开发关系应用程序。Table API 是和SQL具备相同功能的流畅的DSL,可以用Python,Java,或者Scala编写,并且支持强大的IDE集成功能。可以完全像SQL一样进行选择需要的字段进行查询并且按键值分组聚合,再使用一些f内置的函数例如floor和sum,就可以来实现report了。

public static Table report(Table transactions) {
    return transactions.select(
            $("account_id"),
            $("transaction_time").floor(TimeIntervalUnit.HOUR).as("log_ts"),
            $("amount"))
        .groupBy($("account_id"), $("log_ts"))
        .select(
            $("account_id"),
            $("log_ts"),
            $("amount").sum().as("amount"));
}

用户自定义函数

Flink包含了数据有限的内置函数,有时候你需要通过自定义函数来实现某些需求。如果floor没有在内置函数中实现,你可以选择自己去实现。

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class MyFloor extends ScalarFunction {

    public @DataTypeHint("TIMESTAMP(3)") LocalDateTime eval(
        @DataTypeHint("TIMESTAMP(3)") LocalDateTime timestamp) {

        return timestamp.truncatedTo(ChronoUnit.HOURS);
    }
}

之后可以快速将其应用在你的程序当中,

public static Table report(Table transactions) {
    return transactions.select(
            $("account_id"),
            call(MyFloor.class, $("transaction_time")).as("log_ts"),
            $("amount"))
        .groupBy($("account_id"), $("log_ts"))
        .select(
            $("account_id"),
            $("log_ts"),
            $("amount").sum().as("amount"));
}

此查询,读取transactions表中的所有记录,计算报表,并且以高效和可扩展的方式输出结果。使用此实现运行测试将通过。

添加窗口函数

基于时间对数据分组在数据处理中是一种典型的操作,特别是在处理无界的流式数据时。基于时间的分组被称为窗口,Flink提供了灵活的窗口语义。最基本的窗口是滚动窗口,它具有固定的大小并且不同窗口之间不存在重叠。

public static Table report(Table transactions) {
    return transactions
        .window(Tumble.over(lit(1).hour()).on($("transaction_time")).as("log_ts"))
        .groupBy($("account_id"), $("log_ts"))
        .select(
            $("account_id"),
            $("log_ts").start().as("log_ts"),
            $("amount").sum().as("amount"));
}

你的应用程序基于时间戳这一列定义了一小时的滚动窗口。因此,时间戳为 2019-06-01 01:23:47 的行被放入 2019-06-01 01:00:00窗口。

基于时间的聚合是唯一的,因为与其他属性不同的是,时间在连续的流应用程序中通常是向前移动的。与floor和其他UDF不同的是, 窗口函数是可以在运行时应用额外优化的内部函数。在批处理上下文中,窗口函数提供了一个便利的API可以根据时间戳属性对记录进行分组。

使用此实现运行测试也将通过。

流处理模式

现在,这就是一个具备完整功能的有状态的分布式流应用程序。该查询不断地消费来自kafka的transactions数据流,并计算每小时的总开支,并在结果准备就绪后立即发出。由于输入的数据流式无界的,所以该查询将一直运行除非手动停止。因为作业使用的实际基于时间的窗口聚合,所以当框架判定不会再有更多的记录到达某个特定窗口的时候,Flink会执行特定的优化,例如状态清理。

本次演练 的流应用程序是完全基于的Docker的,并且可以本地运行。该环境中包括 Kafka ,一个连续的数据生成器、MySQL和Grafana。

table-walkthrough文件夹中启动docker-compose脚本:

$ docker-compose build
$ docker-compose up -d

你可以通过Flink控制台查看正在运行的作业的信息:
[图片上传失败...(image-cceed-1619139096330)]
在内置的MySQL数据库中查询结果数据:

$ docker-compose exec mysql mysql -Dsql-demo -usql-demo -pdemo-sql

mysql> use sql-demo;
Database changed

mysql> select count(*) from spend_report;
+----------+
| count(*) |
+----------+
|      110 |
+----------+

最终,可以使用Grafana对结果进行可视化展示:
[图片上传失败...(image-c85cce-1619139096331)]

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,376评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,126评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,966评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,432评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,519评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,792评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,933评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,701评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,143评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,488评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,626评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,292评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,896评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,742评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,324评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,494评论 2 348

推荐阅读更多精彩内容