Flink 集群构建 & 逻辑计划生成

Flink 集群构建 & 逻辑计划生成

转载:Flink 集群构建 & 逻辑计划生成

概要和背景

flink是一个被誉为 the 4th G 的计算框架,不同的框架特性及其代表项目列表如下:

第一代 第二代 第三代 第四代
Batch Batch Interactive Batch Interactive Near-Real-Time Interative-processing Hybrid Interactive****Real-Time-Streaming****Native-Iterative-processing
DAG Dataflows RDD Cyclic Dataflows
Hadoop MapReduce TEZ Spark Flink

本文主要介绍flink的核心组件以及逻辑计划的生成过程

参考代码分支 flink-1.1.2

核心组件介绍

这里只介绍 on yarn 模式下的组件

flink 的 on yarn 模式支持两种不同的类型:

  1. 单作业单集群
  2. 多作业单集群

首先介绍 单作业单集群 的架构,单作业单集群下一个正常的 flink 程序会拥有以下组件


job Cli: 非 detatched 模式下的客户端进程,用以获取 yarn Application Master 的运行状态并将日志输出到终端

JobManager[JM]: 负责作业的运行时计划 ExecutionGraph 的生成、物理计划生成和作业调度

TaskManager[TM]: 负责被分发 task 的执行、心跳/状态上报、资源管理


整体的架构大致如下图所示:

image-20181226153020674

下面以一次 Job 的提交过程描述 flink 各组件的作用及协同

作业提交流程分析

单作业单集群模式下,一个作业会启动一个 JM,并依据用户的参数传递启动相应数量的 TM,每个 TM 运行在 yarn 的一个 container 中,

一个通常的 flink on yarn 提交命令:
./bin/flink run -m yarn-cluster -yn 2 -j flink-demo-1.0.0-with-dependencies.jar —ytm 1024 -yst 4 -yjm 1024 —yarnname flink_demo_waimai_e
flink 在收到这样一条命令后首先会通过 Cli 获取 flink 的配置,并解析命令行参数。

配置加载

CliFrontend.java 是 flink 提交作业的入口

//CliFrontend line144
public CliFrontend() throws Exception {
   this(getConfigurationDirectoryFromEnv());
}

这里会尝试加载 conf 文件夹下的所有 yaml 文件,配置文件的命名并没有强制限制

参数解析

解析命令行参数的第一步是路由用户的命令,然后交由run方法去处理

//CliFrontend line993
try {
    return SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>() {
        Override
        public Integer run() throws Exception {
            return CliFrontend.this.run(params);
        });
    }
    catch (Exception e) {
        return handleError(e);
    }

接下来是程序参数设置过程,flink 将 jar包路径和参数配置封装成了 PackagedProgram

//CliFrontend line223
PackagedProgram program;
try {
   LOG.info("Building program from JAR file");
   program = buildProgram(options);
}
catch (Throwable t) {
   return handleError(t);
}

flink集群的构建

集群类型的解析

获取参数后下一步就是集群的构建和部署,flink 通过 两个不同的 CustomCommandLine 来实现不同集群模式的解析,分别是 FlinkYarnSessionCliDefaultCLI 【吐槽一下 flink 类名的命名规范】解析命令行参数

//CliFrontend line125
static {
   /** command line interface of the YARN session, with a special initialization here
    *  to prefix all options with y/yarn. */
   loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
   customCommandLine.add(new DefaultCLI());
}
...
//line882 这里将决定Cli的类型
CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());

那么什么时候解析成 Yarn Cluster 什么时候解析成 Standalone 呢?由于FlinkYarnSessionCli被优先添加到customCommandLine,所以会先触发下面这段逻辑

//FlinkYarnSessionCli line469
@Override
public boolean isActive(CommandLine commandLine, Configuration configuration) {
   String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
   boolean yarnJobManager = ID.equals(jobManagerOption);
   boolean yarnAppId = commandLine.hasOption(APPLICATION_ID.getOpt());
   return yarnJobManager || yarnAppId || loadYarnPropertiesFile(commandLine, configuration) != null;
}

从上面可以看出如果用户传入了 -m参数或者application id或者配置了yarn properties 文件,则启动yarn cluster模式,否则是Standalone模式的集群

