watermark原理之watermark的下发

watermark 如何下发的?

一、 数据读取

A. AbstractStreamTaskNetworkInput:该类是用于读取上游数据
//processElement
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
        if (recordOrMark.isRecord()) {  //数据
            output.emitRecord(recordOrMark.asRecord());
        } else if (recordOrMark.isWatermark()) {    // watermark
            statusWatermarkValve.inputWatermark(
                    recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
        } else if (recordOrMark.isLatencyMarker()) {    // latencymarker
            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        } else if (recordOrMark.isStreamStatus()) {     //stream status
            statusWatermarkValve.inputStreamStatus(
                    recordOrMark.asStreamStatus(),
                    flattenedChannelIndices.get(lastChannel),
                    output);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }
  • 对象类别 含义:
    StreamStatus : idle(空闲) , active(非空闲)
    watermark : 水位
  1. 什么是LatencyMarker?
    https://blog.csdn.net/nazeniwaresakini/article/details/106615777

二、watermark下发逻辑

watermark下发是上游读取数据,判断每个channel watermark进行对齐并下发新watermark的过程。

StatusWatermarkValve:

  • 概念:
a. InputChannelStatus[] channelStatuses:  // 每个channel的状态
        protected long watermark;    // channel的watermark
        protected StreamStatus streamStatus;      // channel当前的状态
        protected boolean isWatermarkAligned;   // watermark是否对齐
b. private long lastOutputWatermark;       // 上次发出的watermark
c. private StreamStatus lastOutputStreamStatus;   // 上次输出的streamstatus

isWatermarkAligned 不对齐的情况:
(1)streamStatus 为idle
(2) streamstatus是active但是watermark还没有达到上次watermark

  • 初始化
    每个channelstatus 的watermark取的最小值,streamstatus是active状态,watermarkaligned是对齐的。lastoutputwatermark是最小值,lastoutputstreamstatus也是对齐的。
public StatusWatermarkValve(int numInputChannels) {
        checkArgument(numInputChannels > 0);
        this.channelStatuses = new InputChannelStatus[numInputChannels];
        for (int i = 0; i < numInputChannels; i++) {
            channelStatuses[i] = new InputChannelStatus();
            channelStatuses[i].watermark = Long.MIN_VALUE;  //初始化watermark
            channelStatuses[i].streamStatus = StreamStatus.ACTIVE;  //初始化streamstatus
            channelStatuses[i].isWatermarkAligned = true;   //初始化watermark是对齐的
        }
        this.lastOutputWatermark = Long.MIN_VALUE;
        this.lastOutputStreamStatus = StreamStatus.ACTIVE;
    }
image.png

灵魂提问:

  1. 为什么要引入watermark?
    Apache Flink uses watermarks to keep track of the progress in event time. The event time is extracted from one of the fields of the data event that contain the timestamp when that event was originally created.
  2. 为什么要设置idleness?
  • 事件偏差
    (1) Flink 作业需要使用来自具有不同特征的源的事件
    随着时间的推移,使用这种源组合将导致它们各自的水印进程彼此显着不同。这通常在 Flink 作业失败后被放大,作业需要赶上读取事件。
    (2)数据本身的分布不平衡。例如,某些键可能比其他键更频繁地出现,这使得正在处理这些键的相应运算符只需要做更多的处理,这可能会导致水印进程变慢。
  • 造成的问题
    因为watermark变小,导致待定计时器的数量将增加,与之相关的资源量也会增加,背压在增加,检查点屏障的进度将减慢,整个检查点过程将花费更长的时间,导致输入流被阻塞,进而进一步减慢事件的处理速度。这些副作用最终可能导致内存不足错误、检查点失败甚至作业崩溃。
  • 解决办法:
    从 Flink 1.11 开始,对所谓的空闲检测提供了开箱即用的支持。如果一个主题在较长时间内没有产生事件并因此导致没有水印进展,则该主题被称为空闲。Flink 提供的解决方案是,当在可配置的时间段内没有收到任何事件时,将此输入源标记为“暂时空闲”,从而在确定算子的最低水印时忽略此源。

如前所述,在读取一组具有不同特征的输入源时,需要应用不同的解决方案。我们为此提出的,就是我们所说的“平衡阅读”。平衡读取既可以在单个 Kafka 消费者层面实现,也可以在 Flink 作业中所有 Kafka 消费者的各种实例之间实现。

参考:https://www.ververica.com/blog/how-mitigating-event-time-skewness-can-reduce-checkpoint-failures-and-task-manager-crashes

疑问:https://stackoverflow.com/questions/48576391/flink-how-set-up-initial-watermark

源码解析

1. inputwatermark

public void inputWatermark(Watermark watermark, int channelIndex, DataOutput<?> output)
            throws Exception {
        // TODO 为啥忽略 当前channel 或者 lastOutputStreamStatus 是idle的情况
        if (lastOutputStreamStatus.isActive()
                && channelStatuses[channelIndex].streamStatus.isActive()) {
            long watermarkMillis = watermark.getTimestamp();

            //当前watermark > 当前channel的watermark, 小于的情况可以不处理,为了channel上watermark是升序的对齐的
            if (watermarkMillis > channelStatuses[channelIndex].watermark) {
                channelStatuses[channelIndex].watermark = watermarkMillis;

                // 如果watermark赶上上次watermark,那么不对齐的channels会被设为对齐
                if (!channelStatuses[channelIndex].isWatermarkAligned
                        && watermarkMillis >= lastOutputWatermark) {
                    channelStatuses[channelIndex].isWatermarkAligned = true;
                }
                // 尝试找到所有对齐的channel里的最小的watermark
                findAndOutputNewMinWatermarkAcrossAlignedChannels(output);
            }
        }
    }
// 对齐的channel里,取最小值 作为下发的watermark 并且大于上次watermark 才输出
    private void findAndOutputNewMinWatermarkAcrossAlignedChannels(DataOutput<?> output)
            throws Exception {
        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;
        
        // 新的watermark 由 所有 watermark对齐的channel 产生,取最小值
        for (InputChannelStatus channelStatus : channelStatuses) {
            if (channelStatus.isWatermarkAligned) { //当前channel watermark对齐,取最小值
                hasAlignedChannels = true;
                newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
            }
        }
        
        // 当有channel事对齐的,且当前watermark最小值大于上次watermark最小值,下发当前watermark
        if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
            lastOutputWatermark = newMinWatermark;
            output.emitWatermark(new Watermark(lastOutputWatermark));
        }
    }

