Flink 源码之Buffer Debloating

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

什么是Buffer debloating

Buffer Debloating是Flink 1.14新增的优化方式。它能够根据指标(buffer数据被全部消费的期望等待时间taskmanager.network.memory.buffer-debloat.target)自动推算和控制in-flight data(operator输入队列和输出队列缓存的数据)大小,从而减少checkpoint耗时,减少checkpoint存储大小和恢复时间(因为in-flight data的量减少了)。对于Unaligned Checkpoint效果更为显著。启用和配置方式参见Flink 使用之状态和checkpoint

buffer debloating 源代码解析

StreamTask摄入数据的时候,schedule一个buffer debloater定时任务,触发间隔时间为taskmanager.network.memory.buffer-debloat.period

@Override
public final void invoke() throws Exception {
    // Allow invoking method 'invoke' without having to call 'restore' before it.
    if (!isRunning) {
        LOG.debug("Restoring during invoke will be called.");
        restoreInternal();
    }

    // final check to exit early before starting to run
    ensureNotCanceled();

    // 创建buffer debloating定时任务
    scheduleBufferDebloater();

    // let the task do its work
    runMailboxLoop();

    // if this left the run() method cleanly despite the fact that this was canceled,
    // make sure the "clean shutdown" is not attempted
    ensureNotCanceled();

    afterInvoke();
}

scheduleBufferDebloater方法在systemTimerService注册一个定时任务,周期性触发debloat任务。

private void scheduleBufferDebloater() {
    // See https://issues.apache.org/jira/browse/FLINK-23560
    // If there are no input gates, there is no point of calculating the throughput and running
    // the debloater. At the same time, for SourceStreamTask using legacy sources and checkpoint
    // lock, enqueuing even a single mailbox action can cause performance regression. This is
    // especially visible in batch, with disabled checkpointing and no processing time timers.
    
    // 如果没有inputGate或者没有启用buffer debloating,直接返回
    if (getEnvironment().getAllInputGates().length == 0
            || !environment
                    .getTaskManagerInfo()
                    .getConfiguration()
                    .getBoolean(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED)) {
        return;
    }
    // 注册一个事件,在buffer debloat间隔时间之后调用debloat方法
    // buffer debloat间隔时间由配置项taskmanager.network.memory.buffer-debloat.period决定
    systemTimerService.registerTimer(
            systemTimerService.getCurrentProcessingTime() + bufferDebloatPeriod,
            timestamp ->
                    mainMailboxExecutor.execute(
                            () -> {
                                debloat();
                                // 再schedule一个作业,实现周期定时调用
                                scheduleBufferDebloater();
                            },
                            "Buffer size recalculation"));
}

到此我们可以得知,debloat主要执行逻辑在debloat方法中。

void debloat() {
    for (IndexedInputGate inputGate : environment.getAllInputGates()) {
        inputGate.triggerDebloating();
    }
}

debloat方法依次调用所有inputGatetriggerDebloating方法。

我们查看SingleInputGatetriggerDebloating方法。

@Override
public void triggerDebloating() {
    if (isFinished() || closeFuture.isDone()) {
        return;
    }

    checkState(bufferDebloater != null, "Buffer debloater should not be null");
    final long currentThroughput = throughputCalculator.calculateThroughput();
    bufferDebloater
            // 重新计算buffer大小
            .recalculateBufferSize(currentThroughput, getBuffersInUseCount())
            // 如果返回的不为empty,说明需要更新buffer size
            // 设置各个channel的buffer size
            .ifPresent(this::announceBufferSize);
}

首先我们分析吞吐量计算器ThroughputCalculatorcalculateThroughput方法,用于计算间隔时间内的吞吐量。

