HoodieWriteHandle: Data Write Processing

Our project uses Flink SQL to write both offline (batch) and streaming data into Hudi; this article analyzes the Hudi write path from the bottom up. Hudi version: 0.10.0-patch.


HoodieWriteHandle

HoodieWriteHandle is responsible for processing records and writing them to the underlying DFS.

  • FlinkCreateHandle: creates a new parquet file and writes a batch of records into it, e.g. a batch insert into a COW table.
  • FlinkAppendHandle: appends records to an existing hoodie log file, e.g. upsert/delete on a MOR table.
  • FlinkMergeHandle: merges historical data with incoming data and writes out a new parquet file, e.g. an update on a COW table, or compaction of a MOR table.



    HoodieFileWriter: the data writer for a given file format; it also maintains a Bloom filter. HoodieMergeHandle and HoodieCreateHandle depend on this component.
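
Judging from the calls the handles below make on it (writeAvro, writeAvroWithMetadata), the writer abstraction has roughly the shape sketched here. This is an orientation sketch only, not the verbatim HoodieFileWriter interface.

import java.io.IOException;
import org.apache.avro.generic.IndexedRecord;

// Sketch of the writer abstraction used by the handles below. Method shapes are
// inferred from the calls shown in this article; this is not the exact
// org.apache.hudi.io.storage.HoodieFileWriter definition.
public interface FileWriterSketch<R extends IndexedRecord> {

  // Write a record whose Avro schema already has the Hudi meta columns filled in.
  void writeAvro(String recordKey, R record) throws IOException;

  // Write a record and let the writer populate the Hudi meta columns from the
  // HoodieRecord (passed as Object here to keep the sketch dependency-free).
  void writeAvroWithMetadata(R record, Object hoodieRecord) throws IOException;

  // Whether the underlying file can still accept data (e.g. max file size not reached).
  boolean canWrite();

  void close() throws IOException;
}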

FlinkCreateHandle

Creates a parquet file, fills in the Hudi metadata columns for each record, and writes the records out. Main steps:

  1. Write the metadata file .hoodie_partition_metadata into the partition, recording the partition creation time and partition depth, e.g.:
#partition metadata
#Thu Mar 10 03:07:01 UTC 2022
commitTime=20220310030659727
partitionDepth=1
  2. Send a marker-creation request to the timeline server, recording which instant performed a create on which file group of which partition; on rollback the target parquet file is deleted based on this marker.
  3. Create a HoodieFileWriter for the output file format. The default format is Parquet; ORC is also supported. HoodieParquetWriter maintains a BloomFilter, which is of little use for Flink, but Spark can use it for the BloomIndex.
  4. Add the Hudi metadata columns to each record and write it to the target file.
  5. Populate a WriteStatus and return it as the final result; StreamWriteOperatorCoordinator collects the WriteStatus of all tasks and then writes the commit metadata under .hoodie.
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                          String partitionPath, String fileId, Option<Schema> overriddenSchema,
                          TaskContextSupplier taskContextSupplier, boolean preserveHoodieMetadata) {
  super(config, instantTime, partitionPath, fileId, hoodieTable, overriddenSchema,
      taskContextSupplier);
  this.preserveHoodieMetadata = preserveHoodieMetadata;
  writeStatus.setFileId(fileId);
  writeStatus.setPartitionPath(partitionPath);
  writeStatus.setStat(new HoodieWriteStat());
  // Build the parquet path, e.g. xxx/202203/710c791b-2358-4903-bef8-b885f48a40b8_1-2-0_20220318111533929.parquet
  this.path = makeNewPath(partitionPath);
  try {
    /**
     * Write metadata into the partition, recording the partition's creation time and depth:
     * #partition metadata
     * #Thu Mar 17 16:55:10 CST 2022
     * commitTime=20220317165442543
     * partitionDepth=1
     */
    HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
        new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
    partitionMetadata.trySave(getPartitionId());

    // Create the marker file: which instant performed which operation on which partition/file group
    createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
    //  Create the file writer:  FileWriter --> HoodieFileWriter --> HoodieParquetWriter
    this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config,
      writeSchemaWithMetaFields, this.taskContextSupplier);
  } catch (IOException e) {
    throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
  }
  LOG.info("New CreateHandle for partition :" + partitionPath + " with fileId " + fileId);
}
// Entry point: HoodieWriteHandle#write
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
  Option<Map<String, String>> recordMetadata = record.getData().getMetadata();

  if (HoodieOperation.isDelete(record.getOperation())) {
    avroRecord = Option.empty();
  }

  try {
    if (avroRecord.isPresent()) {
      if (avroRecord.get().equals(IGNORE_RECORD)) {
        return;
      }
      //  Rewrite the record into the schema that contains the Hudi meta fields
      IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
      if (preserveHoodieMetadata) {
        fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema);
      } else {
        // Populate the Hudi meta columns with values while writing
        fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
      }
      // update the new location of record, so we know where to find it next
      record.unseal();
      record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
      record.seal();
      recordsWritten++;
      insertRecordsWritten++;
    } else {
      recordsDeleted++;
    }

    writeStatus.markSuccess(record, recordMetadata);
    // deflate record payload after recording success. This will help users access payload as a
    // part of marking
    // record successful.
    record.deflate();
  } catch (Throwable t) {
    // Not throwing exception from here, since we don't want to fail the entire job
    // for a single record
    writeStatus.markFailure(record, t, recordMetadata);
    LOG.error("Error writing record " + record, t);
  }
}
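
For reference, the Hudi metadata columns that rewriteRecord / writeAvroWithMetadata fill in are the five well-known meta fields listed below, shown here as plain constants for readability (Hudi defines them in HoodieRecord).

// The five Hudi meta columns prepended to every record's own fields.
final class HoodieMetaColumns {
  static final String COMMIT_TIME    = "_hoodie_commit_time";    // instant that wrote the record
  static final String COMMIT_SEQNO   = "_hoodie_commit_seqno";   // sequence number within the commit
  static final String RECORD_KEY     = "_hoodie_record_key";     // primary key of the record
  static final String PARTITION_PATH = "_hoodie_partition_path"; // partition the record belongs to
  static final String FILE_NAME      = "_hoodie_file_name";      // data file containing the record
}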

FlinkAppendHandle

Appends data to the log file of a MOR table to implement upsert/delete. The data is written as HoodieLogBlocks through HoodieLogFormatWriter.
Main steps:

  1. Look up the latest FileSlice for the partition and fileId from the HoodieTableFileSystemView.
  2. Ask the timeline server to create a marker.
  3. If the partition is new, create it and write .hoodie_partition_metadata with the partition metadata.
  4. Write HoodieLogBlocks to the FileSlice's log file through HoodieLogFormatWriter; records are buffered first and flushed on close() or when the configured maxBlockSize is reached.
  5. Populate a WriteStatus and return it as the result.
public void write(HoodieRecord record, Option<IndexedRecord> insertValue) {
  Option<Map<String, String>> recordMetadata = record.getData().getMetadata();
  try {
    // Initialize the FileSlice, the HoodieLogFormatWriter and the marker
    init(record);
    // Flush the buffer to disk once it reaches maxBlockSize (256 MB by default)
    flushToDiskIfRequired(record);
    // Fill in the Hudi meta fields of the HoodieRecord, convert it to Avro and add it to the buffer
    writeToBuffer(record);
  } catch (Throwable t) {
    ...
  }
}
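
flushToDiskIfRequired boils down to a size estimate: the number of buffered records times an estimated average record size is compared against the configured max log block size. The self-contained sketch below illustrates that check; the names are illustrative, not Hudi's actual fields.

// Simplified sketch of the "flush the buffer when it is large enough" check used by
// the append handle. Hudi estimates the average record size from previously written data.
final class FlushCheckSketch {
  private final long maxBlockSizeBytes;   // e.g. 256 * 1024 * 1024 by default
  private final long avgRecordSizeBytes;  // estimated average serialized record size
  private long bufferedRecords = 0;

  FlushCheckSketch(long maxBlockSizeBytes, long avgRecordSizeBytes) {
    this.maxBlockSizeBytes = maxBlockSizeBytes;
    this.avgRecordSizeBytes = avgRecordSizeBytes;
  }

  /** True when the buffered records should be written out as a new log block. */
  boolean shouldFlush() {
    return bufferedRecords > 0 && bufferedRecords * avgRecordSizeBytes >= maxBlockSizeBytes;
  }

  void onRecordBuffered() { bufferedRecords++; }

  void onFlushed() { bufferedRecords = 0; }
}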

public List<WriteStatus> close() {
  try {
    // flush any remaining records to disk
    // Flush any buffered records from memory to the DFS
    appendDataAndDeleteBlocks(header);
    recordItr = null;
    if (writer != null) {
      writer.close();
      writer = null;
      // update final size, once for all log files
      ....
    }
    return statuses;
  } catch (IOException e) {
    throw new HoodieUpsertException("Failed to close UpdateHandle", e);
  }
}

The buffered records are wrapped into a HoodieDataBlock and written to the Hudi log by HoodieLogFormatWriter.

protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header) {
  try {
    //  Header metadata for the log block
    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString());

    // At most one data block and one delete block
    List<HoodieLogBlock> blocks = new ArrayList<>(2);

    if (recordList.size() > 0) {
      if (config.populateMetaFields()) {
        // Flink: extract the key field from the meta columns and build a HoodieAvroDataBlock
        HoodieLogBlock dataBlock = HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header);
        blocks.add(dataBlock);
      } else {
        final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
        blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header, keyField));
      }
    }
    //  If there are records to delete, create a HoodieDeleteBlock holding keysToDelete
    if (keysToDelete.size() > 0) {
      blocks.add(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header));
    }

    if (blocks.size() > 0) {
      //  Write the HoodieLogBlocks through HoodieLogFormatWriter
      AppendResult appendResult = writer.appendBlocks(blocks);
      //  Record the append result in the write status
      processAppendResult(appendResult);

      recordList.clear();
      keysToDelete.clear();
    }
  } catch (Exception e) {
    throw new HoodieAppendException("Failed while appending records to " + writer.getLogFile().getPath(), e);
  }
}

HoodieLogBlock

Hudi writes its logs in its own HoodieLogBlock format; different block types are handled differently at read time. A block contains a header, footer, content, version, and so on.

  • HoodieDataBlock: a data block holding the serialized records; HoodieAvroDataBlock by default.
  • HoodieDeleteBlock: a block made up of the HoodieKeys to delete.
  • HoodieCorruptBlock: a corrupt block; a block that fails validation at read time is treated as a HoodieCorruptBlock.
  • HoodieCommandBlock: a command block with a single type, ROLLBACK_PREVIOUS_BLOCK, written on rollback; when the scanner encounters it while reading, it skips the target blocks (a simplified sketch of this skip logic follows the list).
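
To make the rollback behaviour concrete, the sketch below is a simplified, hypothetical illustration of the skip logic: blocks are buffered while scanning, and a command block causes the buffered blocks of the rolled-back instant to be dropped. It mirrors the idea only; Hudi's log record scanner has its own implementation.

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of honouring a ROLLBACK_PREVIOUS_BLOCK command block:
// data/delete blocks are buffered as they are read, and a command block drops the
// buffered blocks written by the instant being rolled back.
final class RollbackSkipSketch {

  static final class Block {
    final boolean isCommand;   // true for a rollback command block
    final String instantTime;  // instant that wrote the block, or, for a command block,
                               // the instant whose blocks must be rolled back
    Block(boolean isCommand, String instantTime) {
      this.isCommand = isCommand;
      this.instantTime = instantTime;
    }
  }

  static Deque<Block> scan(Iterable<Block> blocksInLogOrder) {
    Deque<Block> validBlocks = new ArrayDeque<>();
    for (Block block : blocksInLogOrder) {
      if (block.isCommand) {
        // drop everything the rolled-back instant appended before it failed
        validBlocks.removeIf(b -> b.instantTime.equals(block.instantTime));
      } else {
        validBlocks.addLast(block);
      }
    }
    return validBlocks; // blocks that survive all rollbacks
  }
}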
HoodieAvroDataBlock#serializeRecords
protected byte[] serializeRecords() throws IOException {
    Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
    GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream output = new DataOutputStream(baos);

    // 1. Write out the log block version
    output.writeInt(HoodieLogBlock.version);
    // 2. Write total number of records
    output.writeInt(records.size());
    // 3. Write the records (Avro-encoded)
    Iterator<IndexedRecord> itr = records.iterator();
    while (itr.hasNext()) {
      IndexedRecord s = itr.next();
      ByteArrayOutputStream temp = new ByteArrayOutputStream();
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get());
      encoderCache.set(encoder);
      try {
        // Encode the record into bytes
        writer.write(s, encoder);
        encoder.flush();

        // Get the size of the bytes
        int size = temp.toByteArray().length;
        // Write the record size
        output.writeInt(size);
        // Write the content
        output.write(temp.toByteArray());
        itr.remove();
      } catch (IOException e) {
        throw new HoodieIOException("IOException converting HoodieAvroDataBlock to bytes", e);
      }
    }
    output.close();
    return baos.toByteArray();
  }
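
The read side mirrors this layout: a block version int, a record count, then for each record its encoded size followed by the Avro bytes. The minimal decoder below is derived from the serialization code above; it is a sketch, not Hudi's HoodieAvroDataBlock#deserializeRecords.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

// Minimal decoder for the layout produced by serializeRecords() above:
// [block version int][record count int] followed by [record size int][avro bytes] per record.
// This is an illustrative sketch, not Hudi's HoodieAvroDataBlock#deserializeRecords.
final class AvroDataBlockDecoderSketch {

  static List<IndexedRecord> decode(byte[] content, Schema schema) throws IOException {
    GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(schema);
    List<IndexedRecord> records = new ArrayList<>();
    try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(content))) {
      int version = input.readInt();   // 1. log block version (not used in this sketch)
      int count = input.readInt();     // 2. total number of records
      for (int i = 0; i < count; i++) {
        int size = input.readInt();    // 3a. size of the encoded record
        byte[] bytes = new byte[size];
        input.readFully(bytes);        // 3b. the Avro-encoded record itself
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        records.add(reader.read(null, decoder));
      }
    }
    return records;
  }
}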

HoodieLogFormatWriter

Writes HoodieLogBlocks to the log file in a specific format; if the underlying file system does not support append, a new file is created and written instead.
An OutputStream is built for the log file; if the DFS does not support append, the writer rolls over to a new file; note the name of the newly created file.

private FSDataOutputStream getOutputStream() throws IOException, InterruptedException {
  if (this.output == null) {
    Path path = logFile.getPath();
    if (fs.exists(path)) {
      // Does the file system support append? Few do; HDFS supports append.
      boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
      if (isAppendSupported) {
        LOG.info(logFile + " exists. Appending to existing file");
        try {
          // open the path for append and record the offset
          this.output = fs.append(path, bufferSize);
        } catch (RemoteException e) {
          LOG.warn("Remote Exception, attempting to handle or recover lease", e);
          handleAppendExceptionOrRecoverLease(path, e);
        } catch (IOException ioe) {
          if (ioe.getMessage().toLowerCase().contains("not supported")) {
            // may still happen if scheme is viewfs.
            isAppendSupported = false;
          } else {
            close();
            throw ioe;
          }
        }
      }
      //  File systems that do not support append create a new log file
      if (!isAppendSupported) {
        //  Roll over to a new log file: max version + 1
        rollOver();
        //  Create the new log file
        createNewFile();
        LOG.info("Append not supported.. Rolling over to " + logFile);
      }
    } else {
      LOG.info(logFile + " does not exist. Create a new file");
      // Block size does not matter as we will always manually autoflush
      createNewFile();
    }
  }
  return output;
}

Format of a block as written to the log file:

magic
block length
log version
block type
block header
block content length
block content
block footer
block length
public AppendResult appendBlocks(List<HoodieLogBlock> blocks) throws IOException, InterruptedException {
  // Find current log format version
  HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
      new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);
  // Build the output stream for the log file
  FSDataOutputStream outputStream = getOutputStream();
  long startPos = outputStream.getPos();
  long sizeWritten = 0;

  for (HoodieLogBlock block: blocks) {
    long startSize = outputStream.size();

    // 1. Write the magic header for the start of the block
    outputStream.write(HoodieLogFormat.MAGIC);

    // bytes for the header (instant time, schema)
    byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());

    // content bytes: the records serialized in Avro format
    byte[] content = block.getContentBytes();

    // bytes for the footer (an empty map)
    byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());

    // 2. Write the total size of the block (excluding magic)
    outputStream.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));

    // 3. Write the version of this log block
    outputStream.writeInt(currentLogFormatVersion.getVersion());
    // 4. Write the block type
    outputStream.writeInt(block.getBlockType().ordinal());

    // 5. Write the headers for the log block
    outputStream.write(headerBytes);
    // 6. Write the size of the content block
    outputStream.writeLong(content.length);
    // 7. Write the contents of the data block
    outputStream.write(content);
    // 8. Write the footers for the log block
    outputStream.write(footerBytes);
    // 9. Write the total size of the log block (including magic) which is everything written
    // until now (for reverse pointer)
    // Update: this information is now used in determining if a block is corrupt by comparing to the
    //   block size in header. This change assumes that the block size will be the last data written
    //   to a block. Read will break if any data is written past this point for a block.
    outputStream.writeLong(outputStream.size() - startSize);

    // Fetch the size again, so it accounts also (9).
    sizeWritten +=  outputStream.size() - startSize;
  }
  // Flush all blocks to disk
  flush();
  // The append result: <startPos (starting offset), sizeWritten>
  AppendResult result = new AppendResult(logFile, startPos, sizeWritten);

  // roll over if size is past the threshold
  rolloverIfNeeded();

  return result;
}
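
From the write sequence above, the block length written in step 2 covers everything that follows it, up to and including the trailing reverse-pointer long. A small helper mirroring that calculation (derived from the numbered steps above, not quoted from Hudi):

// Size of a log block excluding the magic bytes and the leading length field itself,
// derived from the numbered write steps in appendBlocks() above.
final class LogBlockLengthSketch {
  static long logBlockLength(long headerLen, long contentLen, long footerLen) {
    return Integer.BYTES   // 3. log format version
        + Integer.BYTES    // 4. block type ordinal
        + headerLen        // 5. header bytes
        + Long.BYTES       // 6. content length
        + contentLen       // 7. content bytes
        + footerLen        // 8. footer bytes
        + Long.BYTES;      // 9. trailing total size (reverse pointer)
  }
}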

FlinkMergeHandle

Merges incoming records with historical data and writes out a new base file, e.g. an update on a COW table, or compaction of a MOR table.
Main steps:

  1. Build an ExternalSpillableMap from the incoming records. The map offers O(1) lookup and can spill data to disk; the disk-backed part is implemented with RocksDB or BitCask, BitCask by default (see the simplified sketch after this list). In the compaction scenario the incoming records can be the log records read and forwarded by the upstream operator.
  2. Receive the historical records and match them against the incoming records in the ExternalSpillableMap. On a match, the columns are merged according to the merge strategy (by default Flink takes all columns from the new record); if there is no match, the historical record is forwarded as-is and written into the new parquet file.
  3. On close, write out the incoming records that did not match any historical record.
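
As mentioned in step 1 above, the incoming records are held in a spillable map. The sketch below is a simplified, hypothetical illustration of the BitCask-style spill-to-disk idea (an append-only file plus an in-memory offset index); it is not Hudi's ExternalSpillableMap implementation.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified illustration of the spill-to-disk idea behind a spillable map:
// values overflowing the memory budget go to an append-only file, and an in-memory index
// keeps only <key -> offset, length>. Not Hudi's ExternalSpillableMap implementation.
final class SpillableMapSketch<K, V extends Serializable> implements AutoCloseable {

  private static final class DiskSlot {
    final long offset; final int length;
    DiskSlot(long offset, int length) { this.offset = offset; this.length = length; }
  }

  private final long maxInMemoryEntries;
  private final Map<K, V> inMemory = new HashMap<>();
  private final Map<K, DiskSlot> onDisk = new HashMap<>();
  private final RandomAccessFile spillFile;

  SpillableMapSketch(long maxInMemoryEntries) throws IOException {
    this.maxInMemoryEntries = maxInMemoryEntries;
    this.spillFile = new RandomAccessFile(
        Files.createTempFile("spillable-map-sketch", ".bin").toFile(), "rw");
  }

  void put(K key, V value) throws IOException {
    if (inMemory.size() < maxInMemoryEntries) {
      inMemory.put(key, value);                // cheap path: keep the record in memory
      return;
    }
    // spill path: append the serialized value and remember only its position
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) { out.writeObject(value); }
    long offset = spillFile.length();
    spillFile.seek(offset);
    spillFile.write(bytes.toByteArray());
    onDisk.put(key, new DiskSlot(offset, bytes.size()));
  }

  @SuppressWarnings("unchecked")
  V get(K key) throws IOException, ClassNotFoundException {
    if (inMemory.containsKey(key)) { return inMemory.get(key); }
    DiskSlot slot = onDisk.get(key);
    if (slot == null) { return null; }
    byte[] buf = new byte[slot.length];
    spillFile.seek(slot.offset);
    spillFile.readFully(buf);
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buf))) {
      return (V) in.readObject();
    }
  }

  boolean containsKey(K key) { return inMemory.containsKey(key) || onDisk.containsKey(key); }

  @Override public void close() throws IOException { spillFile.close(); }
}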
public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                         Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                         TaskContextSupplier taskContextSupplier, HoodieBaseFile baseFile, Option<BaseKeyGenerator> keyGeneratorOpt) {
  super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
  // Store the incoming records into the ExternalSpillableMap
  init(fileId, recordItr);
  // Prepare the fileWriter and other state
  init(fileId, partitionPath, baseFile);
  
  validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
}


protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
  //  Initialize keyToNewRecords, a map that can spill to disk
  initializeIncomingRecordsMap();
  
  while (newRecordsItr.hasNext()) {
    HoodieRecord<T> record = newRecordsItr.next();
    // update the new location of the record, so we know where to find it next
    if (needsUpdateLocation()) {
      record.unseal();
      record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
      record.seal();
    }

    // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist
    keyToNewRecords.put(record.getRecordKey(), record);
  }
}

private void init(String fileId, String partitionPath, HoodieBaseFile baseFileToMerge) {
    LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId);
    this.baseFileToMerge = baseFileToMerge;
    this.writtenRecordKeys = new HashSet<>();
    writeStatus.setStat(new HoodieWriteStat());
    try {
      //  The latest base file (parquet file)
      String latestValidFilePath = baseFileToMerge.getFileName();
      //  The previous commit time
      writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));

      HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
          new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
      partitionMetadata.trySave(getPartitionId());

      //  New parquet file name: fileId_writeToken_instantTime.fileExtension
      String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());

      // Build oldFilePath and newFilePath
      makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);

      LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
          newFilePath.toString()));

      // file name is same for all records, in this bunch
      writeStatus.setFileId(fileId);
      writeStatus.setPartitionPath(partitionPath);
      writeStatus.getStat().setPartitionPath(partitionPath);
      writeStatus.getStat().setFileId(fileId);
      setWriteStatusPath();

      // Create Marker file
      createMarkerFile(partitionPath, newFileName);

      // Create the writer for writing the new version file
      fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config,
        writeSchemaWithMetaFields, taskContextSupplier);
    } catch (IOException io) {
      LOG.error("Error in update task at commit " + instantTime, io);
      writeStatus.setGlobalError(io);
      throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit "
          + instantTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io);
    }
  }

Record processing flow:

public void write(GenericRecord oldRecord) {
  String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
  boolean copyOldRecord = true;

  if (keyToNewRecords.containsKey(key)) {
     // The old record matches an incoming record by key
    // If we have duplicate records that we are updating, then the hoodie record will be deflated after
    // writing the first record. So make a copy of the record to be merged
    HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
    try {
      //  For a delete, the returned Option is empty.
      //  For Flink, combineAndGetUpdateValue returns the new record's Avro, i.e. it overwrites the old record
      Option<IndexedRecord> combinedAvroRecord =
          hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
            useWriterSchema ? tableSchemaWithMetaFields : tableSchema,
              config.getPayloadConfig().getProps());

      if (combinedAvroRecord.isPresent() && combinedAvroRecord.get().equals(IGNORE_RECORD)) {
        // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record.
        //  just keep the old record
        copyOldRecord = true;
      } else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) { 
          // writeUpdateRecord writes the new (merged) record directly into the new parquet file
        /*
         * ONLY WHEN
         *  1) we have an update for this key AND
         *  2) We are able to successfully write the combined new value
         *    We no longer need to copy the old record over.
         */
        copyOldRecord = false;
      }
      // Remember the record keys that have already been written
      writtenRecordKeys.add(key);
    } catch (Exception e) {
      throw new HoodieUpsertException("Failed to combine/merge new record with old value in storage, for new record {"
          + keyToNewRecords.get(key) + "}, old value {" + oldRecord + "}", e);
    }
  }

  if (copyOldRecord) {
    // this should work as it is, since this is an existing record
    try {
      // The incoming data contains no change for this historical record; write oldRecord through as-is
      fileWriter.writeAvro(key, oldRecord);
    } catch (IOException | RuntimeException e) {
      String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
              key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
      LOG.debug("Old record is " + oldRecord);
      throw new HoodieUpsertException(errMsg, e);
    }
    recordsWritten++;
  }
}

On close, the incoming records that did not match any historical record are written out and the status information is returned.

public List<WriteStatus> close() {
  try {
    // write out any incoming records that were not matched against the base file
    writeIncomingRecords();

    if (keyToNewRecords instanceof ExternalSpillableMap) {
      ((ExternalSpillableMap) keyToNewRecords).close();
    } else {
      keyToNewRecords.clear();
    }
    writtenRecordKeys.clear();

    if (fileWriter != null) {
      fileWriter.close();
      fileWriter = null;
    }

    long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
    HoodieWriteStat stat = writeStatus.getStat();

    stat.setTotalWriteBytes(fileSizeInBytes);
    stat.setFileSizeInBytes(fileSizeInBytes);
    stat.setNumWrites(recordsWritten);
    stat.setNumDeletes(recordsDeleted);
    stat.setNumUpdateWrites(updatedRecordsWritten);
    stat.setNumInserts(insertRecordsWritten);
    stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
    RuntimeStats runtimeStats = new RuntimeStats();
    runtimeStats.setTotalUpsertTime(timer.endTimer());
    stat.setRuntimeStats(runtimeStats);

    performMergeDataValidationCheck(writeStatus);

    LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(),
        stat.getFileId(), runtimeStats.getTotalUpsertTime()));

    return Collections.singletonList(writeStatus);
  } catch (IOException e) {
    throw new HoodieUpsertException("Failed to close UpdateHandle", e);
  }
}

protected void writeIncomingRecords() throws IOException {
    // write out any pending records (this can happen when inserts are turned into updates)
    Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
        ? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();
    while (newRecordsItr.hasNext()) {
      HoodieRecord<T> hoodieRecord = newRecordsItr.next();
      //  If this record's key has not been written yet, write it out now
      if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
        writeInsertRecord(hoodieRecord);
      }
    }
  }