再谈Flink事件时间、水印和迟到数据处理

前言

之前的文章中已经屡次提到过Flink的事件时间(event time)、水印(watermark)、乱序(out-of-order)、迟到数据(late element)这些概念,虽然它们都非常基础,但笔者还没有对它们做过像样的介绍,感觉不太合适。正好今天脑子比较累,又是Friday night,不适合写复杂的东西,就来谈谈简单的吧。

事件时间与水印

所谓事件时间,就是Flink DataStream中的数据元素自身带有的、在其实际发生时记录的时间戳,具有业务含义,并与系统时间独立。很显然,由于外部系统产生的数据往往不能及时、按序到达Flink系统,所以事件时间比处理时间有更强的不可预测性。为了能够准确地表达事件时间的处理进度,就必须用到水印。

Flink水印的本质是DataStream中的一种特殊元素,每个水印都携带有一个时间戳。当时间戳为T的水印出现时,表示事件时间t <= T的数据都已经到达,即水印后面应该只能流入事件时间t > T的数据。也就是说,水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。

为了形象地说明水印的作用,参考一下下面的图,是一个乱序的基于事件时间的数据流示例。

https://www.ververica.com/blog/how-apache-flink-enables-new-streaming-applications-part-1

图中的方框就是数据元素,其中的数字表示事件时间,W(x)就表示时间戳是x的水印,并有长度为4个时间单位的滚动窗口。假设时间单位为秒,可见事件时间为2、3、1s的元素都会进入区间为[1s, 4s]的窗口,而事件时间为7s的元素会进入区间为[5s, 8s]的窗口。当水印W(4)到达时,表示已经没有t <= 4s的元素了,[1s, 4s]窗口会被触发并计算。同理,水印W(9)到达时,[5s, 8s]窗口会被触发并计算,以此类推。

不过图中暂时没有示出迟到数据。如果事件时间为6的元素出现在W(9)后面,就算是迟到了。迟到数据的处理后面再说。

上面的示例只有一个并行度,那么在有多个并行度的情况下,就会有多个流产生水印,窗口触发时该采用哪个水印呢?答案是所有流入水印中时间戳最小的那个。来自官方文档的图能够说明问题。

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html

容易理解,如果所有流入水印中时间戳最小的那个都已经达到或超过了窗口的结束时间,那么所有流的数据肯定已经全部收齐,就可以安全地触发窗口计算了。

提取事件时间、产生水印

上面说了这么多,那么事件时间是如何从数据中提取的,水印又是如何产生的呢?Flink提供了统一的DataStream.assignTimestampsAndWatermarks()方法来提取事件时间并同时产生水印,毕竟它们在处理过程中是紧密联系的。

assignTimestampsAndWatermarks()方法接受的参数类型有AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks两种,分别对应周期性水印和打点(即由事件本身的属性触发)水印,它们的类图如下所示。

周期性水印

顾名思义,使用AssignerWithPeriodicWatermarks时,水印是周期性产生的。该周期默认为200ms,也能通过ExecutionConfig.setAutoWatermarkInterval()方法来指定新的周期。

由类图容易看出,我们需要通过实现extractTimestamp()方法来提取事件时间,实现getCurrentWatermark()方法产生水印。但好在Flink已经提供了3种内置的实现类,所以我们直接用就可以了,省事。

  • AscendingTimestampExtractor
    总说话口干舌燥的(?),还是看代码吧。
    public abstract long extractAscendingTimestamp(T element);

    @Override
    public final long extractTimestamp(T element, long elementPrevTimestamp) {
        final long newTimestamp = extractAscendingTimestamp(element);
        if (newTimestamp >= this.currentTimestamp) {
            this.currentTimestamp = newTimestamp;
            return newTimestamp;
        } else {
            violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
            return newTimestamp;
        }
    }

    @Override
    public final Watermark getCurrentWatermark() {
        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }

AscendingTimestampExtractor产生的时间戳和水印必须是单调非递减的,用户通过覆写extractAscendingTimestamp()方法抽取时间戳。如果产生了递减的时间戳,就要使用名为MonotonyViolationHandler的组件处理异常,有两种方式:打印警告日志(默认)和抛出RuntimeException。

单调递增的事件时间并不太符合实际情况,所以AscendingTimestampExtractor用得不多。

  • BoundedOutOfOrdernessTimestampExtractor
    它的出镜率就非常高了。还是看代码先。
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

