Flink timer注册与watermark触发[转载自网易云音乐实时计算平台经典实践知乎专栏]

经作者magina本人同意,转载自原文
在flink中无论是windowOperator还是KeyedProcessOperator都持有InternalTimerService具体实现的对象,通过这个对象用户可以注册EventTime及ProcessTime的timer,当watermark 越过这些timer的时候,调用回调函数执行一定的操作。这里着重看下KeyedProcessOperator(windowOperator机制大致相同,这里就不再细说)。

当StreamTask 被调度执行的时候,具体生命周期如
*  -- invoke()
*        |
*        +----> Create basic utils (config, etc) and load the chain of operators
*        +----> operators.setup()
*        +----> task specific init()
*        +----> initialize-operator-states()
*        +----> open-operators()
*        +----> run()
*        +----> close-operators()
*        +----> dispose-operators()
*        +----> common cleanup
*        +----> task specific cleanup()

在KeyedProcessOperator的open方法将在streamTask open-operators()阶段被调用

@Override
public void open() throws Exception {
   super.open();
   collector = new TimestampedCollector<>(output);
//为该Operator构造InternalTimerService并启动,通过该InternalTimerService可以访问时间
   InternalTimerService<VoidNamespace> internalTimerService =
         getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);

   TimerService timerService = new SimpleTimerService(internalTimerService);

   context = new ContextImpl(userFunction, timerService);
   onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}

然后stream task调用run()启动计算

@Override
protected void run() throws Exception {
   // cache processor reference on the stack, to make the code more JIT friendly
//在run方法中通过inputProcessor来从input gate里面读取消息,消息可以是正常的数据,也可以是watermark
   final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

   while (running && inputProcessor.processInput()) {
      // all the work happens in the "processInput" method
   }

   // the input is finished, notify non-head operators
   if (running) {
      synchronized (getCheckpointLock()) {
         OneInputStreamOperator<IN, OUT> headOperator = getHeadOperator();
         for (StreamOperator<?> operator : operatorChain.getAllOperatorsTopologySorted()) {
            if (operator.getOperatorID().equals(headOperator.getOperatorID())) {
               continue;
            }

            Preconditions.checkState(operator instanceof OneInputStreamOperator);
            ((OneInputStreamOperator<?, ?>) operator).endInput();
         }
      }
   }
}

在StreamInputProcessor的processInput()方法中

 else {
      // now we can do the actual processing
      StreamRecord<IN> record = recordOrMark.asRecord();
      synchronized (lock) {
         numRecordsIn.inc();
         streamOperator.setKeyContextElement1(record);
//正常数据处理,最终会调用用户实现的userfunction的processElement,对于KeyedProcessOperator就是调用用户定义keyedProcessFunction的processElement
         streamOperator.processElement(record);
      }
   }

   return true;
} else if (recordOrMark.isWatermark()) {
   // handle watermark
   statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);//处理watermark
   continue;
} else if (recordOrMark.isStreamStatus()) {

下面就看下watermark的处理过程,最终会调用到AbstractStreamOperator的processWatermark方法

public void processWatermark(Watermark mark) throws Exception {
 if (timeServiceManager != null) {
      timeServiceManager.advanceWatermark(mark);//第一步处理watermark
   }
   output.emitWatermark(mark);//第二步,将watermark发送到下游
}

那么是怎么处理watermark的呢?接着看InternalTimeServiceManager的advanceWatermark方法

public void advanceWatermark(Watermark watermark) throws Exception {
//这里之前调用getInternalTimerService构建的的InternalTimerService都要处理该watermark
   for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
      service.advanceWatermark(watermark.getTimestamp());
   }
}

接着看HeapInternalTimerService我们可以发现,这里逻辑timer时间小于watermark的都应该被触发回调,

public void advanceWatermark(long time) throws Exception {
   currentWatermark = time;//更新当前watermark

   InternalTimer<K, N> timer;
 //取出所有低于watermark的timer触发回调。
   while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {

      Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
      timerSet.remove(timer);
      eventTimeTimersQueue.remove();

      keyContext.setCurrentKey(timer.getKey());
      triggerTarget.onEventTime(timer);//这里的triggerTarget就是具体的operator对象,
   }
}

这里triggerTarget就是具体的operator实例,在open的时候通过InternalTimeServiceManager的getInternalTimerService方法传递到HeapInternalTimerService

image
image

接着看KeyedProcessOperator的onEeventTime,这里就是调用用户实现的keyedProcessFunction的onTimer去做一些具体的事情。对于window来说也是调用onEventTime或者onProcessTime来从key和window對應的状态中的数据发送到windowFunction中去计算并发送到下游节点

@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
   collector.setAbsoluteTimestamp(timer.getTimestamp());
   invokeUserFunction(TimeDomain.EVENT_TIME, timer);
}
private void invokeUserFunction(
      TimeDomain timeDomain,
      InternalTimer<K, VoidNamespace> timer) throws Exception {
   onTimerContext.timeDomain = timeDomain;
   onTimerContext.timer = timer;
   userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);//这里就是前面用户实现的onTimer方法
   onTimerContext.timeDomain = null;
   onTimerContext.timer = null;
}

前面讲的是watermark是怎么被触发的,但是还有另外一个问题,timer是如何注册的。

windowOperator和KeyedProcessOperator直接或者间接持有timerService,通过timerService对象就可以注册相应的timer

/**
 * Interface for working with time and timers.
 */
@PublicEvolving
public interface TimerService {

   /** Returns the current processing time. */
   long currentProcessingTime();

   /** Returns the current event-time watermark. */
   long currentWatermark();

   /**
    * Registers a timer to be fired when processing time passes the given time.
    *
    * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
    * in a keyed context, such as in an operation on
    * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
    * will also be active when you receive the timer notification.
    */
   void registerProcessingTimeTimer(long time);

   /**
    * Registers a timer to be fired when the event time watermark passes the given time.
    *
    * <p>Timers can internally be scoped to keys and/or windows. When you set a timer
    * in a keyed context, such as in an operation on
    * {@link org.apache.flink.streaming.api.datastream.KeyedStream} then that context
    * will also be active when you receive the timer notification.
    */
   void registerEventTimeTimer(long time);
}

对于KeyedProcessOperator来说会将timeService对象间接的传递到keyedProcessFunction,使用户在函数层面就能注册和访问timer。这里需要注意的有两点

1.namespace相同的情况下,每一个key只有1个timer。

2.如果TimeCharacteristic为processTime,当需要注册timer时间小于当前系统处理时间会立即出发回调

image
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容