Flink 源码之 Buffer Timeout优化

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

Buffer Timeout 概念

Flink每个算子向下游发送数据需要两个条件:

  • 输出buffer空间占满
  • buffer中数据存在时间超过buffer timeout配置值(默认值为100ms)

这个配置值对Flink性能影响至关重大。配置的低,数据的延迟很小,但是会带量大量高频的网络通信,同时大幅提高CPU占用率。配置值过高buffer会经常填满,数据的延迟会增大很多。有文章表明,在大并发的情况下,如果对数据的延迟不是十分敏感,适当的调大buffer timeout(1s左右即可)可以降低CPU使用率 30% - 50%。

Buffer Timeout 配置

Buffer timeout有两个级别:全局级别和算子级别。

全局级别的Buffer timeout通过StreamExecutionEnvironment.setBufferTimeout方法配置。代码如下:

public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
    if (timeoutMillis < -1) {
        throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
    }

    this.bufferTimeout = timeoutMillis;
    return this;
}

StreamExecutionEnvironment中设置的bufferTimeout在构造StreamGraph的时候作为默认的buffer timeout使用。如果用户没有给算子指定专门的buffer timeout,自动采用默认的buffer timeout。

算子级别的Buffer timeout只影响这一个算子的配置。算子级别对应的是SingleOutputStreamOperator。我们查看它的setBufferTimeout方法:

public SingleOutputStreamOperator<T> setBufferTimeout(long timeoutMillis) {
    checkArgument(timeoutMillis >= -1, "timeout must be >= -1");
    transformation.setBufferTimeout(timeoutMillis);
    return this;
}

它为算子对应的Transformation对象设置了bufferTimeout属性。

Buffer Timeout 如何影响StreamGraph

Flink把Transformation翻译为StreamGraph需要用到各种各样的translator。我们查看下它的基类SimpleTransformationTranslatorconfigure方法片段;

// ...
StreamGraphUtils.configureBufferTimeout(
    streamGraph, transformationId, transformation, context.getDefaultBufferTimeout());
// ...

它使用了StreamGraphUtils配置StreamGraph的缓存timeout。详细内容我们需要展开分析configureBufferTimeout方法:

public static <T> void configureBufferTimeout(
    StreamGraph streamGraph,
    int nodeId,
    Transformation<T> transformation,
    long defaultBufferTimeout) {

    if (transformation.getBufferTimeout() >= 0) {
        streamGraph.setBufferTimeout(nodeId, transformation.getBufferTimeout());
    } else {
        streamGraph.setBufferTimeout(nodeId, defaultBufferTimeout);
    }
}

它接收的4个参数分别为:需要生成的streamGraph,StreamNode id,Transformation和默认的buffer timeout配置(StreamExecutionEnvironment级别的配置为默认配置)。

该方法又调用了StreamGraphsetBufferTimeout方法。我们继续跟踪。这个方法为Transformation对应的StreamNode设置bufferTimeout属性。

public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
    if (getStreamNode(vertexID) != null) {
        getStreamNode(vertexID).setBufferTimeout(bufferTimeout);
    }
}

到此位置我们得知用户为每个算子设定的buffer timeout配置最终反应到了StreamGraph中算子对应StreamNodebufferTimeout属性。

下一章节开始分析bufferTimeout属性如何影响Flink 处理数据的行为。

Buffer Timeout 如何影响数据处理行为

我们查看StreamEdge的构造函数:

public StreamEdge(
    StreamNode sourceVertex,
    StreamNode targetVertex,
    int typeNumber,
    StreamPartitioner<?> outputPartitioner,
    OutputTag outputTag,
    StreamExchangeMode exchangeMode) {

    this(
        sourceVertex,
        targetVertex,
        typeNumber,
        sourceVertex.getBufferTimeout(),
        outputPartitioner,
        outputTag,
        exchangeMode);
}

可以发现StreamEdgebufferTimeout是由sourceVertex,即Edge上游StreamNodebufferTimeout属性决定的。

接着追踪StreamEdgebufferTimeout调用过程,我们找到了StreamTask.createRecordWriter方法调用:

private static <OUT>
    List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
    StreamConfig configuration, Environment environment) {
    List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters =
        new ArrayList<>();
    List<StreamEdge> outEdgesInOrder =
        configuration.getOutEdgesInOrder(
        environment.getUserCodeClassLoader().asClassLoader());

    // 遍历每个StreamEdge,逐个创建RecordWriter
    // RecordWriter的bufferTimeout为Edge的bufferTimeout
    for (int i = 0; i < outEdgesInOrder.size(); i++) {
        StreamEdge edge = outEdgesInOrder.get(i);
        recordWriters.add(
            createRecordWriter(
                edge,
                i,
                environment,
                environment.getTaskInfo().getTaskNameWithSubtasks(),
                edge.getBufferTimeout()));
    }
    return recordWriters;
}

createRecordWriter方法内容片段如下。可知RecordWriter通过RecordWriterBuilder创建:

RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
        new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>()
                .setChannelSelector(outputPartitioner)
                .setTimeout(bufferTimeout)
                .setTaskName(taskName)
                .build(bufferWriter);

继续查看RecordWriterBuilderbuild方法:

public RecordWriter<T> build(ResultPartitionWriter writer) {
    if (selector.isBroadcast()) {
        return new BroadcastRecordWriter<>(writer, timeout, taskName);
    } else {
        return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);
    }
}

无论创建的是BroadcastRecordWriter(广播形式写入数据到输出缓存)还是ChannelSelectorRecordWriter(把数据写入到特定channel,例如keyBy算子),他们的父类都为RecordWriter。所以接下来需要展开分析的内容为RecordWriter

我们查看RecordWriter的构造函数,发现其中创建了一个OutputFlush对象(如果没有禁用network buffer timeout的话):

RecordWriter(ResultPartitionWriter writer, long timeout, String taskName) {
    this.targetPartition = writer;
    this.numberOfChannels = writer.getNumberOfSubpartitions();

    this.serializer = new DataOutputSerializer(128);

    checkArgument(timeout >= ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT);
    this.flushAlways = (timeout == ExecutionOptions.FLUSH_AFTER_EVERY_RECORD);
    if (timeout == ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT
        || timeout == ExecutionOptions.FLUSH_AFTER_EVERY_RECORD) {
        outputFlusher = null;
    } else {
        String threadName =
            taskName == null
            ? DEFAULT_OUTPUT_FLUSH_THREAD_NAME
            : DEFAULT_OUTPUT_FLUSH_THREAD_NAME + " for " + taskName;

        outputFlusher = new OutputFlusher(threadName, timeout);
        outputFlusher.start();
    }
}

OutputFlusher使用专门的线程,异步定时调用targetPartitionflushAll()方法。调用时间间隔就是setBufferTimeout的值。

@Override
public void run() {
    try {
        while (running) {
            try {
                // 每隔timeout这么长时间,就flush所有的数据
                Thread.sleep(timeout);
            } catch (InterruptedException e) {
                // propagate this if we are still running, because it should not happen
                // in that case
                if (running) {
                    throw new Exception(e);
                }
            }

            // any errors here should let the thread come to a halt and be
            // recognized by the writer
            flushAll();
        }
    } catch (Throwable t) {
        notifyFlusherException(t);
    }
}

到此为止我们分析完了buffer timeout从配置到生成StreamGraph到如何影响Flink发送数据的完整过程。

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354

推荐阅读更多精彩内容