druid查询流程

GroupBy查询

group by是druid的核心查询。

从broker到historical的查询

groupby.png

历史节点groupby查询

1查询入口
QueryResource注入到Guice,启动Jetty server(Jetty+Jersey+guice组合):CliHistorical是历史节点启动入口,这里将QueryResource注入到guice中,JettyServerModule从Guice中取出QueryResource作为servlet加入到Jetty,QueryResource中使用Jersey配置rest接口,接收broker发送过来的请求处理并响应。
QueryResource将请求参数构造出Query对象,ServerManager构造QueryRunner,执行各类查询,返回Sequence,序列化返回给调用方。
如何序列化Sequence
Sequence类似java中的list列表,通过迭代器返回Sequence中元素。强大之处在于,前一个元素已经返回,后一个元素还没有计算出来,流式返回数据。之所以能够这样是因为最终的返回值是一个按照一个或者多个维度值排序的列表,列表的元素是从多个有序的同样是已经排好序的sequence中查到,即执行K路归并排序。当然合并的过程中有可能有维度值相同的元素,则执行聚合操作。

try {
  // jgen 的输入流是StreamingOutput,这个流就是http输出流
  jgen.writeStartArray();
  while (!yielder.isDone()) {
    final Object o = yielder.get();
    // 这里可能当前数据已经返回了 而下一个元素还没查出来
    jgen.writeObject(o);
    yielder = yielder.next(null);
  }
  jgen.writeEndArray();
}
finally {
  yielder.close();
}

整个druid的数据返回模式都是这类迭代器模式,查询结果可能很大,流式返回数据减少内存的使用。
2 并发执行查询请求
SegmentManager中管理着当前节点负责加载查询的segment的信息。ServerManager根据查询的参数中的interval(时间范围)找出接下来需要查询的segment分片PartitionChunk,这里也是druid查询快的原因,能够通过时间范围锁定分片数据。通过装饰器构造出一个具体执行查询QueryRunner

FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
    .create(intervals)
    .transformCat(
        new Function<Interval, Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>>>()
        {
          @Override
          public Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>> apply(Interval input)
          {
            // 查找符合条件的TimelineObjectHolder,
            return timeline.lookup(input);
          }
        }
    )
    .transformCat(
        new Function<TimelineObjectHolder<String, ReferenceCountingSegment>, Iterable<QueryRunner<T>>>()
        {
          @Override
          public Iterable<QueryRunner<T>> apply(
              @Nullable
              final TimelineObjectHolder<String, ReferenceCountingSegment> holder
          )
          {
            if (holder == null) {
              return null;
            }
 
            return FunctionalIterable
                .create(holder.getObject())
                .transform(
                    new Function<PartitionChunk<ReferenceCountingSegment>, QueryRunner<T>>()
                    {
                      @Override
                      public QueryRunner<T> apply(PartitionChunk<ReferenceCountingSegment> input)
                      {
                        // 装饰器模式构造一条具体查询QueryRunner
                        return buildAndDecorateQueryRunner(
                            factory,
                            toolChest,
                            input.getObject(),
                            new SegmentDescriptor(
                                holder.getInterval(),
                                holder.getVersion(),
                                input.getChunkNumber()
                            ),
                            cpuTimeAccumulator
                        );
                      }
                    }
                );
          }
        }
    );
 
return CPUTimeMetricQueryRunner.safeBuild(
    new FinalizeResultsQueryRunner<T>(
        // 使用exec, queryRunners两个参数构造QueryRunner,在这个QueryRunner中多线程并发执行上步构造的一个个QueryRunner
        // toolChest对返回的结果聚合
        toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
        toolChest
    ),
    toolChest,
    emitter,
    cpuTimeAccumulator,
    true
);

factory.mergeRunners(exec,queryRunners)默认将执行到GroupByMergingQueryRunnerV2。在GroupByMergingQueryRunnerV2中完成关键的并大请求,通过RowBasaedGrouperHelper对每个请求结果排序,聚合,生成迭代器构造Sequence。

