Flink生成Timestamp和Watermark

本章节是关于在event time上执行的程序的。想获取更多关于event timeprocessing timeingestion time的信息,请参考:事件时间介绍。
  为了与event time结合使用,流程序需要相应地设置一个时间特性。
Java代码:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Scala代码:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

指定时间戳(Assigning Timestamps)

为了使用event time,Flink需要知道事件的时间戳,也就是说数据流中的元素需要分配一个事件时间戳。这个通常是通过抽取或者访问事件中某些字段的时间戳来获取的。
  时间戳的分配伴随着水印的生成,告诉系统事件时间中的进度。
  这里有两种方式来分配时间戳和生成水印:
    1、直接在数据流源中进行
    2、通过timestamp assignerwatermark generator生成:在Flink中,timestamp 分配器也定义了用来发射的水印。
  注意:timestampwatermark都是通过从1970年1月1日0时0分0秒到现在的毫秒数来指定的。

有Timestamp和Watermark的源函数(Source Function with Timestamps And Watermarks)

数据流源可以直接为它们产生的数据元素分配timestamp,并且他们也能发送水印。这样做的话,就没必要再去定义timestamp分配器了,需要注意的是:如果一个timestamp分配器被使用的话,由源提供的任何timestampwatermark都会被重写。
  为了通过源直接为一个元素分配一个timestamp,源需要调用SourceContext中的collectWithTimestamp(...)方法。为了生成watermark,源需要调用emitWatermark(Watermark)方法。
下面是一个简单的(无checkpoint)由源分配timestamp和产生watermark的例子:
Java 代码:

@Override
public void run(SourceContext<MyType> ctx) throws Exception {
    while (/* condition */) {
        MyType next = getNext();
        ctx.collectWithTimestamp(next, next.getEventTimestamp());

        if (next.hasWatermarkTime()) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
        }
    }
}

Scala 代码:

override def run(ctx: SourceContext[MyType]): Unit = {
    while (/* condition */) {
        val next: MyType = getNext()
        ctx.collectWithTimestamp(next, next.eventTimestamp)

        if (next.hasWatermarkTime) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
        }
    }
}

TimeStamp分配器和Watermark生成器(Timestamp Assigners / Watermark Generators)

Timestamp分配器获取一个流并生成一个新的带有时间戳元素和水印的流。如果原来的流中已经有了timestamp和/或水印的话,这个timestamp分配器会覆盖掉。
  Timestamp分配器常常在数据源之后就立即指定了,但是并不是要严格这么做,一个常用的模式是先解析(MapFunction)和过滤(FilterFunction)后再指定timestamp 分配器。在任何情况下,时间戳分配器都必须在第一个在事件时间上运行的操作(如:第一个时间窗口操作)之前指定。有一个特殊情况,当使用Kafka作为流作业的数据源时,Flink允许在源内部指定timestamp分配器和watermark生成器。更多关于如何进行的信息请参考Kafka Connector的文档。
接下来的部分展示了要创建自己的timestamp 抽取器和watermark发射器,程序员需要实现的主要接口。想要查看Flink预定义的抽取器,请前往预定于Timestamp Extractors/Watermark Emitter页面
Java 代码:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);

Scala 代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter());

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

周期性水印(With Periodic Watermarks)

AssignerWithPeriodicWatermarks周期性地分配timestamp和生成watermark(可能依赖于元素或者纯粹基于处理时间)。
watermark产生的事件间隔(每n毫秒)是通过ExecutionConfig.setAutoWatermarkInterval(...)来定义的,每当分配器的getCurrentWatermark()方法呗调用时,如果返回的watermark是非空并且大于上一个watermark的话,一个新的watermark将会被发射。

下面是两个关于带有周期性watermark生成的timestamp分配器的例子。
Java代码 :

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}

Scala 代码:

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L; // 3.5 seconds

    var currentMaxTimestamp: Long;

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp;
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxTimeLag = 5000L; // 5 seconds

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current time minus the maximum time lag
        new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
}

带断点的水印(With Punctuated Watermarks)
无论何时一个特定的事件表明一个新的watermark可能需要被创建,都使用AssignerWithPunctuatedWatermarks来生成。在这个类中Flink首先调用extractTimestamp(...)来为元素分配一个timestamp,然后立即调用该元素上的checkAndGetNextWatermark(...)方法。
checkAndGetNextWatermark(...)方法传入在extractTimestamp(...)方法中分配的timestamp,并决定是否需要生产watermark。一旦checkAndGetNextWatermark(...)返回一个非空的watermark并且watermark比前一个watermark大的话,这个新的watermark将会被发送。
Java 代码:

public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}

Scala 代码:

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
        if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
    }
}

注意:每个单独的事件都可以产生一个watermark,然而,由于每个watermark都会导致一些下游的计算,过多的watermark会导致性能的降低。

每个Kafka分区的Timestamp(TimeStamps per Kafka Partion)

当使用Apache Kafka座位数据源时,每个Kafka分区可能有一个简单的事件时间模式(递增的timestamp或者有界的无序)。然而,当消费Kafka中的数据时,多个分区通常是并发进行的,将事件从分区中分离开来,并销毁分区模式(这是Kafka consumer客户端固有的工作模式)。
在这种情况下,你可以使用Flink的 Kafka-partition-aware(译作:Kafka分区识别或者Kafka分区敏感)水印生成,使用这个特性,水印会在Kafka消费端的每个分区中生成,并且每个分区的水印会在stream shuffle中进行合并。
例如:如果每个Kafka分区中的事件timestamp是严格递增的话,使用ascending timestamps watermark generator(递增时间戳水印生成器)将会得到完美的整体水印。
下图展示了如何使用per-kafka-partition水印生成,以及水印是如何在流式数据流中传播的。
Java 代码:

FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {

    @Override
    public long extractAscendingTimestamp(MyType element) {
        return element.eventTimestamp();
    }
});

DataStream<MyType> stream = env.addSource(kafkaSource);

Scala 代码:

val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
    def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})

val stream: DataStream[MyType] = env.addSource(kafkaSource)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容