flink实验想法

Sink

writeAsText("file:///home/wangxiaotong/result");
结果:在对应工作节点的本地上,生成了result文件保存数据。

state

链接:https://zhuanlan.zhihu.com/p/29003852

  • Keyed State
    !!!keystream!!!中才能,而且是与key用关。可以当成是operator state按照key划分了,每个key对应一个state-partition。
  • Operator State
    !!!nokeystream!!!每个operator state对应一个并行度的实例。
  • Managed State:flink里封装好的数据结构,比如“ValueState”, “ListState”,等,他们会被编码到checkpoint里

=
flink中的state可以从2个纬度来划分:是否属于某个key(key state或者operator state),是否受flink管理(raw state或者managed state)。key state用于在KeyedStream中保存状态,operater state用于在普通的非key中保存状态。managed state是指被flink所管理的状态。raw state是被应用程序自己管理,flink会调用相应的接口方法来实现状态的restore和snapshot。

Using Managed Keyed State
  1. ValueState<T>:使用update(T) 来set,使用T value()来取回
  2. ListState<T>:add(T) or addAll(List<T>)来添加元素,使用 Iterable<T> get()来获取元素, 使用update(List<T>)来重写已经存在的list
  3. ReducingState<T>: 维持一个聚合结果的值,使用 add(T) 来加入元素,并且使用ReduceFunction函数来reduce出来聚合结果
  4. AggregatingState<IN, OUT>: 维持一个聚合结果的值,和3不同的是,这里聚合的类型可以是不同的元素类型,使用 add(IN)来加入元素,并且使用AggregateFunction函数来aggregated 出来聚合结果
  5. FoldingState<T, ACC>: flink 1.4之后将会被弃用,推荐使用AggregatingState 来代替
  6. MapState<UK, UV>: 使用map存储key-value对,通过 put(UK, UV) or putAll(Map<UK, UV>)来添加,使用 get(UK)来获取。
  • Raw State:算子自己的数据结构,他们会被编写成一系列的bytes写入checkpoint里

所有的datastream function可以使用managed state,但是raw state需要通过接口。更推荐managed state。

StateDescriptor:handle state
有a ValueStateDescriptor, a ListStateDescriptor, a ReducingStateDescriptor, a FoldingStateDescriptor or a MapStateDescriptor.

获取state:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> -
    getAggregatingState(AggregatingState<IN, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

state的存在时间:State Time-To-Live (TTL)

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

过期的state清除:只能通过显示的调用read清除,比如调用 ValueState.value()。这意味着默认情况下,如果未读取过期状态,则不会删除它,可能会导致状态不断增长。此外,您可以在获取 the full state snapshot 时激活 cleanup ,这将减小其大小。
在当前实现下不会清除本地状态,但在从上一个快照恢复的情况下,它不会包括已删除的过期状态。 它可以在StateTtlConfig中配置:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();

note:This option is not applicable for the incremental checkpointing in the RocksDB state backend.

Using Managed Operator State

只能使用两种方法:第一种,通过CheckpointedFunction接口,支持两种分配策略。第二种,通过ListCheckpointed <T extends Serializable>接口,且 list-style state 和even-split redistribution (事务切分的重分配策略),如果状态是re-partitionable,你可以使用Collections.singletonList(MY_STATE)。都当前只支持ListState 的状态。

分配策略

  • Even-split redistribution:
    在restore/redistribution阶段的时候,在多并行度的情况下,整个list state会切分成多个sublists。ListCheckpointed接口只支持这个。
  • Union redistribution:
    在restore/redistribution阶段的时候,每个operator会得到完整的list state。
    CheckpointedFunction接口可以选择两种,通过
checkpointedState = context.getOperatorStateStore().getUnionListState(descriptor);
checkpointedState = context.getOperatorStateStore().getListState(descriptor);

checkpointfunction:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;
  1. 初始化或者recovery的时候调用initializeState()
  2. 例子:
public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

故障恢复的时候,调用 isRestored()来查看当前的状态是不是恢复状态,如果是的话,restore的逻辑将会被应用。

单个状态的保存可以参考Stateful Source Functions(https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#stateful-source-functions)的offset的保存。

Stateful Source Functions

为了保证r exactly-once semantics on failure/recovery。用户需要使用一个lock。

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /**  current offset for exactly once semantics */
    private Long offset;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state)
            offset = s;
    }
}

