Flink(1.13) FlinkCEP

什么是FlinkCEP

FlinkCEP(Complex event processing for Flink) 是在Flink实现的复杂事件处理库. 它可以让你在无界流或有界流中检测出特定的数据,有机会掌握数据中重要的那部分。
是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤关联聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。

  1. 目标:从有序的简单事件流中发现一些高阶特征
  2. 输入:一个或多个由简单事件构成的事件流
  3. 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
  4. 输出:满足规则的复杂事件

总结:FlinkCEP 用于编写规则,实现对流数据进行处理,最终获取我们想要的结果。


Flink CEP应用场景

  • 风险控制
    对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。

  • 策略营销
    用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。

  • 运维监控
    灵活配置多指标、多依赖来实现更复杂的监控模式。


Flink CEP入门案例

官网介绍

  • 导入依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
  • 给定一个流(必须指定水印)
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);

        // 读取数据封装成javaBean
        SingleOutputStreamOperator<Sensor> returns = source.flatMap((FlatMapFunction<String, Sensor>) (value, out) -> {
            out.collect(new Sensor(value.split(",")));
        }).returns(Types.POJO(Sensor.class));

        // 添加水印
        SingleOutputStreamOperator<Sensor> operator = returns.assignTimestampsAndWatermarks(
                // 最大乱序程度
                WatermarkStrategy.<Sensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((SerializableTimestampAssigner<Sensor>) (element, recordTimestamp) -> element.getTs() * 1000)
        );

为了方便,将数据封装成了对象

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Sensor {
    private Integer id;
    private Long ts;
    private Long vc;


    public Sensor(String[] args){
        this.id=Integer.valueOf(args[0]);
        this.ts=Long.valueOf(args[1]);
        this.vc=Long.valueOf(args[2]);
    }
}
  • 定义规则
// 定义模式
Pattern<Sensor, Sensor> beginPattern = Pattern.<Sensor>begin("begin")
 .where(new SimpleCondition<Sensor>() {
      @Override
      public boolean filter(Sensor value) throws Exception {
            return "sensor_1".equals(value.getId());
      }
});

Pattern 别导错了

import org.apache.flink.cep.pattern.Pattern;

begin :cep 都是从 begin开始。

 <X> Pattern<X, X> begin(final String name)

每个模式都必须有一个唯一的名称,稍后您可以使用该名称来标识匹配的事件。
模式名称不能包含字符":"。

begin("begin")
  • 将CEP 应用到流中
PatternStream<Sensor> pattern = CEP.pattern(watermarks, beginPattern);
SingleOutputStreamOperator<String> select = pattern.select((PatternSelectFunction<Sensor, String>)
// 从"begin" 名称中,获取模式里的数据
pattern1 -> pattern1.get("begin").toString());
  • 返回结果
select.print();

env.execute();
  • sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
  • 程序
    @Test
    public void test01() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.readTextFile("input/sensor.txt");

        //DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);



        // 读取数据封装成javaBean
        SingleOutputStreamOperator<Sensor> flatMapStream = source.flatMap((FlatMapFunction<String, Sensor>) (value, out) -> {
            out.collect(new Sensor(value.split(",")));
        }).returns(Types.POJO(Sensor.class));

        // 添加水印
        SingleOutputStreamOperator<Sensor> watermarks = flatMapStream.assignTimestampsAndWatermarks(
                // 最大乱序程度
                WatermarkStrategy.<Sensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((SerializableTimestampAssigner<Sensor>) (element, recordTimestamp) -> element.getTs() * 1000)
        );

        // 定义模式
        Pattern<Sensor, Sensor> beginPattern = Pattern.
                <Sensor>begin("begin")
                .where(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
        });

        // CEP
        PatternStream<Sensor> pattern = CEP.pattern(watermarks, beginPattern);
        SingleOutputStreamOperator<String> select = pattern.select((PatternSelectFunction<Sensor, String>)
                pattern1 -> pattern1.get("begin").toString());

        select.print();

        env.execute();

    }
  • 运行结果,都是 sensor_1的数据结果。
11> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
10> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
12> [Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]

模式API

单个模式

单个模式可以是单例模式或者循环模式.

单例模式

单例模式只接受一个事件. 默认情况模式都是单例模式.
前面的例子就是一个单例模式

循环模式

循环模式可以接受多个事件.
单例模式配合上量词就是循环模式.(非常类似我们熟悉的正则表达式)

  • times(n)
        // 定义模式
        Pattern<Sensor, Sensor> beginPattern = Pattern.
                <Sensor>begin("begin")
                .where(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
        }).times(2);

用于循环匹配,必须满足N条数。

匹配规则:设 time(2)

原始数据

sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60

首先匹配到 sensor_1,1,10 ,因为必须要为2,所以就会一直等着,直到遇到另一个sensor_1的数据

(sensor_1,1,10 | sensor_1,2,20)

sensor_1,1,10 匹配到之后,就不再匹配了,sensor_1,2,20继续往后匹配,直到遇到sensor_1,4,40