return new BaseSequence<>(
    new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<RowBasedKey, Row>>()
    {
      @Override
      public CloseableGrouperIterator<RowBasedKey, Row> make()
      {
        final List<ReferenceCountingResourceHolder> resources = Lists.newArrayList();
 
        try {
          // 堆外内存不足时,使用磁盘文件保存聚合的中间结果
          final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
              temporaryStorageDirectory,
              querySpecificConfig.getMaxOnDiskStorage()
          );
          final ReferenceCountingResourceHolder<LimitedTemporaryStorage> temporaryStorageHolder =
              ReferenceCountingResourceHolder.fromCloseable(temporaryStorage);
          resources.add(temporaryStorageHolder);
 
          final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
          try {
            // 节点中配置了merge的堆外内存数量,druid.processing.numMergeBuffers,使用DefaultBlockingPool对堆外内存数组资源进行管理
            // This will potentially block if there are no merge buffers left in the pool.
            if (hasTimeout) {
              final long timeout = timeoutAt - System.currentTimeMillis();
              if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
                throw new TimeoutException();
              }
            } else {
              mergeBufferHolder = mergeBufferPool.take();
            }
            resources.add(mergeBufferHolder);
          }
          catch (Exception e) {
            throw new QueryInterruptedException(e);
          }
 
          // 执行排序、聚合、创建迭代器的工具类,grouper是具体执行类
          Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
              query,
              false,
              null,
              config,
              Suppliers.ofInstance(mergeBufferHolder.get()),
              combineBufferSupplier,
              concurrencyHint,
              temporaryStorage,
              spillMapper,
              combiningAggregatorFactories,
              exec,
              priority,
              hasTimeout,
              timeoutAt,
              mergeBufferSize
          );
          final Grouper<RowBasedKey> grouper = pair.lhs;
          final Accumulator<AggregateResult, Row> accumulator = pair.rhs;
          grouper.init();
 
          final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder =
              ReferenceCountingResourceHolder.fromCloseable(grouper);
          resources.add(grouperHolder);
 
          ListenableFuture<List<AggregateResult>> futures = Futures.allAsList(
              Lists.newArrayList(
                  Iterables.transform(
                      queryables,
                      new Function<QueryRunner<Row>, ListenableFuture<AggregateResult>>()
                      {
                        @Override
                        public ListenableFuture<AggregateResult> apply(final QueryRunner<Row> input)
                        {
                          if (input == null) {
                            throw new ISE(
                                "Null queryRunner! Looks to be some segment unmapping action happening"
                            );
                          }
 
                          ListenableFuture<AggregateResult> future = exec.submit(
                              new AbstractPrioritizedCallable<AggregateResult>(priority)
                              {
                                @Override
                                public AggregateResult call() throws Exception
                                {
                                  try (
                                      Releaser bufferReleaser = mergeBufferHolder.increment();
                                      Releaser grouperReleaser = grouperHolder.increment()
                                  ) {
                                    // 每个查询返回的Sequence执行累加操作accumulate,这个操作时遍历Sequence中的元素,执行accumulator里面的逻辑,
                                    // accumulator里面实际就是根据纬度值聚合指标,将数据保存在哈希表中,哈希表存储在上面分配的堆外内存中
                                    final AggregateResult retVal = input.run(queryPlusForRunners, responseContext)
                                                                        .accumulate(
                                                                            AggregateResult.ok(),
                                                                            accumulator
                                                                        );
 
                                    // Return true if OK, false if resources were exhausted.
                                    return retVal;
                                  }
                                  catch (QueryInterruptedException e) {
                                    throw e;
                                  }
                                  catch (Exception e) {
                                    log.error(e, "Exception with one of the sequences!");
                                    throw Throwables.propagate(e);
                                  }
                                }
                              }
                          );
 
                          if (isSingleThreaded) {
                            waitForFutureCompletion(
                                query,
                                Futures.allAsList(ImmutableList.of(future)),
                                hasTimeout,
                                timeoutAt - System.currentTimeMillis()
                            );
                          }
 
                          return future;
                        }
                      }
                  )
              )
          );
          // 等待所有查询完成
          if (!isSingleThreaded) {
            waitForFutureCompletion(query, futures, hasTimeout, timeoutAt - System.currentTimeMillis());
          }
          // 这一步执行排序,构造迭代器:其中有两步排序,对每个查询结果的排序,对所有查询结果构造迭代器时的K路归并排序
          return RowBasedGrouperHelper.makeGrouperIterator(
              grouper,
              query,
              new Closeable()
              {
                @Override
                public void close() throws IOException
                {
                  for (Closeable closeable : Lists.reverse(resources)) {
                    CloseQuietly.close(closeable);
                  }
                }
              }
          );
        }
        catch (Throwable e) {
          // Exception caught while setting up the iterator; release resources.
          for (Closeable closeable : Lists.reverse(resources)) {
            CloseQuietly.close(closeable);
          }
          throw e;
        }
      }
 
      @Override
      public void cleanup(CloseableGrouperIterator<RowBasedKey, Row> iterFromMake)
      {
        iterFromMake.close();
      }
    }
);