The Broadcast State Pattern

  1. it has a map format,
  2. it is only available to specific operators that have as inputs a broadcasted stream and a non-broadcasted one, and
  3. such an operator can have multiple broadcast states with different names.
BroadcastProcessFunction and KeyedBroadcastProcessFunction
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

processElement用来处理non-broadcasted数据流,processBroadcastElement用来处理broadcasted 流。
并且processBroadcastElement用的时context,具有以下功能:

  • give access to the broadcast state: ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
  • allow to query the timestamp of the element: ctx.timestamp(),
  • get the current watermark: ctx.currentWatermark()
  • get the current processing time: ctx.currentProcessingTime(), and
  • emit elements to side-outputs: ctx.output(OutputTag<X> outputTag, X value).

Checkpointing

启动:StreamExecutionEnvironment的 enableCheckpointing(n) ,n是checkpoint的间隔
参数:

  • exactly-once vs. at-least-once:后者更适用于低延迟应用
  • checkpoint timeout: 超时的话,这个checkpoint进程将会被终止
  • minimum time between checkpoints: 表示新的checkpoint将会在上个完成的几秒内开始,比如设置5000,那么上个checkpoint完成的时候,下一个新checkpoint将会在5秒内开始。不受duration and the checkpoint interval的影响。
    注意:间隔需要设置比这个参数大。这个值也意味着并发检查点的数量是1。
  • number of concurrent checkpoints: 默认,只有一个chekpoint在处理。
    注意:这个参数和上面的最小时间不能同时设定并存。
  • externalized checkpoints: 定期把checkpoint持久化在外部。外部化的检查点将它们的元数据写到持久存储中,并且当job失败时不会自动清理。这样,如果你的工作失败了,你将会有一个检查点。设置可参考: deployment notes on externalized checkpoints.
  • fail/continue task on checkpoint errors: 默认是开启的。如果task在checkpoint的阶段失败的话,那么也会认为这个task失败了。可以设定不开启,那么会简单的拒绝checkpoint协调器,并且继续执行。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Related Config Options

image.png

Selecting a State Backend

Flink会存储定时器的,stateful operator的state,还有connectors,window,用户自定义的state。默认,state存在taskmanager的内存里,checkpoint存在jobmanager的内存里。
通过参数设置:treamExecutionEnvironment.setStateBackend(…).

State Checkpoints in Iterative Jobs

迭代程序的checkpoint,需要启动特殊的标志force:
env.enableCheckpointing(interval, force = true).注意:故障的时候,loop的边内正处理的数据和修改的state将会丢失

Restart Strategies

可通过 flink-conf.yaml来配置重启策略,
如果没开启checkpoint机制,那么是no restart策略。
如果开启了checkpoint机制,但是没配置启动策略,那么默认启动 fixed-delay 策略,最多尝试Integer.MAX_VALUE次。


image.png

1:Fixed Delay ,故障重启后会尝试3次,每次间隔10s。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
));

也可在flink-conf.yaml里面配置相关设置:尝试次数,间隔延时等。


image.png

2:Failure Rate,和上面的不同的是,如果在设定的时间区间内,超出了故障的次数,那么就会最终失败。


image.png

也可在函数内配置:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per interval
  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay
));

3:No Restart

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

4:Fallback Restart Strategy

State Backends

可使用的存储:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

默认MemoryStateBackend

MemoryStateBackend

能配置成异步快照,推荐使用异步快照,从而消除流水线的blocking,这是默认设置的。不想开启异步快照,这需要设置标志位,如:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
限制:

  • 默认每个state的sizie是5MB,这个可以MemoryStateBackend的数据结构里配置增加。
  • state不能比akka frame size大
  • The aggregate state must fit into the JobManager memory.

适用:

  • 本地部署和debugging
  • 小状态的任务,比如record-at-a-time functions (Map, FlatMap, Filter, …),kafka的consumer要求非常小的状态