(sensor_1,2,20 | sensor_1,4,40)

sensor_1,4,40 同样等着,直到遇到下一个sensor_1

(sensor_1,4,40 | sensor_1,6,60)

因为流式处理,所以sensor_1sensor_1可能并不是有序的,中间可能相差好几条数据,但是匹配规则,就是满足了
time(n)的退出,否则一直等着。若程序结束都没有遇到,就自动释放(不输出,因为不满足time(n))。

运行结果

9> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
11> [Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
10> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]

  • timesOrMore(n)
    匹配规则和times(n)类似,timesOrMore(n)匹配规则则是n条及n条以上。

原始数据

sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60

匹配规则:设 timesOrMore(2)

第一次进来的是sensor_1,1,10,不满足timesOrMore的条件,所以等着。
第二次:进来sensor_1,2,20,满足timesOrMore的条件,记录起来。sensor_1,2,20需要等待下一次sensor_1的数据

(sensor_1,1,10 | sensor_1,2,20)

第三次:sensor_1,4,40sensor_1,1,10 满足, sensor_1,2,20也满足

(sensor_1,1,10 | sensor_1,2,20 | ensor_1,4,40)
(sensor_1,2,20 | ensor_1,4,40)

第四次:sensor_1,6,60sensor_1,1,10 满足, sensor_1,2,20 满足,ensor_1,4,40 满足

(sensor_1,1,10 | sensor_1,2,20 | ensor_1,4,40 | sensor_1,6,60)
(sensor_1,2,20 | ensor_1,4,40 | sensor_1,6,60)
(ensor_1,4,40 | sensor_1,6,60)

最终输出结果
多并行度,输出顺序不一致,但是整体结果和上面推断是一致的。

10> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
15> [Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
14> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
12> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
11> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
13> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
  • .times(n,m)
    或许你已经发现了timesOrMore的弊端,若数据中有10000条sensor_1,那么最后一个结果绝对有一万条数据。这样的结果,很有可能造成OOM。所以有必要的可以限定的以下次数,比如使用times(n,m),它表示一个范围,从n开始到m结束(包含m),比如 我们只要 2条,3条,4条的数据,那么就可以写成times(2,4)

原始数据

sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60

匹配规则:设 times(2,3)

第一次:sensor_1,1,10,不满足,等着
第二次:sensor_1,2,20sensor_1,1,10满足,sensor_1,2,20等着

(sensor_1,1,10,sensor_1,2,20)

第三次: sensor_1,4,40sensor_1,1,10满足,sensor_1,2,20满足,sensor_1,4,40等着

(sensor_1,1,10,sensor_1,2,20,sensor_1,4,40)
(sensor_1,2,20,sensor_1,4,40)

第四次,sensor_1,6,60, 因为上限为3,sensor_1,1,10不再往下执行,sensor_1,2,20满足,sensor_1,4,40满足,sensor_1,6,60等着,直到遇到sensor_1

(sensor_1,2,20,sensor_1,4,40,sensor_1,6,60)
(sensor_1,4,40,sensor_1,6,60)

最终输出
上限是3,所以列表长度最大为3。

1> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
3> [Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]
16> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40)]
15> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]
2> [Sensor(id=sensor_1, ts=2, vc=20), Sensor(id=sensor_1, ts=4, vc=40), Sensor(id=sensor_1, ts=6, vc=60)]

times(n,m) 与 timesOrMore(n) 的区别就是,timesOrMore(n) 没有上限。

  • oneOrMore()
    作用就是一次或多次,等价于timesOrMore(1)

条件

对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,例如前面用到的where就是一种条件

  • 迭代条件
    这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。
Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new IterativeCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value, Context<WaterSensor> ctx) throws Exception {
            return "sensor_1".equals(value.getId());
        }
    });
  • 简单条件
    这种类型的条件扩展了前面提到的IterativeCondition类,它决定是否接受一个事件只取决于事件自身的属性。
Pattern<WaterSensor, WaterSensor> pattern = Pattern
    .<WaterSensor>begin("start")
    .where(new SimpleCondition<WaterSensor>() {
        @Override
        public boolean filter(WaterSensor value) throws Exception {
            System.out.println(value);
            return "sensor_1".equals(value.getId());
        }
    });
  • 组合条件
    把多个条件结合起来使用. 这适用于任何条件,你可以通过依次调用where()来组合条件。 最终的结果是每个单一条件的结果的逻辑AND。
    如果想使用OR来组合条件,你可以像下面这样使用or()方法。

匹配 sensor_1sensor_7的数据

原始数据

sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30

程序

        // 定义模式
        Pattern<Sensor, Sensor> beginPattern = Pattern.
                <Sensor>begin("begin")
                .where(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
        }).or(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_7".equals(value.getId());
                    }
                });

运行结果