集群部署

flink通过YarnClusterDescriptor来描述yarn集群的部署配置,具体对应的配置文件为flink-conf.yaml,通过下面这段逻辑触发集群部署:

//AbstractYarnClusterDescriptor line372
/**
 * This method will block until the ApplicationMaster/JobManager have been
 * deployed on YARN.
 */
protected YarnClusterClient deployInternal() throws Exception {

大致列下过程:

  • check yarn 集群队列资源是否满足请求
  • 设置 AM Context、启动命令、submission context
  • 如果开启高可用模式【通过反射调用 submission context 的两个方法修改属性】 keepContainersMethod attemptFailuresValidityIntervalMethod 【和 Hadoop 的版本有关】第一个属性表示应用重试时是否保留 AM container,第二个属性表示 指定 间隔时间之内应用允许失败重启的次数
  • 上传 用户 jar、flink-conf.yaml、lib 目录下所有的 jar 包、logback log4j配置文件 到 HDFS
  • 通过 yarn client submit am context
  • 将yarn client 及相关配置封装成 YarnClusterClient 返回

真正在 AM 中运行的主类是 YarnApplicationMasterRunner,它的 run方法做了如下工作:

  • 启动JobManager ActorSystem
  • 启动 flink ui
  • 启动YarnFlinkResourceManager来负责与yarn的ResourceManager交互,管理yarn资源
  • 启动 actor System supervise 进程

到这里 JobManager 已经启动起来,那么 TaskManager是什么时候起动的呢?

YarnFlinkResourceManager启动的时候会预先执行一段逻辑【Akka actor的 preStart 方法】:

@Override
public void preStart() {
    try {
        // we start our leader retrieval service to make sure we get informed
        // about JobManager leader changes
        leaderRetriever.start(new LeaderRetrievalListener() {

            @Override
            public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
                self().tell(
                        new NewLeaderAvailable(leaderAddress, leaderSessionID),
                        ActorRef.noSender());
            }

这段逻辑会先尝试获取 JobManager 的地址并给自己发送一个路由消息NewLeaderAvailable,然后YarnFlinkResourceManager会把自己注册到 JobManager 中,接着JobManager会发送一个回调命令:

//JobManager line358
sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources))

接着会触发这样一段逻辑:

//FlinkResourceManager line555
private void checkWorkersPool() {
   int numWorkersPending = getNumWorkerRequestsPending();
   int numWorkersPendingRegistration = getNumWorkersPendingRegistration();

   // sanity checks
   Preconditions.checkState(numWorkersPending >= 0,
      "Number of pending workers should never be below 0.");
   Preconditions.checkState(numWorkersPendingRegistration >= 0,
      "Number of pending workers pending registration should never be below 0.");

   // see how many workers we want, and whether we have enough
   int allAvailableAndPending = startedWorkers.size() +
      numWorkersPending + numWorkersPendingRegistration;

   int missing = designatedPoolSize - allAvailableAndPending;

   if (missing > 0) {
      requestNewWorkers(missing);
   }
}

将所有的 TS 起动起来,这样一个 flink 集群便构建出来了。下面附图解释下这个流程:

image-20181226153152396
  1. flink cli 解析本地环境配置,启动 ApplicationMaster
  2. ApplicationMaster 中启动 JobManager
  3. ApplicationMaster 中启动YarnFlinkResourceManager
  4. YarnFlinkResourceManagerJobManager发送注册信息
  5. YarnFlinkResourceManager注册成功后,JobManagerYarnFlinkResourceManager发送注册成功信息
  6. YarnFlinkResourceManage知道自己注册成功后像ResourceManager申请和TaskManager数量对等的 container
  7. 在container中启动TaskManager
  8. TaskManager将自己注册到JobManager

接下来便是程序的提交和运行

程序在CliFrontend中被提交后,会触发这样一段逻辑

//ClusterClient 304
    public JobSubmissionResult run(PackagedProgram prog, int parallelism)
            throws ProgramInvocationException
    {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        ...
        else if (prog.isUsingInteractiveMode()) {
            LOG.info("Starting program in interactive mode");
            ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
                    prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
                    prog.getSavepointPath());
            ContextEnvironment.setAsContext(factory);

            try {
                // invoke main method
                prog.invokeInteractiveModeForExecution();
                ...
            }
            finally {
                ContextEnvironment.unsetContext();
            }
        }
        else {
            throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
        }
    }

注意到有一段prog.invokeInteractiveModeForExecution(),这是客户端生成初步逻辑计划的核心逻辑,下面将详细介绍

客户端逻辑计划

上面提到prog.invokeInteractiveModeForExecution()这段逻辑会触发客户端逻辑计划的生成,那么是怎样一个过程呢?其实这里只是调用了用户jar包的主函数,真正的触发生成过程由用户代码的执行来完成。例如用户写了这样一段 flink 代码:

object FlinkDemo extends App with Logging{
  override def main(args: Array[String]): Unit ={
    val properties = new Properties
    properties.setProperty("bootstrap.servers", DemoConfig.kafkaBrokerList)

 properties.setProperty("zookeeper.connect","host01:2181,host02:2181,host03:2181/kafka08")
    properties.setProperty("group.id", "flink-demo-waimai-e")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE) //checkpoint every 5 seconds.
    val stream = env.addSource(new FlinkKafkaConsumer08[String]("log.waimai_e", new SimpleStringSchema, properties)).setParallelism(2)
    val counts = stream.name("log.waimai_e").map(toPoiIdTuple(_)).filter(_._2 != null)
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.addSink(sendToKafka(_))
    env.execute()
  }

注意到这样一段val env = StreamExecutionEnvironment.getExecutionEnvironment,这段代码会获取客户端的环境配置,它首先会转到这样一段逻辑:

//StreamExecutionEnvironment 1256
public static StreamExecutionEnvironment getExecutionEnvironment() {
   if (contextEnvironmentFactory != null) {
      return contextEnvironmentFactory.createExecutionEnvironment();
   }

   // because the streaming project depends on "flink-clients" (and not the other way around)
   // we currently need to intercept the data set environment and create a dependent stream env.
   // this should be fixed once we rework the project dependencies

   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

ExecutionEnvironment.getExecutionEnvironment();获取环境的逻辑如下:

//ExecutionEnvironment line1137
public static ExecutionEnvironment getExecutionEnvironment() {
   return contextEnvironmentFactory == null ?
         createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment();
}

这里的contextEnvironmentFactory是一个静态成员,早在ContextEnvironment.setAsContext(factory)已经触发过初始化了,其中包含了如下的环境信息:

//ContextEnvironmentFactory line51
public ContextEnvironmentFactory(ClusterClient client, List<URL> jarFilesToAttach,
      List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
      boolean isDetached, String savepointPath)
{
   this.client = client;
   this.jarFilesToAttach = jarFilesToAttach;
   this.classpathsToAttach = classpathsToAttach;
   this.userCodeClassLoader = userCodeClassLoader;
   this.defaultParallelism = defaultParallelism;
   this.isDetached = isDetached;
   this.savepointPath = savepointPath;
}

其中的 client 就是上面生成的 YarnClusterClient,其它的意思较明显,就不多做解释了。

用户在执行val env = StreamExecutionEnvironment.getExecutionEnvironment这样一段逻辑后会得到一个StreamContextEnvironment,其中封装了 streaming 的一些执行配置 【buffer time out等】,另外保存了上面提到的 ContextEnvironment 的引用。

到这里关于 streaming 需要的执行环境信息已经设置完成。

初步逻辑计划 StreamGraph 的生成

接下来用户代码执行到val stream = env.addSource(new FlinkKafkaConsumer08,这段逻辑实际会生成一个DataStream抽象,DataStream是flink关于streaming抽象的最核心抽象,后续所有的算子转换都会在DataStream上来完成,上面的addSource操作会触发下面这段逻辑:

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

   if(typeInfo == null) {
      if (function instanceof ResultTypeQueryable) {
         typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
      } else {
         try {
            typeInfo = TypeExtractor.createTypeInfo(
                  SourceFunction.class,
                  function.getClass(), 0, null, null);
         } catch (final InvalidTypesException e) {
            typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
         }
      }
   }

   boolean isParallel = function instanceof ParallelSourceFunction;

   clean(function);
   StreamSource<OUT, ?> sourceOperator;
   if (function instanceof StoppableFunction) {
      sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
   } else {
      sourceOperator = new StreamSource<>(function);
   }

   return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}

简要总结下上面的逻辑:

  • 获取数据源 source 的 output 信息 TypeInformation
  • 生成 StreamSource sourceOperator
  • 生成 DataStreamSource【封装了 sourceOperator】,并返回
  • 将 StreamTransformation 添加到算子列表 transformations 中【只有 转换 transform 操作才会添加算子,其它都只是暂时做了 transformation 的叠加封装】
  • 后续会在 DataStream 上做操作

该输出DataStreamSource继承自SingleOutputStreamOperator具体的继承关系如下:

image-20181226153250924

而生成的 StreamSource operator 走的是另一套继承接口:

stream-operator-extend

DataStreamSource 是一个 DataStream 数据流抽象,StreamSource 是一个 StreamOperator 算子抽象,在 flink 中一个 DataStream 封装了一次数据流转换,一个 StreamOperator 封装了一个函数接口,比如 map、reduce、keyBy等。关于算子的介绍会另起一节:flink算子的生命周期

可以看到在 DataStream 上可以进行一系列的操作(map filter 等),来看一个常规操作比如 map 会发生什么:

//DataStream line503
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

   TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
         Utils.getCallLocationName(), true);

   return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

一个map操作会触发一次 transform,那么transform做了什么工作呢?

//DataStream line1020
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

   // read the output type of the input Transform to coax out errors about MissingTypeInfo
   transformation.getOutputType();

   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
         this.transformation,
         operatorName,
         operator,
         outTypeInfo,
         environment.getParallelism());

   @SuppressWarnings({ "unchecked", "rawtypes" })
   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

   getExecutionEnvironment().addOperator(resultTransform);

   return returnStream;
}