如名字所述,BoundedOutOfOrdernessTimestampExtractor产生的时间戳和水印是允许“有界乱序”的,构造它时传入的参数maxOutOfOrderness就是乱序区间的长度,而实际发射的水印为通过覆写extractTimestamp()方法提取出来的时间戳减去乱序区间,相当于让水印把步调“放慢一点”。这是Flink为迟到数据提供的第一重保障。

当然,乱序区间的长度要根据实际环境谨慎设定,设定得太短会丢较多的数据,设定得太长会导致窗口触发延迟,实时性减弱。

  • IngestionTimeExtractor
    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    @Override
    public Watermark getCurrentWatermark() {
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return new Watermark(now - 1);
    }

IngestionTimeExtractor基于当前系统时钟生成时间戳和水印,其实就是Flink三大时间特征里的摄入时间了。

打点水印

打点水印比周期性水印用的要少不少,并且Flink没有内置的实现,那么就写个最简单的栗子吧。

    sourceStream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<UserActionRecord>() {
      @Nullable
      @Override
      public Watermark checkAndGetNextWatermark(UserActionRecord lastElement, long extractedTimestamp) {
        return lastElement.getUserId().endsWith("0") ? new Watermark(extractedTimestamp - 1) : null;
      }

      @Override
      public long extractTimestamp(UserActionRecord element, long previousElementTimestamp) {
        return element.getTimestamp();
      }
    });

AssignerWithPunctuatedWatermarks适用于需要依赖于事件本身的某些属性决定是否发射水印的情况。我们实现checkAndGetNextWatermark()方法来产生水印,产生的时机完全由用户控制。上面例子中是收取到用户ID末位为0的数据时才发射。

还有三点需要提醒:

  • 不管使用哪种方式产生水印,都不能过于频繁。因为Watermark对象是会全部流向下游的,也会实打实地占用内存,水印过多会造成系统性能下降。
  • 水印的生成要尽量早,一般是在接入Source之后就产生,或者在Source经过简单的变换(map、filter等)之后产生。
  • 如果需求方对事件时间carry的业务意义并不关心,可以直接使用处理时间,简单方便。

迟到数据处理

如上所述,水印的乱序区间能够保证一些迟到数据不被丢弃,但是乱序区间往往不很长,那些真正迟到了的数据该怎么办呢?有两种方法来兜底,可以说是Flink为迟到数据提供的第二重保障。

窗口允许延迟

Flink提供了WindowedStream.allowedLateness()方法来设定窗口的允许延迟。也就是说,正常情况下窗口触发计算完成之后就会被销毁,但是设定了允许延迟之后,窗口会等待allowedLateness的时长后再销毁。在该区间内的迟到数据仍然可以进入窗口中,并触发新的计算。当然,窗口也是吃资源大户,所以allowedLateness的值要适当。给个完整的代码示例如下。

      sourceStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
          private static final long serialVersionUID = 1L;
          @Override
          public long extractTimestamp(UserActionRecord record) {
            return record.getTimestamp();
          }
        }
      )
      .keyBy("platform")
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .allowedLateness(Time.seconds(30))
      .aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
      // ......

allowedLateness机制实际上就是DataFlow模型中的回填(backfill)策略的实现。对于滑动窗口和滚动窗口是累积(accumulating)策略,对于会话窗口则是累积与回撤(accumulating & retracting)策略。之前讲DataFlow模型时提到过,不废话了。

侧输出迟到数据

侧输出(side output)是Flink的分流机制。迟到数据本身可以当做特殊的流,我们通过调用WindowedStream.sideOutputLateData()方法将迟到数据发送到指定OutputTag的侧输出流里去,再进行下一步处理(比如存到外部存储或消息队列)。代码如下。

      // 侧输出的OutputTag
      OutputTag<UserActionRecord> lateOutputTag = new OutputTag<>("late_data_output_tag");

      sourceStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
          private static final long serialVersionUID = 1L;
          @Override
          public long extractTimestamp(UserActionRecord record) {
            return record.getTimestamp();
          }
        }
      )
      .keyBy("platform")
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .allowedLateness(Time.seconds(30))
      .sideOutputLateData(lateOutputTag)   // 侧输出
      .aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
      // ......

      // 获取迟到数据并写入对应Sink
      stream.getSideOutput(lateOutputTag).addSink(lateDataSink);

The End

2019年快过去了,今年真的写了不少东西呢。

民那晚安。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,561评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,218评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,162评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,470评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,550评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,806评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,951评论 3 407
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,712评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,166评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,510评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,643评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,306评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,930评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,745评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,983评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,351评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,509评论 2 348