FsStateBackend

保持in-flight数据存到TaskManager的内存里。
Checkpoint的时候:把状态快照,然后写到配置好的文件系统和目录里(“hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.)。最小的元数据存到JobManager的内存里。默认是开启异步的。不想开启异步快照,这需要设置标志位,如:
new FsStateBackend(path, false);
适用:

  • 大状态任务,长窗口,大键值状态
  • 所有高可用的设置

RocksDBStateBackend

保持in-flight数据到RocksDB数据库,这个数据库存在TaskManager的数据目录下。
Checkpoint的时候:整个RocksDB数据库会checkpointed到配置好的文件系统和目录下(“hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.)。最小的元数据存到JobManager的内存里。
限制:

  • RocksDB的接口是基于byte[],key的大小和值限制是2^31。
    适用:
  • 非常大状态任务,长窗口,大键值状态
  • 所有高可用的设置

Note:保持的state只是受限于磁盘的大小,比起保持state在内存中的FsStateBackend要大的多,但是他的最大吞吐量也会降低。
他也是唯一支持增量checkpoint的后端。

Configuring a State Backend

  1. 每个任务粒度的设置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
  1. 或者在flink-conf.yaml里面配置所有Setting Default State Backend,关键字state.backend., jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend),还有种org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory for RocksDBStateBackend.
    比如:
# The backend that will be used to store operator state checkpoints

state.backend: filesystem


# Directory for storing checkpoints

state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

需要在flink-config.yaml里面配置,在FWC.yaml配置不行

image.png

Custom Serialization for Managed State

某些特殊需求要 Managed State定制serialization逻辑。如果没需求,只用简单的序列化,这部分可以跳过。

Retained Checkpoints

默认情况下,checkpoint不会保留,仅用于从失败中恢复作业。 取消任务时会删除它们。 但是,您可以配置要保留的定期checkpoint。 根据配置,当作业失败或取消时,不会自动清除这些保留的checkpoint。 这样,如果您的工作失败,您将有一个checkpoint可以从中恢复。

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 当job取消的时候保留checkpoint,需要手动去删除。 ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 当job取消的时候删除checkpoint,checkpoint只在job故障的时候有用。

flink团队的实验

https://data-artisans.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink
状态实验:
https://github.com/StephanEwen/flink-demos/tree/master/streaming-state-machine

容错使用

checkpointing enabled, and for end-to-end exactly once guarantees, you need to havesources that support replayand sinks that are either idempotent or transactional.

in your case the first place to start might be to configure a restart strategy

https://stackoverflow.com/questions/43637190/does-flink-garauntee-tasks-fault-tolerance-in-all-cases

image.png

https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink

stateful wordcount
http://sinanbir.com/apache-flink-stateful-streaming-example/

ValueStateDescriptor

两种初始化:

public void processElement(Tuple2<String, Long> input,
            ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>>.Context ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {
        // TODO Auto-generated method stub
        Long currentSum = sum.value();
         if (currentSum == null) {
             currentSum = 0L;
            }
        currentSum += input.f1;
        sum.update(currentSum);
        out.collect(new Tuple2<>(input.f0, currentSum));
    }
   @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>(
                        "wordsum", // the state name
                        Long.class); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
   @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "wordsum", // the state name
                         Long.class, // type information
                        0L); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }

wordcount的窗口多种实现

第一种:使用apply

text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0)
                        .countWindow(11)
                                                .apply (new WindowFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, Tuple, GlobalWindow>() {
        public void apply (Tuple tuple,
                GlobalWindow window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            int sum = 0;
           
            for (Tuple2<String, Integer> t: values) {
                sum += t.f1;
                System.out.println("word"+t.f0);
            }
            out.collect (new Tuple2<String, Integer>( String.valueOf(window) ,sum));
        }
    });

第二种:使用reduce

text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0)
                        .countWindow(11)
                        .reduce(new ReduceFunction<Tuple2<String,Integer>>() {
                            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                                return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
                            }
                        });

结论:apply和process是通用的窗口函数,reduce等是对sum等特殊情况的简单应用。而且apply这是ProcessWindowFunction的旧版本,它提供较少的context信息,并且没有一些高级功能,例如per-window keyed state。 此接口将在某个时候弃用。推荐使用process。