线程池完成并发请求
每个节点配置一个待优先级的固定线程池,分配优先数量的线程。每个线程分配一个存储中间结果堆外内存数组,历史节点接收的查询请求,(可能对应多个执行线程 druid.processing.numMergerBuffers)一般比提供Jetty的http数量少(druid.server.http.numThreads)
排序、聚会、生成迭代器过程中的关键类:

  • AbstractBufferHashGrouper抽象类,主要完成相同维度值的数据的聚合操作。这个类调用各种聚合的实现方法。
  • BufferHashGrouper集成自AbstractBufferHashGrouper,将分配的堆外内存分成两份,一份存放哈希表,一份存档数据行在哈希表中的偏移offset,对数据行排序生成迭代器。
  • SpillingGrouper 创建BufferHashGrouper,执行聚合操作,(AbstractBufferHashGrouper.aggreate()),在堆外内存不足时,将结果溢写到磁盘;在生成迭代器的过程中,对堆外内存中数据、磁盘中的数据执行K路合并。
  • ConcurrentGrouper,根据维度值拆分的堆外内存数组,根据维度的哈希值获取SpillingGrouper完成聚合,构造迭代器时,调用各个spillingGrouper完成迭代,然后K路合并。新版本中,此处优化,各个spillingGrouper迭代在线程池中。
  • ParallerCombiner 在公用的执行查询轻轻的固定线程池执行K路合并。并行的聚合,会将数据写到磁盘。


    并发流程.png

    分析具体的代码,说明重分片查询结果的聚合到迭代器的生成。

// key是维度值,keyHash是纬度值计算的哈希值
public AggregateResult aggregate(KeyType key, int keyHash)
{
  // 维度值序列化后的字节数组,方便与哈希表中的值比较
  final ByteBuffer keyBuffer = keySerde.toByteBuffer(key);
  if (keyBuffer == null) {
    // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
    // be correct.
    return Groupers.DICTIONARY_FULL;
  }
 
  if (keyBuffer.remaining() != keySize) {
    throw new IAE(
        "keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!",
        keyBuffer.remaining(),
        keySize
    );
  }
  // 计算数据行在哈希表中的位置
  // find and try to expand if table is full and find again
  int bucket = hashTable.findBucketWithAutoGrowth(keyBuffer, keyHash);
  if (bucket < 0) {
    // This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
    // be correct.
    return Groupers.HASH_TABLE_FULL;
  }
  // 数据行在哈希表中的偏移量
  final int bucketStartOffset = hashTable.getOffsetForBucket(bucket);
  final boolean bucketWasUsed = hashTable.isBucketUsed(bucket);
  // 哈希表
  final ByteBuffer tableBuffer = hashTable.getTableBuffer();
 
  // Set up key and initialize the aggs if this is a new bucket.
  if (!bucketWasUsed) {
    // 纬度值放入哈希表
    hashTable.initializeNewBucketKey(bucket, keyBuffer, keyHash);
    for (int i = 0; i < aggregators.length; i++) {
      // 聚合器初始化,如LongSumBufferAggregator执行buf.putLong(position, 0L)
      aggregators[i].init(tableBuffer, bucketStartOffset + aggregatorOffsets[i]);
    }
 
    newBucketHook(bucketStartOffset);
  }
 
  if (canSkipAggregate(bucketWasUsed, bucketStartOffset)) {
    return AggregateResult.ok();
  }
 
  // Aggregate the current row.
  for (int i = 0; i < aggregators.length; i++) {
    // 聚合
    aggregators[i].aggregate(tableBuffer, bucketStartOffset + aggregatorOffsets[i]);
  }
 
  afterAggregateHook(bucketStartOffset);
 
  return AggregateResult.ok();
}

