Flink 常用API详解

Flink常用API详解

概述:
    Flink根据抽象程度分层,提供了3种不同的API和库,每一种API在简洁性和表达力上有着不同的侧重,并且针对不同的应用场景.

ProcessFunction

概述:
    ProcessFunctino是Flink所提供最底层接口. ProcessFunction可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口
    内的多个事件,它提供了对于时间和状态的细粒度控制,开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发
    回调函数.因此你可以利用ProcessFunctino实现许多有状态的事件驱动应用所需要的基于单个事件的复杂业务逻辑.

DataStream API

概述:
    为许多通用的流处理操作提供了处理原语
    这些操作包括窗口、逐条记录的的转换操作,在处理时间时进行外部数据库查询等. DataStream API支持Java 和 Scala语言,预先定义了
    例如map() 、reduce() 、Aggregate()等函数,你可以通过扩展实现预定义接口或使用Java、SCala的lambda表达式实现自定义的函数

SQL& Table API:

概述:
    Flink支持两种关系型的API,Table API和SQL,这两个API都是批处理,和流处理统一的API,这意味着在无边界的实时数据流和
    有边界的历史记录数据流上,关系型API 会以相同的语义执行查询,并产生相同的结果.Table API 和SQL 接住了 Apache Calcite
    来进行查询的解析,校验以及优化,它们可以与DataStream和DataSEt API无缝集成,并支持用户自定义的标量函数,聚合函数以及标志函数.

复杂事件处理-CEP

概述:
    模式检测是事件流处理中的一个非常常见的用例. Flink 的CEP 库提供了API,使用户能够以例如正则表达式
    或状态机的方式指定事件模式,CEP库与Flink的DataStream API集成,以便在DataStream上评估模式.
    CEP库的应用包括网络入侵检测,业务流程监控和欺诈检测.

DataSet API

概述:
    DataSet API 是Flink用于批处理应用程序的核心 API, DataSet API所提供的的基础算子 包括 map、reduce、(outer) join、co-group、
    iterate等.
    所有算子都有相应的算法和数据结构支持,对内存中的序列化数据进行操作. 如果数据大小超过预留内存,则过量数据将存储到磁盘.

Gallery 可扩展的图形处理和分析库.

DataStream 的编程模型,
概述:
    DataStream的编程模型包括四个部分: Environment,DataSource,Trasnformation,Sink
    初始化上下文环境-> 数据源-> 转换操作,=> 数据输出

Flink的DataSource数据源;

1. 基于文件的Source
    读取HDFS文件系统的Source
    // 首先需要配置Hadoop的依赖
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    读取HDFS上的文件:
        env.readTextFile($HDFS_PATH)
2. 基于集合的Source
    env.fromCollection()
3. 基于Kafka的Source
    首先需要配置Kafka连接器的依赖,另外更多的连接器可以查看官网
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.9.1</version>
    </dependency>

3. Flink的Sink数据目标

概述:
    Flink针对DataStream提供了大量的已经实现的数据目标(Sink),包括文件,Kafka,Redis,HDFS,Elasticsearch
1. 基于HDFS的Sink
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>1.9.1</version>
    </dependency>
    Streaming File Sink能把数据写入HDFS中,还可以支持分桶写入,每一个分桶就对应HDFS中的一个目录,
    默认按照该小时来分桶,在一个桶内部,会进一步将输出基于滚动策略切分成更小的文件.这有助于防止桶文件变得过大,
    滚动策略也是可以配置的,默认策略会更具文件大小和超时时间来滚动文件,超时时间是指没有新数据写入部分文件(part file)的时间
2. 基于Redis的Sink
3. 基于Kafka的Sink
4. 自定义的Sink

4. DataStream 转换算子

概述:
    即通过从一个或多个DataStream生成新的DataStream的过程被称为Transformation操作. 在转换过程中,每种操作类型被
    定义为不同的Operator,Flink程序都能够将多个Transformation组成一个DataFlow的拓扑.
1. Map[DataStream->DataStream]
    调用用户自定义的MapFunction对DataStream[T]数据进行处理,形成新的DataStream[T],其中数据格式可能会发生变化,
    常用作对数据集内数据的清洗和转换,例如将输入数据集中的每个数值全部加1处理,并且将数据输出到下游数据集
2. FlatMap[DataStream->DataStream]
    该算子主要应用处理输入一个元素产生一个或者多个元素的计算场景,比较常见的是在经典例子WordCount中,将每一行的
    文本数据切割,生成单词序列