4> [Sensor(id=sensor_7, ts=7, vc=30)]
1> [Sensor(id=sensor_1, ts=2, vc=20)]
2> [Sensor(id=sensor_1, ts=4, vc=40)]
16> [Sensor(id=sensor_1, ts=1, vc=10)]
3> [Sensor(id=sensor_1, ts=6, vc=60)]

若是要用and条件呢?CEP 并没有and(),但是指定多个where(),作用和and一样。

        // 定义模式
        Pattern<Sensor, Sensor> beginPattern = Pattern.
                <Sensor>begin("begin")
                .where(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
        }).where(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return value.getTs()<4;
                    }
                });

运行结果

14> [Sensor(id=sensor_1, ts=1, vc=10)]
15> [Sensor(id=sensor_1, ts=2, vc=20)]
  • 停止条件(until)
    如果使用循环模式(oneOrMore, timesOrMore), 可以指定一个停止条件, 否则有可能会内存吃不消.
    意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。

原始数据

sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30

当程序遇到sensor_2时就停止

        // 定义模式
        Pattern<Sensor, Sensor> beginPattern = Pattern.
                <Sensor>begin("begin")
                .where(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
        }).timesOrMore(2).until(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_2".equals(value.getId());
                    }
                });

运行结果

8> [Sensor(id=sensor_1, ts=1, vc=10), Sensor(id=sensor_1, ts=2, vc=20)]

异常:until 只能用在循环条件。

org.apache.flink.cep.pattern.MalformedPatternException: The until condition is only applicable to looping states.

组合模式

多个单个模式组合在一起就是组合模式. 组合模式由一个初始化模式(.begin(...))开头

  • 原始数据
sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
  • 严格连续(严格紧邻)
    比如sensor_1sensor_1之间必须相邻的比如:sensor_1,1,10sensor_1,2,20sensor_1,2,20sensor_1,4,40就不行,之间还相隔着sensor_2,3,30

如何实现?使用next(),就表示严格紧邻

        // 定义模式
        Pattern<Sensor, Sensor> beginPattern = Pattern
                .<Sensor>begin("start")
                .where(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
                })
                .next("next")
                .where(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
                });

输出结果,只有sensor_1,1,10满足

12> [Sensor(id=sensor_1, ts=1, vc=10)]

严格模式还有一个很重要特性,就是能够解决乱序问题

sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,3,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30

试问:此时运行sensor_1,2,20是否满足?是否算得上与sensor_1,3,40为相邻?

12> [Sensor(id=sensor_1, ts=1, vc=10)]
13> [Sensor(id=sensor_1, ts=2, vc=20)]

虽然再流输入的顺序上,sensor_1,3,40是靠在sensor_2,4,30后的,但是从时间上来说,sensor_1,2,20sensor_1,3,40是有序的。内部上是有一个类似排序机制,但是在实时上,可能就不好说了,这里为了测试,用得离线的方式,所以是没有问题的。

  • 松散连续
    忽略匹配的事件之间的不匹配的事件。

原始数据

sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30

followedBy("by"):*松散连续

        // 定义模式
        Pattern<Sensor, Sensor> beginPattern = Pattern
                .<Sensor>begin("start")
                .where(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
                })
                .followedBy("by")
                .where(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
                });

输出结果

9> [Sensor(id=sensor_1, ts=1, vc=10)]
10> [Sensor(id=sensor_1, ts=2, vc=20)]
11> [Sensor(id=sensor_1, ts=4, vc=40)]
  • 非确定的松散连续
    更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配
    当且仅当数据为a,c,b,b时,对于followedBy模式而言命中的为{a,b},对于followedByAny而言会有两次命中{a,b},{a,b}

原始数据

sensor_1,1,10
sensor_1,2,20
sensor_2,3,30
sensor_1,4,40
sensor_2,5,50
sensor_1,6,60
sensor_7,7,30
 // 定义模式
        Pattern<Sensor, Sensor> beginPattern = Pattern
                .<Sensor>begin("start")
                .where(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
                })
                //.next("next")
                .followedByAny("by")
                .where(new SimpleCondition<Sensor>() {
                    @Override
                    public boolean filter(Sensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
                });

输出结果

6> [Sensor(id=sensor_1, ts=1, vc=10)]
8> [Sensor(id=sensor_1, ts=4, vc=40)]
4> [Sensor(id=sensor_1, ts=1, vc=10)]
3> [Sensor(id=sensor_1, ts=1, vc=10)]
5> [Sensor(id=sensor_1, ts=2, vc=20)]
7> [Sensor(id=sensor_1, ts=2, vc=20)]
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 229,460评论 6 538
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 99,067评论 3 423
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 177,467评论 0 382
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,468评论 1 316
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 72,184评论 6 410
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,582评论 1 325
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,616评论 3 444
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,794评论 0 289
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,343评论 1 335
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 41,096评论 3 356
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,291评论 1 371
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 38,863评论 5 362
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,513评论 3 348
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,941评论 0 28
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 36,190评论 1 291
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 52,026评论 3 396
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,253评论 2 375

推荐阅读更多精彩内容