从AggregateFunction.merge()到Flink会话窗口实现原理

前言

在我们使用Flink DataStream API编写业务代码时,aggregate()算子和AggregateFunction无疑是非常常用的。编写一个AggregateFunction需要实现4个方法:

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
    ACC createAccumulator();

    ACC add(IN value, ACC accumulator);

    OUT getResult(ACC accumulator);

    ACC merge(ACC a, ACC b);
}

前三个方法都很容易理解,但第四个merge()方法就有些令人费解了:到底什么时候需要合并两个累加器的数据呢?最近也有童鞋问到了这个问题。实际上,这个方法是专门为会话窗口(session window)服务的。下面来解析一下会话窗口。

Session Window & MergingWindowAssigner

stream.keyBy("userId").window(EventTimeSessionWindows.withGap(Time.seconds(gap)))

在普通的翻滚窗口和滑动窗口中,窗口的范围是按时间区间固定的,虽然范围有可能重合,但是处理起来是各自独立的,并不会相互影响。但是会话窗口则不同,其范围是根据事件之间的时间差是否超过gap来确定的(超过gap就形成一个新窗口),也就是说并非固定。所以,我们需要在每个事件进入会话窗口算子时就为它分配一个初始窗口,起点是它本身所携带的时间戳(这里按event time处理),终点则是时间戳加上gap的偏移量。这样的话,如果两个事件所在的初始窗口没有相交,说明它们属于不同的会话;如果相交,则说明它们属于同一个会话,并且要把这两个初始窗口合并在一起,作为新的会话窗口。多个事件则依次类推,最终形成上面图示的情况。

为了支持会话窗口的合并,它们的WindowAssigner也有所不同,称为MergingWindowAssigner,如下类图所示。

MergingWindowAssigner是一个抽象类,代码很简单,定义了用于合并窗口的mergeWindows()方法以及合并窗口时的回调MergeCallback。

public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
    private static final long serialVersionUID = 1L;

    public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);

    public interface MergeCallback<W> {
        void merge(Collection<W> toBeMerged, W mergeResult);
    }
}

所有MergingWindowAssigner实现类的mergeWindows()方法都是相同的,即直接调用TimeWindow.mergeWindows()方法,其源码如下。

public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c)
    // sort the windows by the start time and then merge overlapping windows
    List<TimeWindow> sortedWindows = new ArrayList<>(windows);
    Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
        @Override
        public int compare(TimeWindow o1, TimeWindow o2) {
            return Long.compare(o1.getStart(), o2.getStart());
        }
    });

    List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
    Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;

    for (TimeWindow candidate: sortedWindows) {
        if (currentMerge == null) {
            currentMerge = new Tuple2<>();
            currentMerge.f0 = candidate;
            currentMerge.f1 = new HashSet<>();
            currentMerge.f1.add(candidate);
        } else if (currentMerge.f0.intersects(candidate)) {
            currentMerge.f0 = currentMerge.f0.cover(candidate);
            currentMerge.f1.add(candidate);
        } else {
            merged.add(currentMerge);
            currentMerge = new Tuple2<>();
            currentMerge.f0 = candidate;
            currentMerge.f1 = new HashSet<>();
            currentMerge.f1.add(candidate);
        }
    }

    if (currentMerge != null) {
        merged.add(currentMerge);
    }

    for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
        if (m.f1.size() > 1) {
            c.merge(m.f1, m.f0);
        }
    }
}

// TimeWindow.intersects()
public boolean intersects(TimeWindow other) {
    return this.start <= other.end && this.end >= other.start;
}

// TimeWindow.cover()
public TimeWindow cover(TimeWindow other) {
    return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
}

该方法将所有待合并的窗口按照起始时间升序排序,遍历排序好的窗口,并调用intersects()方法判断它们是否相交。如果相交,则调用cover()方法合并返回一个覆盖两个窗口的窗口;如果不相交,则启动下一次合并过程。列表merged中存储的就是[合并结果, 原窗口集合]的二元组,如果原窗口集合的大小大于1,说明发生了合并,需要调用回调方法MergeCallback.merge()。

就这么简单吗?当然不是。上面的逻辑只是在时域的角度合并了窗口,但是别忘了,窗口是需要维护状态和触发器的,所以它们也得被合并才能保证不出错。下面就来介绍跟踪窗口状态合并的MergingWindowSet组件。

MergingWindowSet

MergingWindowSet的思路很直接:既然状态的创建和维护是比较重的操作,那么就在一批窗口合并时,以其中一个窗口的状态为基准,其他窗口的状态都直接合并到这个基准窗口的状态里来,称为“状态窗口”。这样就避免了创建新的状态实例,只需要维护合并的窗口与状态窗口之间的映射关系,以及保证映射关系的容错(通过ListState)即可。