transform() 生成了一个 StreamTransformation并以此作为成员变量封装成另一个 DataStream 返回,StreamTransformation是 flink关于数据流转换的核心抽象,只有需要 transform 的流才会生成新的DataStream 算子,后面会详细解释,注意上面有这一行getExecutionEnvironment().addOperator(resultTransform)flink会将transformation维护起来:

//StreamExecutionEnvironment line 1237
@Internal
public void addOperator(StreamTransformation<?> transformation) {
   Preconditions.checkNotNull(transformation, "transformation must not be null.");
   this.transformations.add(transformation);
}

所以,用户的一连串操作 map join 等实际上在 DataStream 上做了转换,并且flink将这些 StreamTransformation 维护起来,一直到最后,用户执行 env.execute()这样一段逻辑,StreamGraph 的构建才算真正开始…

用户在执行env.execute()会触发这样一段逻辑:

//StreamContextEnvironment line51   
public JobExecutionResult execute(String jobName) throws Exception {
      Preconditions.checkNotNull("Streaming Job name should not be null.");

      StreamGraph streamGraph = this.getStreamGraph();
      streamGraph.setJobName(jobName);

      transformations.clear();

      // execute the programs
      if (ctx instanceof DetachedEnvironment) {
         LOG.warn("Job was executed in detached mode, the results will be available on completion.");
         ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
         return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
      } else {
         return ctx.getClient().runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointPath());
      }
   }
}