分配的堆外内存数组在累加的过程中,哈希表大小可能不足,需要将当前的数组中数据序列化到磁盘中。

// key是维度值,keyHash是纬度值计算的哈希值
public AggregateResult aggregate(KeyType key, int keyHash)
{
  // 上步的AbstractBufferHashGrouper累加操作
  final AggregateResult result = grouper.aggregate(key, keyHash);
 
  if (result.isOk() || !spillingAllowed || temporaryStorage.maxSize() <= 0) {
    return result;
  } else {
    // 哈希表不够用
    // Warning: this can potentially block up a processing thread for a while.
    try {
      spill();
    }
    catch (TemporaryStorageFullException e) {
      return DISK_FULL;
    }
    catch (IOException e) {
      throw Throwables.propagate(e);
    }
 
    // Try again.
    return grouper.aggregate(key, keyHash);
  }
}
private void spill() throws IOException
{
  // BufferHashGrouper构造迭代器
  try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator(true)) {
    // 迭代器中的数据一行一行序列化压缩写入到文件,后面读的时候,也可以一行一行取出数据
    files.add(spill(iterator));
    dictionaryFiles.add(spill(keySerde.getDictionary().iterator()));
    // 重置哈希表
    grouper.reset();
  }
}

druid的设计巧妙之处:利用有限堆外内存完成巨量的聚合操作,以及后续的数据读取。和mapreduce类似的原理。
BufferHashGrouper在构造迭代器过程中首先完成了哈希表的排序

/ wrappedOffsets 保存元素在哈希表中的偏移量
final List<Integer> wrappedOffsets = new AbstractList<Integer>()
{
  @Override
  public Integer get(int index)
  {
    return offsetList.get(index);
  }
 
  @Override
  public Integer set(int index, Integer element)
  {
    final Integer oldValue = get(index);
    offsetList.set(index, element);
    return oldValue;
  }
 
  @Override
  public int size()
  {
    return hashTable.getSize();
  }
};
 
final BufferComparator comparator;
if (useDefaultSorting) {
  comparator = keySerde.bufferComparator();
} else {
  comparator = keySerde.bufferComparatorWithAggregators(aggregatorFactories, aggregatorOffsets);
}
 
// Sort offsets in-place.
Collections.sort(
    wrappedOffsets,
    new Comparator<Integer>()
    {
      @Override
      public int compare(Integer lhs, Integer rhs)
      {
        // lhs,rhs是堆外内存数组中保存的记录数据行在哈希表中的偏移量,根据偏移量就能找到对应的数据行 就是排序数据的索引,数据不移动
        final ByteBuffer tableBuffer = hashTable.getTableBuffer();
        return comparator.compare(
            tableBuffer,
            tableBuffer,
            lhs + HASH_SIZE,
            rhs + HASH_SIZE
        );
      }
    }
);
 
return new CloseableIterator<Entry<KeyType>>()
{
  int curr = 0;
  final int size = getSize();
 
  @Override
  public boolean hasNext()
  {
    return curr < size;
  }
 
  @Override
  public Entry<KeyType> next()
  {
    if (curr >= size) {
      throw new NoSuchElementException();
    }
    return bucketEntryForOffset(wrappedOffsets.get(curr++));
  }
 
  @Override
  public void remove()
  {
    throw new UnsupportedOperationException();
  }
 
  @Override
  public void close() throws IOException
  {
    // do nothing
  }
};

druid巧妙的通过对偏移量的排序完成了对哈希表的排序。
SpillingGrouper对上一步每个分片产生的多个迭代器执行K路合并排序,没有真的在这里排序,采用sequence的模式,获取迭代器的元素时,执行K路合并排序的逻辑。lazy模式执行。