// Mapping from window to the window that keeps the window state...
private final Map<W, W> mapping;

// Mapping when we created the MergingWindowSet...
private final Map<W, W> initialMapping;

private final ListState<Tuple2<W, W>> state;

public W getStateWindow(W window) {
    return mapping.get(window);
}

public void persist() throws Exception {
    if (!mapping.equals(initialMapping)) {
        state.clear();
        for (Map.Entry<W, W> window : mapping.entrySet()) {
            state.add(new Tuple2<>(window.getKey(), window.getValue()));
        }
    }
}

MergingWindowSet的核心逻辑位于add()方法中。该方法输入一个新窗口,并试图将其时域和状态进行合并,代码如下。

public W addWindow(W newWindow, MergeFunction<W> mergeFunction) throws Exception {
    List<W> windows = new ArrayList<>();
    windows.addAll(this.mapping.keySet());
    windows.add(newWindow);

    final Map<W, Collection<W>> mergeResults = new HashMap<>();
    windowAssigner.mergeWindows(windows,
            new MergingWindowAssigner.MergeCallback<W>() {
                @Override
                public void merge(Collection<W> toBeMerged, W mergeResult) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Merging {} into {}", toBeMerged, mergeResult);
                    }
                    mergeResults.put(mergeResult, toBeMerged);
                }
            });

    W resultWindow = newWindow;
    boolean mergedNewWindow = false;

    // perform the merge
    for (Map.Entry<W, Collection<W>> c: mergeResults.entrySet()) {
        W mergeResult = c.getKey();
        Collection<W> mergedWindows = c.getValue();
        // if our new window is in the merged windows make the merge result the
        // result window
        if (mergedWindows.remove(newWindow)) {
            mergedNewWindow = true;
            resultWindow = mergeResult;
        }
        // pick any of the merged windows and choose that window's state window
        // as the state window for the merge result
        W mergedStateWindow = this.mapping.get(mergedWindows.iterator().next());
        // figure out the state windows that we are merging
        List<W> mergedStateWindows = new ArrayList<>();
        for (W mergedWindow: mergedWindows) {
            W res = this.mapping.remove(mergedWindow);
            if (res != null) {
                mergedStateWindows.add(res);
            }
        }
        this.mapping.put(mergeResult, mergedStateWindow);
        // don't put the target state window into the merged windows
        mergedStateWindows.remove(mergedStateWindow);
        // don't merge the new window itself, it never had any state associated with it
        // i.e. if we are only merging one pre-existing window into itself
        // without extending the pre-existing window
        if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
            mergeFunction.merge(mergeResult,
                    mergedWindows,
                    this.mapping.get(mergeResult),
                    mergedStateWindows);
        }
    }

    // the new window created a new, self-contained window without merging
    if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) {
        this.mapping.put(resultWindow, resultWindow);
    }
    return resultWindow;
}

前面调用MergingWindowAssigner.mergeWindows()方法的逻辑已经提过了,重点在后面状态的合并。注释写得比较详细,多读几遍即可理解,不再多废话,只解释一下四个关键变量的含义:

  • mergeResult:窗口的时域合并结果;
  • mergedWindows:本次被合并的窗口集合;
  • mergedStateWindow:将要合并的状态窗口结果(目前就是基准的状态窗口);
  • mergedStateWindows:本次被合并的状态窗口集合。

最后,回调MergeFunction.merge()方法进行正式的合并。MergeFunction的定义同样位于MergingWindowSet中,其merge()方法的参数与上面四个变量是一一对应的。

public interface MergeFunction<W> {
    /**
     * This gets called when a merge occurs.
     *
     * @param mergeResult The newly resulting merged {@code Window}.
     * @param mergedWindows The merged {@code Window Windows}.
     * @param stateWindowResult The state window of the merge result.
     * @param mergedStateWindows The merged state windows.
     * @throws Exception
     */
    void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception;
}

接下来去到窗口算子WindowOperator中,完成最后一步。

Window Merging in WindowOperator

