Flink基础系列34-Flink CEP简介

一.什么是CEP

  1. 复杂事件处理(Complex Event Processing,CEP)

  2. Flink CEP是在Flink中实现的复杂事件处理(CEP)库

  3. CEP允许在无休止的事件流中检测事件模式,让我们有机会掌握数据中重要的部分

  4. 一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据——满足规则的复杂事件

二.CEP特点

image.png

目标:从有序的简单事件流中发现一些高阶特征

输入:一个或多个由简单事件构成的事件流

处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件

输出:满足规则的复杂事件

三. Pattern API

处理事件的规则,被叫做"模式"(Pattern)

Flink CEP提供了Pattern API,用于对输入流数据进行复杂事件规则定义,用来提取符合规则的时间序列

DataStream<Event> input = ...
// 定义一个Pattern
Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(...)
  .next("middle").subtype(SubEvent.class).where(...)
  .followedBy("end").where(...);
// 将创建好的Pattern应用到输入事件流上
PatternStream<Event> patternStream = CEP.pattern(input,pattern);
// 检出匹配事件序列,处理得到结果
DataStream<Alert> result = patternStream.select(...);

3.1 个体模式(Individual Patterns)

组成复杂规则的每一个单独的模式定义,就是"个体模式"

start.times(3).where(new SimpleCondition<Event>() {...})

个体模式可以包括"单例(singleton)模式"和"循环(looping)模式"

单例模式只接收一个事件,而循环模式可以接收多个

量词(Quantifier)
可以在一个个体模式后追加量词,也就是指定循环次数

//匹配出现4次
start.times(4)
//匹配出现2/3/4次
start.time(2,4).greedy
//匹配出现0或者4次
start.times(4).optional
//匹配出现1次或者多次
start.oneOrMore
//匹配出现2,3,4次
start.times(2,4)
//匹配出现0次,2次或者多次,并且尽可能多的重复匹配
start.timesOrMore(2),optional.greedy

条件(Condition)

  1. 每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据
  2. CEP中的个体模式主要通过调用.where(),.or()和.until()来指定条件
  3. 按不同的调用方式,可以分成以下几类
    1)简单条件(Simple Condition)
    通过.where()方法对事件中的字段进行判断筛选,决定是否接受该事件
start.where(new SimpleCondition<Event>){
  @Override
  public boolean filter(Event value) throws Exception{
    return value.getName.startsWith("foo");
  }
}

2)组合条件(Combining Condition)
将简单条件进行合并;.or()方法表示或逻辑相连,where的直接组合就是AND

pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)

3)终止条件(Stop Condition)
如果使用了oneOrMore或者oneOrMore.optional,建议使用.until()作为终止条件,以便清理状态

4)迭代条件(Iterative Condition)
能够对模式之前所有接收的事件进行处理
可以调用ctx.getEventsForPattern("name")

.where(new IterativeCondition<Event>(){...})

3.2 组合模式(Combining Patterns)

组合模式(Combining Patterns)也叫模式序列。
1)很多个体模式组合起来,就形成了整个的模式序列
2)模式序列必须以一个"初始模式"开始

Pattern<Event, Event> start = Pattern.<Event>begin("start")
image.png
  1. 严格近邻(Strict Contiguity)
    1)所有事件按照严格的顺序出现,中间没有任何不匹配的事件,由.next()指定
    2)例如对于模式"a next b",事件序列[a,c,b1,b2]没有匹配

  2. 宽松近邻(Relaxed Contiguity)
    1)允许中间出现不匹配的事件,由.followedBy()指定
    2)例如对于模式"a followedBy b",事件序列[a,c,b1,b2]匹配为[a,b1]

  3. 非确定性宽松近邻(Non-Deterministic Relaxed Contiguity)
    1)进一步放宽条件,之前已经匹配过的事件也可以再次使用,由.followByAny()指定
    2)例如对于模式"a followedAny b",事件序列[a,c,b1,b2]匹配为{a,b1},{a,b2}

  4. 除了以上模式序列外,还可以定义"不希望出现某种近邻关系":
    1).notNext() 不严格近邻
    2).notFollowedBy()不在两个事件之间发生
    (eg,a not FollowedBy c,a Followed By b,a希望之后出现b,且不希望ab之间出现c)

  5. 需要注意:
    1)所有模式序列必须以.begin()开始
    2)模式序列不能以.notFollowedBy()结束
    3)"not "类型的模式不能被optional 所修饰
    4)此外,还可以为模式指定事件约束,用来要求在多长时间内匹配有效:
    next.within(Time.seconds(10))

3.3 模式组

3.3.1 模式的检测

指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配
调用CEP.pattern(),给定输入流和模式,就能得到一个PatternStream

DataStream<Event> input = ...
Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(...)...

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

3.3.2 匹配事件提取

  1. 创建PatternStrean之后,就可以应用select或者flatselect方法,从检测到的事件序列中提取事件了
  2. select()方法需要输入一个select function作为参数,每个成功匹配的事件序列都会调用它
  3. select() 以一个Map<String,List<IN]>> 来接收匹配到的事件序列,其中Key就是每个模式的名称,而value就是所有接收到的事件的List类型
public OUT select(Map<String, List<IN>> pattern) throws Exception {
  IN startEvent = pattern.get("start").get(0);
  IN endEvent = pattern.get("end").get(0);
  return new OUT(startEvent, endEvent);
}

3.3.3 超时事件提取

当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃;为了能够处理这些超时的部分匹配,select和flatSelect API调用允许指定超时处理程序

超时处理程序会接收到目前为止由模式匹配到的所有事件,由一个OutputTag定义接收到的超时事件序列

PatternStream<Event> patternStream = CEP.pattern(input, pattern);
OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<ComplexEvent> flatResult = 
  patternStream.flatSelect(
  outputTag,
  new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
  new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
);
DataStream<TimeoutEvent> timeoutFlatResult = 
  flatResult.getSideOutput(outputTag);

参考:

  1. https://www.bilibili.com/video/BV1qy4y1q728
  2. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_15-cep
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,997评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,603评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,359评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,309评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,346评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,258评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,122评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,970评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,403评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,596评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,769评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,464评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,075评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,705评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,848评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,831评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,678评论 2 354

推荐阅读更多精彩内容