public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
{
  final List<CloseableIterator<Entry<KeyType>>> iterators = new ArrayList<>(1 + files.size());
 
  iterators.add(grouper.iterator(sorted));
 
  final Closer closer = Closer.create();
  // 磁盘中的文件,保存有排序好的数据行
  for (final File file : files) {
    final MappingIterator<Entry<KeyType>> fileIterator = read(file, keySerde.keyClazz());
    iterators.add(
        CloseableIterators.withEmptyBaggage(
            Iterators.transform(
                fileIterator,
                new Function<Entry<KeyType>, Entry<KeyType>>()
                {
                  @Override
                  public Entry<KeyType> apply(Entry<KeyType> entry)
                  {
                    final Object[] deserializedValues = new Object[entry.getValues().length];
                    for (int i = 0; i < deserializedValues.length; i++) {
                      deserializedValues[i] = aggregatorFactories[i].deserialize(entry.getValues()[i]);
                      if (deserializedValues[i] instanceof Integer) {
                        // Hack to satisfy the groupBy unit tests; perhaps we could do better by adjusting Jackson config.
                        deserializedValues[i] = ((Integer) deserializedValues[i]).longValue();
                      }
                    }
                    return new Entry<>(entry.getKey(), deserializedValues);
                  }
                }
            )
        )
    );
    closer.register(fileIterator);
  }
 
  final Iterator<Entry<KeyType>> baseIterator;
  // 调用guava的K路合并排序接口Iterators.mergeSorted(iterators, comparator)
  if (sortHasNonGroupingFields) {
    baseIterator = CloseableIterators.mergeSorted(iterators, defaultOrderKeyObjComparator);
  } else {
    baseIterator = sorted ?
                   CloseableIterators.mergeSorted(iterators, keyObjComparator) :
                   CloseableIterators.concat(iterators);
  }
 
  return CloseableIterators.wrap(baseIterator, closer);
}

ConcurrentGrouper根据处理线程数拆分堆外内存数组,构造过个spillingGrouper,聚合操作中,根据维度哈希值选择对应spillingGrouper完成聚合操作;迭代器构造时,将每个分片构造的迭代器再次执行K路合并。

// key是维度值,keyHash是纬度值计算的哈希值
public AggregateResult aggregate(KeyType key, int keyHash)
{
  if (!initialized) {
    throw new ISE("Grouper is not initialized");
  }
 
  if (closed) {
    throw new ISE("Grouper is closed");
  }
 
  if (!spilling) {
    // 根据维度哈希值选择对应的SpillingGrouper
    final SpillingGrouper<KeyType> hashBasedGrouper = groupers.get(grouperNumberForKeyHash(keyHash));
 
    synchronized (hashBasedGrouper) {
      if (!spilling) {
        // SpillingGrouper执行累加操作 堆外内存数组不足时不会spill到文件,因为这里spilling==false
        if (hashBasedGrouper.aggregate(key, keyHash).isOk()) {
          return AggregateResult.ok();
        } else {
          spilling = true;
        }
      }
    }
  }
 
  // At this point we know spilling = true
  final SpillingGrouper<KeyType> tlGrouper = threadLocalGrouper.get();
 
  synchronized (tlGrouper) {
    tlGrouper.setSpillingAllowed(true);
    return tlGrouper.aggregate(key, keyHash);
  }
}
public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
{
  if (!initialized) {
    throw new ISE("Grouper is not initialized");
  }
 
  if (closed) {
    throw new ISE("Grouper is closed");
  }
  // 排序的话,会执行parallelSortAndGetGroupersIterator()方法,这里会往公用的固定线程池提交任务,执行上一步SpillingGrouper构造迭代器的逻辑,等待所有任务执行完成
  final List<CloseableIterator<Entry<KeyType>>> sortedIterators = sorted && isParallelizable() ?
                                                                  parallelSortAndGetGroupersIterator() :
                                                                  getGroupersIterator(sorted);
 
  // Parallel combine is used only when data is spilled. This is because ConcurrentGrouper uses two different modes
  // depending on data is spilled or not. If data is not spilled, all inputs are completely aggregated and no more
  // aggregation is required.
  if (sorted && spilling && parallelCombiner != null) {
    // First try to merge dictionaries generated by all underlying groupers. If it is merged successfully, the same
    // merged dictionary is used for all combining threads
    final List<String> dictionary = tryMergeDictionary();
    if (dictionary != null) {
      // 多线程执行K路合并排序
      return parallelCombiner.combine(sortedIterators, dictionary);
    }
  }
  // 单线程执行K路合并排序
  return sorted ?
         CloseableIterators.mergeSorted(sortedIterators, keyObjComparator) :
         CloseableIterators.concat(sortedIterators);
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,639评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,277评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,221评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,474评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,570评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,816评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,957评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,718评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,176评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,511评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,646评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,322评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,934评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,755评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,987评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,358评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,514评论 2 348

推荐阅读更多精彩内容