public long calculateThroughput() {
    if (measurementStartTime != NOT_TRACKED) {
        // 获取当前时间
        long absoluteTimeMillis = clock.absoluteTimeMillis();
        
        // 获取计量吞吐量期间时长
        currentMeasurementTime += absoluteTimeMillis - measurementStartTime;
        
        // 设置下一个计量起始时间
        measurementStartTime = absoluteTimeMillis;
    }

    // 计算吞吐量,方法参数为这段时间累积的数据量和时长
    long throughput = calculateThroughput(currentAccumulatedDataSize, currentMeasurementTime);

    // 变量重置
    currentAccumulatedDataSize = currentMeasurementTime = 0;

    return throughput;
}

public long calculateThroughput(long dataSize, long time) {
    checkArgument(dataSize >= 0, "Size of data should be non negative");
    checkArgument(time >= 0, "Time should be non negative");

    if (time == 0) {
        return currentThroughput;
    }

    return currentThroughput = instantThroughput(dataSize, time);
}

// 计算每秒的数据量
static long instantThroughput(long dataSize, long time) {
    return (long) ((double) dataSize / time * MILLIS_IN_SECOND);
}

我们回到SingleInputGategetBuffersInUseCount方法,它统计各个channel已使用的buffer数量总和。

int getBuffersInUseCount() {
    int total = 0;
    for (InputChannel channel : channels) {
        total += channel.getBuffersInUseCount();
    }
    return total;
}

接下来该分析BufferDebloaterrecalculateBufferSize方法。它使用指数滑动平均(EMA)算法,推算出一个更为平滑过度的buffer值。它还能够对比新旧buffer size值的变化率,如果变化率过小,不修改buffer size,从而避免了来回拉锯式频繁修改buffer大小造成性能剧烈抖动。

public OptionalInt recalculateBufferSize(long currentThroughput, int buffersInUse) {
    // 当前实际buffer使用量
    int actualBuffersInUse = Math.max(1, buffersInUse);
    // 计算期待的buffer大小,计算公式为:
    // 当前吞吐量 x buffer数据被全部消费的期望等待时间(taskmanager.network.memory.buffer-debloat.target)
    long desiredTotalBufferSizeInBytes =
            (currentThroughput * targetTotalBufferSize) / MILLIS_IN_SECOND;

    // 使用指数滑动平均算法(Exponential moving average),计算新的buffer大小
    int newSize =
            bufferSizeEMA.calculateBufferSize(
                    desiredTotalBufferSizeInBytes, actualBuffersInUse);

    // 估算buffer数据完全消费的所需时间
    lastEstimatedTimeToConsumeBuffers =
            Duration.ofMillis(
                    newSize
                            * actualBuffersInUse
                            * MILLIS_IN_SECOND
                            / Math.max(1, currentThroughput));

    // 如果新计算出的大小和旧的很接近,不更新buffer大小
    // 旧buffer大小乘以taskmanager.network.memory.buffer-debloat.threshold-percentages计算出变化量
    // 如果newSize和旧buffer大小差异值小于变化量,则不更新buffer大小
    boolean skipUpdate = skipUpdate(newSize);

    LOG.debug(
            "Buffer size recalculation: gateIndex={}, currentSize={}, newSize={}, instantThroughput={}, desiredBufferSize={}, buffersInUse={}, estimatedTimeToConsumeBuffers={}, announceNewSize={}",
            gateIndex,
            lastBufferSize,
            newSize,
            currentThroughput,
            desiredTotalBufferSizeInBytes,
            buffersInUse,
            lastEstimatedTimeToConsumeBuffers,
            !skipUpdate);

    // Skip update if the new value pretty close to the old one.
    // 如果不需要更新,返回empty
    if (skipUpdate) {
        return OptionalInt.empty();
    }

    // 返回新计算的buffer大小
    lastBufferSize = newSize;
    return OptionalInt.of(newSize);
}

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,744评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,505评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,105评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,242评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,269评论 6 389
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,215评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,096评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,939评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,354评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,573评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,745评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,448评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,048评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,683评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,838评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,776评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,652评论 2 354

推荐阅读更多精彩内容