这段代码做了两件事情:

  • 首先使用 StreamGraphGenerator 产生 StreamGraph
  • 使用 Client 运行 stream graph

那么StreamGraphGenerator 做了哪些操作呢?

StreamGraphGenerator会依据添加算子时保存的 transformations 信息生成 job graph 中的节点,并创建节点连接,分流操作 如 union,select,split 不会添加边,只会创建虚拟节点或在上有节点添加 selector

这里会将 StreamTransformation 转换为 StreamNode,StreamNode 保存了算子的信息【会另外介绍】,如下图所示

到这里由 StreamNode 构成的 DAG 图 StreamGraph就生成了

不过 在提交给 client 的时候,flink 会做进一步的优化:

StreamGraph 将进一步转换为 JobGraph,这一步工作由 StreamingJobGraphGenerator 来完成,为什么要做这一步转换呢?主要因为有可以 chain 的算子,这里进一步将 StreamNode 转换为 JobVertex,主要工作是将可以 chain 的算子合并【这一步优化是默认打开的】,并设置资源,重试策略等,最终生成可以提交给 JobManager 的 JobGraph

优化的逻辑计划 JobGraph 的生成

//StreamingJobGraphGenerator line181
private List<StreamEdge> createChain(
      Integer startNodeId,
      Integer currentNodeId,
      Map<Integer, byte[]> hashes) {

   if (!builtVertices.contains(startNodeId)) {

      List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

      List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
      List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

      for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
         if (isChainable(outEdge)) {
            chainableOutputs.add(outEdge);
         } else {
            nonChainableOutputs.add(outEdge);
         }
      }

      for (StreamEdge chainable : chainableOutputs) {
         transitiveOutEdges.addAll(createChain(startNodeId, chainable.getTargetId(), hashes));
      }
     ...

上面的方法是算子 chain 的核心操作,简要概括下:

  • 如果从此 start node 开始未生成过 JobVertex,则执行 chain逻辑,由于是递归操作,会先深度优先遍历,将源节点开始到第一个不可 chain 的 StreamNode 之间的算子做 chain 操作【先算叶子节点的 chain,依次往根节点计算】
  • line 207 遇到不可 chain 的边,开始深度遍历生成 JobVertex
  • line 216 将 StreamNode 的输入输出配置,包括序列化配置等设置到上面的 StreamingConfig 中,并在 vertexConfigs 中保存起来,如果是 新生成的 JobVertex,起对应的 StreamingConfig 会以 start node id 为 key 进行保存
  • transitiveOutEdges 保存的该节点下游所有的 non chain_able edges,最终的方法会返回此数据结构
  • 连接 start node 和所有的 transitiveOutEdges 【在输入 JobVertex 创建 IntermediateDataSet,partition类型为 pipeline,生成 JobEdge】
  • 如果是新生成JobVertex,继续设置config,包括 chain start,所有物理输出,及直接逻辑输出、chained config等
  • 如果不是新生成 JobVertex,直接chain configs

这里总结下JobGraph的构建过程,见下图:

flink-job-graph-create

大致过程总结如下:

  • DataStream上的操作生成StreamTransformation列表
  • StreamTransformation的生成关系创建StreamNodeStreamEdge
  • 做算子chain,合并成 JobVertex,并生成 JobEdge

一个 JobVertex 代表一个逻辑计划的节点,就是 DAG 图上的顶点,有点类似于 Storm 的 bolt 或 spout,生成一个 JobVertex 的逻辑如下:

//StreamingJobGenerator line258
private StreamConfig createJobVertex(
      Integer streamNodeId,
      Map<Integer, byte[]> hashes) {

   JobVertex jobVertex;
   ...
   JobVertexID jobVertexId = new JobVertexID(hash);

   if (streamNode.getInputFormat() != null) {
      jobVertex = new InputFormatVertex(
            chainedNames.get(streamNodeId),
            jobVertexId);
      TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
      taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
   } else {
      jobVertex = new JobVertex(
            chainedNames.get(streamNodeId),
            jobVertexId);
   }

   jobVertex.setInvokableClass(streamNode.getJobVertexClass());

   ...

   return new StreamConfig(jobVertex.getConfiguration());
}

这里有两段逻辑值得注意,第一是数据源节点的判断,第二是运行时执行类 InvokableClass 的设置

streamNode.getInputFormat()是判断是否是数据源节点的逻辑,如果是数据源节点,这里会将用户代码【这里为 InputFormat.class 的子类】设置进 JobVertex 的配置中,并在 JobManager 执行提交作业命令的时候做初始化,会在 Flink 物理计划生成一节介绍。

jobVertex.setInvokableClass是设置运行时的执行类,通过这个类再调用用户定义的 operator,是 flink task 中真正被执行的类,具体会在 flink-task-runtime 一节中详细介绍。

至此 JobGraph 生成,并扔给 JobManager 执行😝

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,442评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,604评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,576评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,652评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,495评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,370评论 1 274
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,792评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,435评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,735评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,777评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,553评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,399评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,806评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,038评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,330评论 1 253
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,766评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,964评论 2 337

推荐阅读更多精彩内容