窗口和key的问题:
Applies the given window function to each window. The window function is called for each evaluation of the window for each key individually.
而且经过实践和提问,发现每个窗口函数处理的是按per-key粒度的。

Generating Timestamps / Watermarks

只在event time的环境下有效。
timestamp和watermarks的生成有两种方式:

  1. 直接在source中生成,调用collectWithTimestamp和emitWatermark,注意这个方式会被第二种方式覆盖。
  2. Timestamp Assigners / Watermark Generators接口,通常在source之后调用,但是如果是map或者filter操作也可以不需要,但是如果是设计event time的操作,比如窗口,那么在这种算子前就得设定好timestamp assigner 。如果使用kafka source的话,flink在内部设定了接口。

设定好了2中timestamp assigner,区别是watermark的生成方式不同:

  1. With Periodic Watermarks
    AssignerWithPeriodicWatermarks 采集时间戳,并且定期的发送watermark,通过ExecutionConfig.setAutoWatermarkInterval(...).设定发送间隔,单位毫秒。只会发送比之前大的watermark。提供的BoundedOutOfOrdernessTimestampExtractor 和BoundedOutOfOrdernessGenerator 相似,功能是提取时间戳,并且按延时容忍度设置了产生watermark的规则。
  2. With Punctuated Watermarks
    watermarkAssignerWithPunctuatedWatermarks采集时间戳,并且按某些特殊的标记触发计算watermark。checkAndGetNextWatermark用来复制watermark。

注意:可以每个记录就产生一个watermark,但是每个watermark可能会导致下游的一些计算,所以过多的watermark会导致性能的下降。

Pre-defined Timestamp Extractors / Watermark Emitters

  1. Assigners with ascending timestamps
    如果kafka partition中的时间戳在每个partition中是严格递增的,那么可以直接可以使用定义好的ascending timestamps。


    image.png

这个的watermark是 current timestamp,因为是严格增的时间戳,所以不会有更早的时间戳到达。

  1. Assigners allowing a fixed amount of lateness
    设定一个延时容忍度,如果被认定为late tuple,那么默认是被忽略,丢弃不加入计算。
image.png

Working with window results

窗口计算后的timestamp应该怎么算呢?
这里建议:

This is set to the maximum allowed timestamp of the processed window, which is end timestamp - 1, since the window-end timestamp is exclusive. Note that this is true for both event-time windows and processing-time windows.

设定为最大末尾时间戳-1,这个设定对于event-time windows 和 processing-time windows都正确。对于process窗口来说,这个并没有什么影响,但是对于event窗口来说,这个对连续的窗口计算有用:比如下面的topK

image.png

REST API

获取latency:

  1. 获取能获取的metrics列表 http://10.11.6.70:8081/jobs/metrics
  2. 使用GET指令获取具体的值
    http://10.11.6.70:8081/jobs/metrics?get=latency.source_id.cbc357ccb763df2852fee8c4fc7d55f2.operator_id.7df19f87deec5680128845fd9a6ca18d.operator_subtask_index.0.latency_median

并行度设置

每个operator最好<=核数*机器数目

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 210,978评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 89,954评论 2 384
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,623评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,324评论 1 282
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,390评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,741评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,892评论 3 405
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,655评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,104评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,451评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,569评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,254评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,834评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,725评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,950评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,260评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,446评论 2 348

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,628评论 18 139
  • feisky云计算、虚拟化与Linux技术笔记posts - 1014, comments - 298, trac...
    不排版阅读 3,827评论 0 5
  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 31,905评论 2 89
  • 痛苦,是人生的一部分。只有经受住考验的人,才能享受到由痛苦转换而成的财富。 最近开始看路遥的《平凡的世界》,它展现...
    暮羽初心阅读 1,371评论 5 7
  • 母亲啊! 你是这世上最伟大的人 不怕黑 不怕累 不抱怨 而又有谁会知道 你也曾是个会害怕的女孩子啊! 会委屈 会孤...
    不尽长安阅读 358评论 4 7