3. Filter[DataStream->DataStream]
    该算子将按照条件对输入数据集进行筛选操作,将符合条件的数据集输出,将不符合条件的数据过滤掉.
    3.1 // 通过通配符
        val filter:DataStream[Int] = dataStream.filter{_%2==0}
    3.2 // 或者指定运算表达式
        val filter:DataStream[Int] = dataStream.filter{x=> x%2==0}
4. KeyBy[DataStream->KeyedStream]
    该算子根据指定的key将输入的DataStream[T]数据格式转换为KeyedStream[T],也就是在数据集中执行Partition操作,将
    相同的Key值的数据放置在相同的分区中.
    例如WordCount-> 将数据集中第一个参数作为Key,对数据集进行KeyBy函数操作,形成根据Id分区的KeyedStream数据集.
    eg:
        val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
        //指定第一个字段为分区key
        val keyedStream: KeyedStream [(String,Int),Tuple] = dataStream.keyBy(0)
5. Reduce[KeyedStream->DataStream]
    该算子和MapReduce中Reduce原理基本一致,主要目的是将输入的KeyedStream通过传入的用户自定义地ReduceFunction
    滚动地进行数据聚合处理,其中定义ReduceFunction必须满足运算结合律和交换律,
    eg:
        对传入的KeyedStream数据集中相同key值的数据独立进行求和运算,得到每个key所对应的求和值.
        val dataStream = env.fromElements(("a",3),("d",4),("c",2),("a",5))
        //指定第一个字段为分区key`
        val keyedStream:KeyedStream[(String,Int),Tuple] = dataStream.keyBy(0)
        // 滚动对第二个字段进行reduce相加求和
        val reduceStream = keyedStream.reduce{(x1,x2)=>(x1._1,x1._2+x2._2)
6. Aggregations[KeyedStream->DataStream]
    Aggregations是KeyedDataStream接口提供的聚合算子,根据指定的字段进行聚合操作,滚动地产生一系列数据聚合结果.
    其实是将Reduce算子中的函数进行了封装,封装的聚合操作有sum、min、minBy、max、maxBy等,这样就不需要用户自己定义
    Reduce函数.
    eg:
        指定数据集中第一个字段作为key,用第二个字段作为累加字段,然后滚动地对第二个字段的数值进行累加并输出
        //指定第一个字段为分区key
        val keyedStream:KeyedStream[(Int,Int),Tuple] = dataStream.keyBy(0)
        // 对对第二个字段进行sum统计
        val sumStream:DataStream[(Int,Int)] = keyedStream.sum(1)
        // 输出计算结果
        sumStream.print()
7. Union[DataStream->DataStream]
    union算子主要是将两个或者多个输入的数据集合并成一个数据集,需要保证两个数据集的格式一致,输出的数据集的格式和
    输入的数据集格式保持一致,
    code:
        //获取flink实时流处理的环境
        val env = ExecutionEnvironment.getExecutionEnvironment
        // 创建不同的数据集
        val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
        val dataStream2 = env.fromElements(("a", 1), ("d", 1), ("c", 1), ("a", 1))
        // 调用union算子进行不同的数据集合并
        dataStream.union(dataStream2).print()
8. Connect,CoMap,CoFlatMap[DataStream->ConnectedStream->DataStream](只能在Stream才可以用)
    connect算子主要是为了合并两种或者多种不同数据类型的数据集,合并后会保留原来原来数据集的数据类型.
    例如:  dataStream1数据集为(String,Int) 元组类型,dataStream2数据集为Int类型,通过connect连接算子将两个不同数据
    类型的流结合在一起,形成格式为ConnectedStreams的数据集,其内部数据为[(String,Int),Int]的混合数据类型,保留了两个
    原始数据集的数据类型
    eg:
        //获取flink实时流处理的环境
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            // 创建不同的数据集
            val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
            val dataStream2 = env.fromElements(1, 2, 4, 5)
            // 连接两个DataStream数据集
            val connectedStream = dataStream.connect(dataStream2)
            val result = connectedStream.map(
              //第一个处理函数
              t1 => {
                (t1._1, t1._2)
              },
              //第二个处理函数
              t2 => {
                (t2, 0)
              })
            result.print()
            env.execute("h")
    注意:Union和Connect区别
        1. Union之间两个流的类型必须是一样,Connect可以不一样,在之后的coMap中在去调整成为一样的.
        2. Connect只能操作两个流,Union可以操作多个
9. Split 和 select [DataStream -> SplitStream->DataStream]
    Split算子是将一个DataStream数据集按照条件进行拆分,形成两个数据集的过程,也是Union算子的逆向实现,每个接入的数据
    都会被路由到一个或者多个输出数据集中,
    在使用Splict函数中,需要定义split函数中的切分逻辑,通过调用split函数,然后指定条件判断函数,
    例如: 如下代码所示,将根据第二个字段的奇偶性将数据集标记出来,如果是偶数则标记为even,如果是奇数则标记为odd,然后通过集合将标记返回,最终生成格式SplitStream的数据集
    code:
        
        //获取flink实时流处理的环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 导入Flink隐式转换
        import org.apache.flink.streaming.api.scala._
        // 创建不同的数据集
        val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
        val splitedStream = dataStream.split(t => if (t._2 % 2 == 0) Seq("even") else Seq("odd"))
    
        // Split函数本身只是对输入数据集进行标记,并没有将数据集真正的实现拆分,因此需要借助Select函数根据标记将数据切分成不同的数据集,
        //筛选出偶数数据集
        val evenStream = splitedStream.select("even").print()
        //筛选出偶数数据集
        val oddStream = splitedStream.select("odd")
    
        env.execute("l ")

函数类和富函数类

概述:
    前面学过的所有算子集合都可以自定义一个函数类,富函数类作为参数,因为Flink暴露了这两种函数类的接口,常见的函数接口:
    1. MapFunction
    2. FlatMapFunction
    3. ReduceFunction
    富函数接口它与其他常规函数接口的不同在于:可以获取运行环境的上下文,在上下文环境中可以管理状态(State),并拥有一些生命周期方法,
    所以可以实现更复杂的功能.富函数的接口有:
    1. RichMapFunction
    2. RichFlatMapFunction
    3. RichFilterFunction
1. 普通函数类举例:
    按照指定的时间格式输出每个通话的拨号时间和结束时间
    code:
        import java.text.SimpleDateFormat
        import java.util.Date
        import org.apache.flink.api.common.functions.MapFunction
        import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
        object FunctionClassTransformation {
          case class StationLog(sid: String, var callOut: String, var callInt: String, callType: String, callTime: Long, duration: Long)
          def main(args: Array[String]): Unit = {
            //获取flink实时流处理的环境
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            env.setParallelism(1)
            // 导入Flink隐式转换
            import org.apache.flink.streaming.api.scala._
            val data = env.readTextFile(getClass.getResource("station.log").getPath)
              .map { line =>
                var arr = line.split(",")
                StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
              }
            // 定义时间输出格式
            val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
            // 过滤那些通话成功的
            data.filter(_.callType.equals("success"))
              .map(new CallMapFunction(format))
              .print()
        
            env.execute("l ")
        
          }
        
          class CallMapFunction(format: SimpleDateFormat) extends
            MapFunction[StationLog, String] {
            override def map(t: StationLog): String = {
              var startTime = t.callTime
              val endTime = t.callTime + t.duration * 1000
              "主叫号码: " + t.callOut + " , 被叫号码: " + t.callInt + ", 呼叫起始时间: " + format.format(new Date(startTime)) + ",呼叫结束时间: " + format.format(new Date(endTime))
            }
          }}
    result:
        主叫号码: 18600003186 , 被叫号码: 18900002113, 呼叫起始时间: 2019-12-23 13:54:13,呼叫结束时间: 2019-12-23 13:54:45
        主叫号码: 18600003794 , 被叫号码: 18900009608, 呼叫起始时间: 2019-12-23 13:54:13,呼叫结束时间: 2019-12-23 13:54:17
        主叫号码: 18600000005 , 被叫号码: 18900007729, 呼叫起始时间: 2019-12-23 13:56:43,呼叫结束时间: 2019-12-23 14:02:32
        主叫号码: 18600005404 , 被叫号码: 18900000558, 呼叫起始时间: 2019-12-23 13:54:17,呼叫结束时间: 2019-12-23 13:54:22
        主叫号码: 18600003532 , 被叫号码: 18900008128, 呼叫起始时间: 2019-12-23 13:54:19,呼叫结束时间: 2019-12-23 13:54:29
        主叫号码: 18600003532 , 被叫号码: 18900008128, 呼叫起始时间: 2019-12-23 13:54:26,呼叫结束时间: 2019-12-23 13:54:41
        主叫号码: 18600003502 , 被叫号码: 18900009859, 呼叫起始时间: 2019-12-23 13:54:28,呼叫结束时间: 2019-12-23 13:54:28
        主叫号码: 18600003502 , 被叫号码: 18900009859, 呼叫起始时间: 2019-12-23 13:54:28,呼叫结束时间: 2019-12-23 13:54:28            
2.  富函数类举例:
        把呼叫成功的通话信息转换成真实的用户姓名,通话用户对应的用户表(在MySql数据库中),
        由于需要从数据库中查询数据,就需要创建连接,创建连接的代码必须写在生命周期的open方法中,所以需要使用富函数类.
    Rich Function有一个生命周期的概念,典型的生命周期方法有:
        open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用.
        clsose()方法是生命周期中的最后一个调用的方法,做一些清理工作
        getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
    code:
        package com.bjsxt.flink.transformation
        
        import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
        
        import com.bjsxt.flink.source.StationLog
        import org.apache.flink.api.common.functions.RichMapFunction
        import org.apache.flink.configuration.Configuration
        import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
        
        object TestRichFunctionClass {
        
          /**
           * 把通话成功的电话号码转换成真是用户姓名,用户姓名保存在Mysql表中
           * @param args
           */
          def main(args: Array[String]): Unit = {
            val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
            import org.apache.flink.streaming.api.scala._
        
            //读取数据源
            var filePath =getClass.getResource("/station.log").getPath
            val stream: DataStream[StationLog] = streamEnv.readTextFile(filePath)
              .map(line=>{
                var arr=line.split(",")
                new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
              })
        
            //计算:把电话号码变成用户姓名
            val result: DataStream[StationLog] = stream.filter(_.callType.equals("success"))
              .map(new MyRichMapFunction)
            result.print()
        
            streamEnv.execute()
          }
        
          //自定义一个富函数类
          class MyRichMapFunction extends RichMapFunction[StationLog,StationLog]{
            var conn:Connection=_
            var pst:PreparedStatement=_
            override def open(parameters: Configuration): Unit = {
              conn =DriverManager.getConnection("jdbc:mysql://localhost/test","root","123123")
              pst =conn.prepareStatement("select name from t_phone where phone_number=?")
            }
        
            override def close(): Unit = {
              pst.close()
              conn.close()
            }
        
            override def map(value: StationLog): StationLog = {
              println(getRuntimeContext.getTaskNameWithSubtasks)
              //查询主叫号码对应的姓名
              pst.setString(1,value.callOut)
              val result: ResultSet = pst.executeQuery()
              if(result.next()){
                value.callOut=result.getString(1)
              }
              //查询被叫号码对应的姓名
              pst.setString(1,value.callInt)
              val result2: ResultSet = pst.executeQuery()
              if(result2.next()){
                value.callInt=result2.getString(1)
              }
              value
            }
          }
        }
6. 底层 ProcessFunctionAPI
    概述:
        ProcessFunction是一个低层次的流处理操作,允许所有返回Stream的基础构建模块:
        访问Event本身数据(比如:Event的时间, Event的当前Key)
        管理状态 State(仅在keyed Stream中)
        管理定时器Timer( 包括: 注册定时器,删除定时器等)
    总而言之,ProcessFunction是Flink最底层的API,也是功能最强大的.
    例如:
        监控每一个手机,如果在5s内呼叫它的通话都是失败的,发出警告信息.
    注意:
        这个案例中会使用到状态编程,请同学们只要知道状态的意思,不需要掌握,
    code:
        package com.bjsxt.flink.transformation
        
        import com.bjsxt.flink.source.StationLog
        import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
        import org.apache.flink.streaming.api.functions.KeyedProcessFunction
        import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
        import org.apache.flink.util.Collector
        
        object TestProcessFunction {
        
          //监控每一个手机号码,如果这个号码在5秒内,所有呼叫它的日志都是失败的,则发出告警信息
          //如果在5秒内只要有一个呼叫不是fail则不用告警
          def main(args: Array[String]): Unit = {
        
            val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
            import org.apache.flink.streaming.api.scala._
        
            //读取数据源
            val stream: DataStream[StationLog] = streamEnv.socketTextStream("hadoop101",8888)
              .map(line=>{
                var arr=line.split(",")
                new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
              })
        
            //计算
            val result: DataStream[String] = stream.keyBy(_.callInt)
              .process(new MonitorCallFail)
            result.print()
        
            streamEnv.execute()
          }
        
          //自定义一个底层的类
          class MonitorCallFail extends KeyedProcessFunction[String,StationLog,String]{
            //使用一个状态对象记录时间
            lazy val timeState :ValueState[Long] =getRuntimeContext.getState(new ValueStateDescriptor[Long]("time",classOf[Long]))
        
            override def processElement(value: StationLog, ctx: KeyedProcessFunction[String, StationLog, String]#Context, out: Collector[String]): Unit = {
              //从状态中取得时间
              var time =timeState.value()
              if(time==0 && value.callType.equals("fail") ){ //表示第一次发现呼叫失败,记录当前的时间
                //获取当前系统时间,并注册定时器
                var nowTime =ctx.timerService().currentProcessingTime()
                //定时器在5秒后触发
                var onTime =nowTime+8*1000L
                ctx.timerService().registerProcessingTimeTimer(onTime)
                //把触发时间保存到状态中
                timeState.update(onTime)
              }
              if (time!=0 && !value.callType.equals("fail")){ //表示有一次成功的呼叫,必须要删除定时器
                ctx.timerService().deleteProcessingTimeTimer(time)
                timeState.clear() //清空状态中的时间
              }
            }
        
            //时间到了,定时器执行,
            override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext, out: Collector[String]): Unit = {
              var warnStr="触发的时间:"+timestamp +" 手机号 :"+ctx.getCurrentKey
              out.collect(warnStr)
              timeState.clear()
            }
          }
        }