以下是WindowOperator.processElement()方法中,处理MergingWindowAssigner的部分。

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    //if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;

    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();

        for (W window: elementWindows) {
            // adding the new window might result in a merge, in that case the actualWindow
            // is the merged window and we work with that. If we don't merge then
            // actualWindow == window
            W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                @Override
                public void merge(W mergeResult,
                        Collection<W> mergedWindows, W stateWindowResult,
                        Collection<W> mergedStateWindows) throws Exception {
                    if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                        // ...
                    } else if (!windowAssigner.isEventTime()) {
                        // ...
                    }

                    triggerContext.key = key;
                    triggerContext.window = mergeResult;
                    triggerContext.onMerge(mergedWindows);

                    for (W m: mergedWindows) {
                        triggerContext.window = m;
                        triggerContext.clear();
                        deleteCleanupTimer(m);
                    }

                    // merge the merged state windows into the newly resulting state window
                    windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                }
            });

            // drop if the window is already late
            if (isWindowLate(actualWindow)) {
                mergingWindows.retireWindow(actualWindow);
                continue;
            }
            isSkippedElement = false;

            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            if (stateWindow == null) {
                throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
            }
            windowState.setCurrentNamespace(stateWindow);
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = actualWindow;
            TriggerResult triggerResult = triggerContext.onElement(element);

            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(actualWindow, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }

            registerCleanupTimer(actualWindow);
        }
        // need to make sure to update the merging state in state
        mergingWindows.persist();
    } else {
        // ......
    }
    // ......
}

该方法先获得输入的流元素分配到的窗口,然后调用MergingWindowSet.addWindow()方法试图合并它。特别注意MergeFunction.merge()方法,它做了如下两件事:

  • 调用TriggerContext.onMerge()方法,更新触发器注册的定时器时间,然后遍历所有被合并的原始窗口,调用TriggerContext.clear()方法删除它们的触发器,保证合并后的窗口能够被正确地触发;
  • 调用InternalMergingState.mergeNamespaces()方法,将待合并窗口的状态与基准窗口的状态合并,产生的stateWindowResult就是状态窗口。

addWindow()方法返回的actualWindow就是合并之后的真正窗口,然后再根据MergingWindowSet中维护的映射关系,取出它所对应的状态窗口,并将输入元素加入状态窗口中。最后,根据更新后的触发器逻辑判断窗口需要fire还是purge,并触发执行相应的操作。整个窗口合并的流程就完成了。

Back on AggregateFunction

前面说了这么多,AggregateFunction.merge()方法到底在哪里呢?注意上一节中出现的用于合并窗口状态的InternalMergingState.mergeNamespaces()方法,InternalMergingState是Flink状态体系中所有能够合并的状态的基类。下面观察它的两个实现类:

  • 基于堆的AbstractHeapMergingState→HeapAggregatingState
@Override
public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    if (sources == null || sources.isEmpty()) {
        return; // nothing to do
    }
    final StateTable<K, N, SV> map = stateTable;
    SV merged = null;
    // merge the sources
    for (N source : sources) {
        // get and remove the next source per namespace/key
        SV sourceState = map.removeAndGetOld(source);
        if (merged != null && sourceState != null) {
            merged = mergeState(merged, sourceState);  // <----
        } else if (merged == null) {
            merged = sourceState;
        }
    }
    // merge into the target, if needed
    if (merged != null) {
        map.transform(target, merged, mergeTransformation);
    }
}

@Override
protected ACC mergeState(ACC a, ACC b) {
    return aggregateTransformation.aggFunction.merge(a, b);  // <----
}
  • 基于RocksDB的RocksDBAggregatingState
@Override
public void mergeNamespaces(N target, Collection<N> sources) {
    if (sources == null || sources.isEmpty()) {
        return;
    }
    try {
        ACC current = null;
        // merge the sources to the target
        for (N source : sources) {
            if (source != null) {
                setCurrentNamespace(source);
                final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
                final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
                backend.db.delete(columnFamily, writeOptions, sourceKey);
                if (valueBytes != null) {
                    dataInputView.setBuffer(valueBytes);
                    ACC value = valueSerializer.deserialize(dataInputView);
                    if (current != null) {
                        current = aggFunction.merge(current, value);  // <----
                    }
                    else {
                        current = value;
                    }
                }
            }
        }
        // ......
    }
    catch (Exception e) {
        throw new FlinkRuntimeException("Error while merging state in RocksDB", e);
    }
}

可见,累加器其实就是AggregateFunction维护的状态。当AggregateFunction与会话窗口一同使用来实现增量聚合时,就会调用用户实现的merge()方法来合并累加器中的数据了。也就是说,如果我们没有使用会话窗口,那么不实现merge()方法同样没问题。

The End

本篇贴了太多代码,逻辑也比较绕,希望能讲清楚吧。

民那晚安晚安。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,561评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,218评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,162评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,470评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,550评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,806评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,951评论 3 407
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,712评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,166评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,510评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,643评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,306评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,930评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,745评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,983评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,351评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,509评论 2 348