window的使用场景
聚合统计、数据合并(积攒批)、双流join
Window Assigner
Window Assigner:WindowAssigner 负责将每条输入的数据分发到正确的 window 中(一条数据可能同时分发到多个 Window 中)
Evictor
Evictor:可以译为“驱逐者”。在Trigger触发之后,可以指定在窗口前后执行,相当于一个filter.Evictor 是可选的方法,如果用户不选择,则默认没有。
Evictor的evicBefore 和 evicAfter两个方法用于分别在窗口前后过滤
Trigger
Trigger:触发器。
trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的 trigger,如果默认的 trigger 不能满足你的需求,则可以自定义一个类,继承自 Trigger 即可,我们详细描述下 Trigger 的接口以及含义:
* onElement() 每次往 window 增加一个元素的时候都会触发
* onEventTime() 当 event-time timer 被触发的时候会调用
* onProcessingTime() 当 processing-time timer 被触发的时候会调用
* onMerge() 对两个 trigger 的 state 进行 merge 操作
* clear() window 销毁的时候被调用
上面的接口中前三个会返回一个 TriggerResult,TriggerResult 有如下几种可能的选择:
* CONTINUE 不做任何事情
* FIRE 触发 window
* PURGE 清空整个 window 的元素并销毁窗口
* FIRE_AND_PURGE 触发窗口,然后销毁窗口
Trigger的场景使用
场景1:ContinuousEventTimeTrigger
短窗口的计算由于其窗口期较短,那么很快就能获取到结果,但是对于长窗口来说窗口时间比较长,如果等窗口期结束才能看到结果,那么这份数据就不具备实时性,大多数情况我们希望能够看到一个长窗口的结果不断变动的情况,对此Flink提供了ContinuousEventTimeTrigger连续事件时间触发器与ContinuousProcessingTimeTrigger连续处理时间触发器,指定一个固定时间间隔interval,不需要等到窗口结束才能获取结果,能够在固定的interval获取到窗口的中间结果。
场景2:ContinuousEventTimeTrigger+PurgingTrigger
可以清除掉之前已经计算过的窗口数据,让window的间隔interval独立输出,最终在外部数据源(例如mysql)对一个窗口的数据汇总,方便使用
window Function
1.AggregateFunction 增量聚合(来一条聚合一条),减少state大小
2.AggregateFunction 数据先存在state中,等到trigger 的时候,才通过state计算.可以输出key 、window等额外信息
3.可以使用将AggregateFunction和AggregateFunction,一方面增量聚合,一方面补全信息
window的触发
window的触发要同时满足以下2个条件:
watermark时间 >= window_end_time
在[window_start_time,window_end_time)中有数据存在
注意:watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加
实现机制
//如果新加入的timer触发时间早于下一次的触发时间,那么应该重新设置下一次触发时间,注意processingTimeTimersQueue 是一个小顶堆,时间早的在堆顶
//注册下一次的触发任务
定时器
TimerService 在内部维护两种类型的定时器(处理时间和事件时间定时器)并排队执行。
TimerService 会删除每个键和时间戳重复的定时器,即每个键在每个时间戳上最多有一个定时器。如果为同一时间戳注册了多个定时器,则只会调用一次onTimer()方法。
Flink同步调用onTimer()和processElement()方法。因此,用户不必担心状态的并发修改。
容错
定时器具有容错能力,并且与应用程序的状态一起进行快照。如果故障恢复或从保存点启动应用程序,就会恢复定时器。
在故障恢复之前应该触发的处理时间定时器会被立即触发。当应用程序从故障中恢复或从保存点启动时,可能会发生这种情况。
定时器合并
由于 Flink 仅为每个键和时间戳维护一个定时器,因此可以通过降低定时器的频率来进行合并以减少定时器的数量。
对于频率为1秒的定时器(事件时间或处理时间),我们可以将目标时间向下舍入为整秒数。定时器最多提前1秒触发,但不会迟于我们的要求,精确到毫秒。因此,每个键每秒最多有一个定时器。
Java版本:
long coalescedTime=((ctx.timestamp()+timeout)/1000)*1000;ctx.timerService().registerProcessingTimeTimer(coalescedTime);
Scala版本:
val coalescedTime=((ctx.timestamp+timeout)/1000)*1000ctx.timerService.registerProcessingTimeTimer(coalescedTime)
由于事件时间定时器仅当 Watermark 到达时才会触发,因此我们可以将当前 Watermark 与下一个 Watermark 的定时器一起调度和合并:
Java版本:
long coalescedTime=ctx.timerService().currentWatermark()+1;ctx.timerService().registerEventTimeTimer(coalescedTime);
Scala版本:
val coalescedTime=ctx.timerService.currentWatermark+1ctx.timerService.registerEventTimeTimer(coalescedTime)
可以使用一下方式停止一个处理时间定时器:
Java版本:
long timestampOfTimerToStop=...ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
Scala版本:
val timestampOfTimerToStop=...ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
可以使用一下方式停止一个事件时间定时器:
Java版本:
long timestampOfTimerToStop=...ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
Scala版本:
val timestampOfTimerToStop=...ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
如果没有给指定时间戳注册定时器,那么停止定时器没有任何效果。
Flink版本:1.8
参考
https://ververica.cn/developers/time-window/
https://blog.csdn.net/qq_31866793/article/details/103138715
https://blog.jrwang.me/2019/flink-source-code-time-and-window/
https://www.jiangyuesong.me/2019/12/14/flink-window-and-timer/