7. 侧输出流 Side Output

概述:
    在Flink处理数据流时,我们经常会遇到这样的情况: 在处理一个数据源时,往往需要将该源中的不同类型的数据做分割处理,如果使用filter
    算子对数据源进行筛选分割的话,势必会造成数据流的多次复制,造成不必要的性能浪费;flink中的侧输出就是将数据流进行分割,而不对流
    进行复制的一种分流机制,
    Flink的侧输出的另一个作用就是对延时迟到的数据进行处理,这样就可以不必丢弃迟到的数据.
案例: (根据基站的日志,请把呼叫成功的Stream(主流)和不成功的Stream(侧流分别输出)
code:
    package FlinkDemo.functions
    9
    import FlinkDemo.functions.FunctionClassTransformation.StationLog
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector
    
    object TestSideOutpurStream {
      // 导入Flink隐式转换
      import org.apache.flink.streaming.api.scala._
    
      // 侧输出流首先需要定义一个流的标签
      val notSuccessTag = new OutputTag[StationLog]("not_success")
    
      def main(args: Array[String]): Unit = {
        //获取flink实时流处理的环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
    
        val data = env.readTextFile(getClass.getResource("station.log").getPath)
          .map { line =>
            var arr = line.split(",")
            StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          }
        val mainStream = data
          .process(new CreateSideOutputStream(notSuccessTag))
        // 得到测流
        val sideStream = mainStream.getSideOutput(notSuccessTag)
        mainStream.print("main")
        sideStream.print("sideOutput")
        env.execute()
      }
    
      class CreateSideOutputStream(tag: OutputTag[StationLog]) extends ProcessFunction[StationLog, StationLog] {
        override def processElement(value: StationLog, context: ProcessFunction[StationLog, StationLog]#Context, collector: Collector[StationLog]): Unit = {
          //输出主流
          if (value.callType.equals("success")) {
            collector.collect(value)
          }
          else {
            //输出侧流
            context.output(tag, value)}}}}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容

  • Flink程序是实现分布式集合转换的常规程序(例如, filtering, mapping, updating s...
    大菜鸟_阅读 2,462评论 0 1
  • 摘于博古林全集 夜晚,如今随着我们生命进程不断地推进,那种自我感触变的更加厚实沉淀起来,越来越多更为实在的日常...
    博古林V雙木居士阅读 931评论 1 4
  • 易经是十三经之首。 荣格的老师说,读懂了易经,你就知道如何做心理治疗。 荣格接触了易经之后,创造了深度分析心理学。...
    春江花月夜9999阅读 347评论 2 2
  • 面对监理的抱怨,辛苦的施工工人及其相关负责人都很无奈这种难以满足的验收标准,由不得怨恨监理的吹毛求疵。一位刚毕业的...
    匆匆不离去阅读 277评论 0 0
  • 感恩伙伴一早叫起,还带的早餐 感恩店铺伙伴认真负责,来春装了,整理库房 感恩今天的学习,很有优秀的分享,值得学习 ...
    十八菩提子阅读 113评论 0 0