Flink Time-windowed Joins过期数据清理机制分析

在flink双流Time-windowed Joins的主要实现是在TimeBoundedStreamJoin中,这个类里面的变量非常的多,所以首先要清楚,这些重要变量或者概念的计算过程。简单的说整个join过程就是把左流的数据和右流的数据都通过state保存起来,左流有新的数据到,就会根据key去遍历右流state中的数据,符合关联条件就输出,关联不上的就保存在左流的state中等待右流数据的遍历,反之亦然。另外会对每个流计算过期时间,以及每个数据的清理时间。本文主要根据代码的实现过程对清理机制做一个分步的演算。

基本公式

建表语句

CREATE TABLE LeftTable (
  l_id STRING, 
  l_imsi STRING, 
  l_time TIMESTAMP(3), 
  WATERMARK FOR l_time AS l_time - INTERVAL '5' SECOND 
) WITH ( 
  'connector.type' = 'kafka',   
  'connector.version' = 'universal',  
  'connector.topic' = 'foo',  
  'connector.properties.zookeeper.connect' = 'hostA:2181',
  'connector.properties.bootstrap.servers' = 'hostA:6667',  
  'connector.properties.group.id' = 'tg1',   
  'connector.startup-mode' = 'latest-offset', 
  'format.type' = 'csv',   
  'format.field-delimiter' = ','
  )
CREATE TABLE RightTable (
  r_id STRING, 
  r_location STRING, 
  r_time TIMESTAMP(3), 
  WATERMARK FOR r_time AS r_time - INTERVAL '2' SECOND 
) WITH (  
  'connector.type' = 'kafka',   
  'connector.version' = 'universal',  
  'connector.topic' = 'bar',  
  'connector.properties.zookeeper.connect' = 'hostA:2181',
  'connector.properties.bootstrap.servers' = 'hostA:6667',
  'connector.properties.group.id' = 'tg1',   
  'connector.startup-mode' = 'latest-offset', 
  'format.type' = 'csv',   
  'format.field-delimiter' = ',' 
)

执行SQL

SELECT l_id, l_imsi, r_location 
FROM LeftTable 
LEFT JOIN RightTable 
on l_id = r_id 
and r_time >= l_time - INTERVAL '4' SECOND AND r_time <= l_time + INTERVAL '6' SECOND

upperBound = 6lowerNound = -4

那么

leftRelativeSize = -leftLowerBound = -(-upperBound) = upperBound = 6

rightRelativeSize = leftUpperBound = -lowerBound = 4

leftExpirationTime = wm - upperBound - 0.001 
                   = wm - 6 - 0.001 
                   = wm - 6.001

rightExpirationTime = wm + lowerBound - 0.001 
                    = wm - 4 - 0.001 
                    = wm - 4.001
  
leftRowCleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 0.001 
                   = rowTime + 6 + (6+4)/2 + 0.001 
                   = rowTime + 11.001
                   
rightRowCleanUpTime = rowTime + rightRelativeSize + minCleanUpInterval + allowedLateness + 0.001 
                    = rowTime + 4 + (6+4)/2 + 0.001 
                    = rowTime + 9.001

rightOperatorTime = leftOperatorTime = wm = min(leftWatermark, rightWatermark)
  
allowedLateness = 0 //忽略不计

根据上面的公式进行模拟计算

顺序 来源 数据 wm ExpirationTime RowCleanUpTime 结果
1 Left 1,111,2020-01-01 10:10:16 0 left=0, right=-4001 10:10:27.001
2 Right 2,B,2020-01-01 10:10:20 10:10:11 left=-6001, right=-4001 10:10:29.001
3 Left 2,222,2020-01-01 10:10:22 10:10:17 left=-6001, right=10:10:06.999 10:10:33.001 join输出2,222,B
4 Left 4,4444,2020-01-01 10:10:35 10:10:18 left=-6001, right=10:10:12.999 10:10:46.001
5 Right 4,D,2020-01-01 10:10:29 10:10:27 left=10:10:11.999, right=10:10:12.999 10:10:38.001
6-1 Right 5,E,2020-01-01 10:10:30 10:10:28 left=10:10:20.999, right=10:10:12.999 10:10:39.001
wm超过第一条数据的CleanUpTime,触发定时器 10:10:28 left=10:10:21.999, right=10:10:12.999 删除第一条数据,因为是left join所以输出1,111,
6-2 Right 1,A,2020-01-01 10:10:17 10:10:27 left=10:10:20.999, right=10:10:12.999 可以和第一条数据join上,所以输出1,111,A但是第一条数据的时间戳已经小于leftExpirationTime说明已经过期,同时在缓存中删除第一条数据,但是这时wm并没有超过第一条数据的CleanUpTime,不会触发清理的定时器
6-3 Right 1,A,2020-01-01 10:10:30 10:10:28 left=10:10:20.999, right=10:10:12.999 不能和第一条数据join上,所以输出1,111,但是第一条数据的时间戳已经小于leftExpirationTime说明已经过期,同时在缓存中删除第一条数据
wm超过第一条数据的CleanUpTime,触发定时器 10:10:28 left=10:10:21.999, right=10:10:12.999 第一条数据已经删除,定时器不需要做其他操作

总结

每次计算ExpirationTime的时候用的是上一次的wm;
6-1,6-2,6-3为三种独立可能触发删除过期数据的场景;
通过上面的测试可以发现,数据即便过期了,但是没有到清理时间,如果这时候有符合关联条件的数据还是可以关联上的,例如6-2场景。

以上数据基于flink 1.10.0版本blink planner进行测试。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容