结论: watermark的处理
(1)需要保证在每个channel上 watermark是升序的才处理
(2)下发watermark 是通过判断对齐的channel(没有标记空闲的)的最小值需要大于lastoutputwatermark。也就是说下发的watermark也要是升序的。

2. inputStreamStatus

    public void inputStreamStatus(StreamStatus streamStatus, int channelIndex, DataOutput<?> output)
            throws Exception {
        // 当前streamstaus空闲,但改channel之前是active的,设置该channel空闲且不对齐
        if (streamStatus.isIdle() && channelStatuses[channelIndex].streamStatus.isActive()) {
            //设置状态从 active -> idle
            channelStatuses[channelIndex].streamStatus = StreamStatus.IDLE;

            // 设置对不对齐
            channelStatuses[channelIndex].isWatermarkAligned = false;

            // 当这个channel是最后一个空闲的,也就是当前所有的都空闲,我们需要输出streamstatus为idle
            if (!InputChannelStatus.hasActiveChannels(channelStatuses)) {

                // now that all input channels are idle and no channels will continue to advance its
                // watermark,we should "flush" all watermarks across all channels; effectively, this means
                // emitting the max watermark across all channels as the new watermark. Also, since we
                // already try to advance the min watermark as channels individually become IDLE, here we only need to
                // perform the flush if the watermark of the last active channel that just became idle is the current
                // min watermark.
                // 因为当前所有channel空闲,并且channel都无法继续更新水印,应该flush所有channel上的watermark,
                // 意味着我们设置channel 中最大的watermark作为新水印
                // 因为在其他变为idle的时候仍然会下发min watermark,所以只需要在最后一个channel变成idle且等于上次min watermark的时候flush
                if (channelStatuses[channelIndex].watermark == lastOutputWatermark) {
                    findAndOutputMaxWatermarkAcrossAllChannels(output);
                }
                //设置为空闲,并下发整个流是空闲的状态
                lastOutputStreamStatus = StreamStatus.IDLE;
                output.emitStreamStatus(lastOutputStreamStatus);
                //有非空闲的时候,并且当前watermark等于上次watermark,取最小的watermark
            } else if (channelStatuses[channelIndex].watermark == lastOutputWatermark) {
                // if the watermark of the channel that just became idle equals the last output
                // watermark (the previous overall min watermark), we may be able to find a new
                // min watermark from the remaining aligned channels
                // 如果watemark不是最后一个变为空闲,并且等于上次outputwatermark,我们只需要从剩余的对齐的channel取最小值作为watermark
                findAndOutputNewMinWatermarkAcrossAlignedChannels(output);
            }
            // streamstatus非空闲 并且之前是空闲
        } else if (streamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isIdle()) {
            // 设置channel的状态从idle变为active
            channelStatuses[channelIndex].streamStatus = StreamStatus.ACTIVE;
            
            // 上次watermark大于等于上次output watermark,则认为channel的 watermark是对齐的
            if (channelStatuses[channelIndex].watermark >= lastOutputWatermark) {
                channelStatuses[channelIndex].isWatermarkAligned = true;
            }  
            // 如果上次整体streamstatus标记为空闲,改为active并输出最后状态
            if (lastOutputStreamStatus.isIdle()) {
                lastOutputStreamStatus = StreamStatus.ACTIVE;
                output.emitStreamStatus(lastOutputStreamStatus);
            }
        }
    }
// 取所有channel里的最大值
private void findAndOutputMaxWatermarkAcrossAllChannels(DataOutput<?> output) throws Exception {
        long maxWatermark = Long.MIN_VALUE;

        for (InputChannelStatus channelStatus : channelStatuses) {
            maxWatermark = Math.max(channelStatus.watermark, maxWatermark);
        }

        if (maxWatermark > lastOutputWatermark) {
            lastOutputWatermark = maxWatermark;
            output.emitWatermark(new Watermark(lastOutputWatermark));
        }
    }

解释:

  1. channel从active变为idle:
    (1) 如果这是最后一个(active->idle)的channel:
    a.那么flush整个流中的watermark。方法是 只有在channel的watermark等于上次outputwatermark,取所有channel最大值作为outputchannel下发。
    b.下游传递streamstatus为空闲状态。
    (2) 不是最后一个(active->idle)的channel,但是当前channel等于上次outputwatermark,取非空闲的channel的最小值。

  2. channel从idle变为active:
    a. 当前channel的watermark大于等于lastoutputwatermark,是指该channel为对齐
    b.上次输出的streamstatus是idle,输出streamstatus改为active

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容