结合源码了解Kafka生产者

1 前言

这里是基于已经看过或者本身已经掌握了 一文带你快速入门kafka 知识点为前提,继续往下写的。另外,这里涉及到的源码分析,笔者所分析的版本如下

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

<dependencies>
    <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
</dependencies>
kafka-client版本.png

2 springboot & Kafka

接下来进入主题。在 一文带你快速入门kafka 中,笔者搭建的是 springboot 项目,我们知道 springboot 是通过自动装配给我们直接使用 kafka 的,所以我们先来看下自动装配这块的代码。
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

@AutoConfiguration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
    
    // 省略代码
    
    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener,
            ObjectProvider<RecordMessageConverter> messageConverter) {
        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
        // 这里被 new 了出来,是用了 DefaultKafkaProducerFactory
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
        map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
        map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
        return kafkaTemplate;
    }
    
    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
                this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
        return factory;
    }
    
    // 省略代码
}

知道 KafkaTemplate 是在哪里构建后,再看下发送消息的方法(org.springframework.kafka.core.KafkaTemplate#send)

public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
        ApplicationListener<ContextStoppedEvent>, DisposableBean {
     // 省略代码
     @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
        return doSend(producerRecord);
    } 
            
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        // 这里拿到了Producer对象
        final Producer<K, V> producer = getTheProducer(producerRecord.topic());
        this.logger.trace(() -> "Sending: " + KafkaUtils.format(producerRecord));
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        Object sample = null;
        if (this.micrometerEnabled && this.micrometerHolder == null) {
            this.micrometerHolder = obtainMicrometerHolder();
        }
        if (this.micrometerHolder != null) {
            sample = this.micrometerHolder.start();
        }
        Future<RecordMetadata> sendFuture =
                producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
        // May be an immediate failure
        if (sendFuture.isDone()) {
            try {
                sendFuture.get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KafkaException("Interrupted", e);
            }
            catch (ExecutionException e) {
                throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace
            }
        }
        if (this.autoFlush) {
            flush();
        }
        this.logger.trace(() -> "Sent: " + KafkaUtils.format(producerRecord));
        return future;
    }
    
    protected Producer<K, V> getTheProducer(@SuppressWarnings("unused") @Nullable String topic) {
        boolean transactionalProducer = this.transactional;
        if (transactionalProducer) {
            boolean inTransaction = inTransaction();
            Assert.state(this.allowNonTransactional || inTransaction,
                    "No transaction is in process; "
                        + "possible solutions: run the template operation within the scope of a "
                        + "template.executeInTransaction() operation, start a transaction with @Transactional "
                        + "before invoking the template method, "
                        + "run in a transaction started by a listener container when consuming a record");
            if (!inTransaction) {
                transactionalProducer = false;
            }
        }
        if (transactionalProducer) {
            Producer<K, V> producer = this.producers.get();
            if (producer != null) {
                return producer;
            }
            KafkaResourceHolder<K, V> holder = ProducerFactoryUtils
                    .getTransactionalResourceHolder(this.producerFactory, this.transactionIdPrefix, this.closeTimeout);
            return holder.getProducer();
        }
        else if (this.allowNonTransactional) {
            return this.producerFactory.createNonTransactionalProducer();
        }
        else if (topic == null) {
            return this.producerFactory.createProducer();
        }
        else {
            // 其实走的是这里生成 Producer 实例的,而这个 ProducerFactory 就是用了DefaultKafkaProducerFactory
            return getProducerFactory(topic).createProducer();
        }
    }
            
    // 省略代码
}

org.springframework.kafka.core.DefaultKafkaProducerFactory#createProducer()

public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
        implements ProducerFactory<K, V>, ApplicationContextAware,
        BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean {
            
            // 省略代码
            
            @Override
    public Producer<K, V> createProducer() {
        return createProducer(this.transactionIdPrefix);
    }
            
            @Override
    public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
        String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;
        // 在《一文带你快速入门kafka》的示例代码中,txIdPrefix 为 null
        return doCreateProducer(txIdPrefix);
    }
            
   private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
        synchronized (this) {
            if (this.producer != null && expire(this.producer)) {
                this.producer = null;
            }
            if (this.producer == null) {
                // 在《一文带你快速入门kafka》的示例代码中,会走到这里来
                this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
                        this.physicalCloseTimeout, this.beanName, this.epoch.get());
                this.listeners.forEach(listener -> listener.producerAdded(this.producer.clientId, this.producer));
            }
            return this.producer;
        }
    }
            // 省略代码
}

CloseSafeProducer 的类图如下


CloseSafeProducer UML图.png

可以看出 spring 自己定义了一个 CloseSafeProducer 的类管理起来,并提供 Producer 的方法给我们使用。

另外,还需要关注下 ProducerRecord

ProducerRecord.png

org.apache.kafka.clients.producer.ProducerRecord

public class ProducerRecord<K, V> {
    // 主题
    private final String topic;
    // 分区
    private final Integer partition;
    // 消息头
    private final Headers headers;
    // 我们的消息可以根据topic归类,而这个key可以对消息进行二次归类,同一个key的消息会放在同一个partition,有key的消息还能进行日志压缩
    private final K key;
    // 我们发送的消息,注意不要为空,因为为空表示特定的消息——墓碑消息
    private final V value;
    // 消息的时间戳
    private final Long timestamp;
    // 省略代码
}

从这里很容易知道,我们发送时不仅仅可以指定主题,还可以指定分区等信息。

必要参数

必要参数是指要创建生产者实例所必需用到的参数,这里指的是 kafka-client 原生态写法所需要的参数

  • key.serializer

    对 key 进行序列化的类,可以自己实现

  • value.serializer

    对 value 进行序列化的类,可以自己实现

spring:
  kafka:
    # 必填,不填使用默认值 localhost:9092 连接 kafka 集群的地址,多个用逗号隔开;单节点就填一个就好
    bootstrap-servers: "192.168.226.140:9092"
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

我这里这样配置是为了给 spring 自动装配,但是 spring 这些配置无非也就是给 kafka-client 使用,如果不怕麻烦完全可以自己去集成 kafka-client 然后创建 Producer 使用的。

消息的发送

我们看发送消息的源码时曾看到这么一段源码,看下图


kafkaTemplate#Send.png

ProducerRecord 是 kafka-client 对消息元数据封装的类,看上图,这里构建时就用了 topic 和 我们发送的消息数据,其它默认为空,请看下图


ProducerRecord构造方法.png

在实际开发过程中,有时我们需要更多信息,比如:指定 partition,指定 key等等。甚至,可能需要给消息增加一些有特殊业务含义的信息,但是又不能存放在我们的业务消息上(即 value),这时我们就需要使用 headers 了。
Producer#send.png

上图大家可以看到 Producer 发送消息的方法只有两个,由于 Future 本身就是支持异步和同步的,所以这里我们可以根据自己实际情况选择。另外,如果想异步,并且获取发送完成后做一些业务处理,可以用第二个方法,可以使用 Callback。

生产者客户端的整体架构

前面给出了 spring 集成 kafka-licent 的源码,知道了 spring 是如何简化我们对 kafka-client 的使用过程。接下来我们来看下生产者客户端与服务端之间的交互。下图来自《深入理解Kafka:核心设计与实践原理》


生产者客户端的整体架构.png

从这个架构图我们可以清晰看到,Producer 发送消息到 kafka服务端前要经历许多元件:

  • 拦截器

    Kafka 拦截器有两种:生产者拦截器和消费者拦截器。这里指的是生产者拦截器,可以根据自己业务需要在消息发送前,对消息进行业务处理。比如:统计。

  • 序列化器

    对 key 和 value 进行序列化使用的。

  • 分区器

    当我们发送消息时有明确指定发送的分区,那么用不上分区器。当我们没有指定时,就是靠分区器帮我们计算当前消息应该发往哪个分区的。

  • 消息累加器(或者叫消息收集器)

    Producer 发送消息不是一下就发送到了 Kafka 服务端的,而是会先放在我们本地内存缓存 —— 消息累加器。它会为每个分区创建一个双端队列,而里面的 ProducerBatch 是消息批次,多个 ProducerRecord 会放在一个 ProducerBatch 中一起发送,从而减少网络IO。缓存大小可以通过 buffer.memory 配置。

    buffer.memory 这个配置需要注意,它默认是 32 MB,如果发送速度太快,就会造成缓存区域被塞满。这时就会造成我们调用 send 方法时要么被阻塞,要么抛异常,这个主要需要看我们配置的阻塞时间 max.block.ms,默认 60 秒

    这里面其实还有一个 BufferPool,在上图没有体现出来的。

    • BufferPool

      网络发送数据都是一个一个数据包发送的,我们发送时反复创建和释放存放内存的空间,在 Kafka 的应用场景中就是很大的开销了,所以 Kafka 就搞了一个 BufferPool,它会对特定大小的 ByteBuffer 进行复用。这个特定大小可以通过 batch.size 指定,默认 16 KB。

  • Sender

    • 从图就可以看出,它会从消息累加器中拉取 ProducerBatch,并发送给 Kafka 服务端。只不过在发送前,Sender 会把 Deque<ProducerBatch> 转成 <Node, List<ProducerBatch>> → <Node, Request>,Node 表示这个消息批次需要发往的 broker,而 Request 是因为发送数据包不仅仅是只有消息数据,还要包含 Kafka 通信的协议

    • 除此之外,它还有一个责任,就是更新元数据信息。举个列子,请看下图(图来自《深入理解Kafka:核心设计与实践原理》)

      判定leastLoadedNode.png

      这里明显可以看到 node1 的压力是最小的(这时 node1 就是 leastLoadedNode),在需要快速消费消息,充分利用资源的情况下,发送给 node1 是最为合理的,那么 Sender 怎么知道 Node1 压力是最小的呢?这就需要维护元数据信息了!一些元数据变动,客户端能感知到,如 broker 地址变化。这些需要使用到的元数据的变动是可以感知的,但是那些不需要使用的呢?配置 metadata.max.age.ms,默认 5 分钟,超过5分钟,没有更新的元数据都会触发元数据更新。这个动作是由 Sender 负责的,它会向 leastLoadedNode 发出请求。

生产者拦截器

拦截器(Interceptor)是早在 Kafka 0.10.0.0 中就引入的一个功能,Kafka 一共有两种拦截器:生产者拦截器和消费者拦截器

要实现生产者拦截器十分的简单,只要实现 org.apache.kafka.clients.producer.ProducerInterceptor 即可。该接口只有三个抽象方法需要实现:

  • onSend

    这个方法会在 KafkaProducer.send(ProducerRecord) 和KafkaProducer.send(ProducerRecord, Callback) 调用时触发,onSend 在 key 和 value 被序列化和计算 partition 之前。

  • onAcknowledgement

    这个方法会在消息被应答失败时被调用,它优先执行于 Callback。

  • close

    主要用于关闭拦截器在执行时所用到的一些需要关闭或清理的资源。

注意:这三个方法抛出的异常均会被捕获并记录到日志中,并不会再向上传递。

分区器

分区器主要是在我们没有指定 partition 时,给我们计算当前消息应该发往哪个 partition

KafkaProducer.png
public class KafkaProducer<K, V> implements Producer<K, V> {
    // 省略代码
    
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            long nowMs = time.milliseconds();
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {
                // 序列化器对key序列化
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                // 序列化器对value序列化
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
            // 计算 partition 的方法
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            
            // 省略代码
    }
      
    // 分区器
    private final Partitioner partitioner;
        
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }
    
    // 省略代码
}

Kafka 中默认的分区器是:org.apache.kafka.clients.producer.internals.DefaultPartitioner

DefaultPartitioner.png

DefaultPartitioner#partition.png

对于 org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition
当 key 不为 null 时,默认分区器会对 key 进行哈希,根据哈希值来计算分区号,因此拥有相同 key 的消息会落到同一分区;如果 key 为 null,那么消息将会以轮询的方式发送给该消息对应的 topic 的可用的 partition。

序列化器

对 key 和 value 进行序列化,转成字节数组才能通过网络发送给 Kafka 服务端

spring-kafka 默认采用的是 org.apache.kafka.common.serialization.StringSerializer。

如果想自定义,就要实现 org.apache.kafka.common.serialization.Serializer 接口

public interface Serializer<T> extends Closeable {

    /**
     * 配置当前类
     * @param configs configs 存放了配置信息
     * @param isKey true-是key false-value
     */
    default void configure(Map<String, ?> configs, boolean isKey) {
        // configs 中有3个参数用来配置序列化的编码类型:key.serializer.encoding、value.serializer.encoding 和 serializer.encoding
    }

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */
    byte[] serialize(String topic, T data);

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param headers headers associated with the record
     * @param data typed data
     * @return serialized bytes
     */
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    /**
     * Close this serializer.
     * <p>
     * 该方法必须是幂等的,因为它可能会被多次调用。
     */
    @Override
    default void close() {
        // intentionally left blank
    }
}

重要的参数配置

温馨提示:kafka 大部分参数都有默认值并且较为合理,如果不是特别熟悉,请不要擅自改动。

acks

这个参数用来指定分区中必须要有多少个副本收到这条消息后,生产者才会认为消息写入成功

  • acks=1

默认值是1,只要消息写入 leader,服务端就会返回成功的 ack 给生产者,生产者才认为投递成功

  • acks=0

    生产者只要发送成功,无需等待服务端的响应,直接认为成功

  • acks=-1 或 acks=all

    需要所有 ISR 均成功写入消息,服务端才会返回成功的 ack 给生产者,生产者才认为投递成功。需要注意的是,当 ISR 中只有 leader 这种情况,那么将会退化成 acks=1。因此,还需要配合 min.insync.replicas=n (这个参数设置了 ISR 副本数量最少要有多少个)使用。

max.request.size

这个参数是用来限制生产者能发送的消息的最大值,默认值是 1 MB

这里需要知道的是,客户端有客户端的配置,服务端也有服务端的配置,这个当客户端允许的最大值超过了服务端的 message.max.bytes 就会抛异常。除了上面这个例子以外,还有一些其它的配置和 max.request.size 也有联动,所以最好不要胡乱改。

retries 和 retry.backoff.ms

retries 用来配置生产者重试投递的次数,默认为 0。

retry.backoff.ms 用来配置重试之间的时间间隔,默认 100 ms。

compression.type

这个参数用来指定消息的压缩方式,默认为 “none”,即默认不压缩消息。可配置类型:

  • none
  • gzip
  • snappy
  • lz4

connections.max.idle.ms

这个参数用来指定在多久之后关闭闲置的连接,默认值是 540000 ms,即 9 分钟。

linger.ms

这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入 ProducerBatch 的时间,默认为 0。生产者会在 ProducerBatch 被填满或者等待时间超过 linger.ms 值时发送出去。这个参数与 TCP 协议中的 Nagle 算法有异曲同工之妙。

receive.buffer.bytes

这个参数用来设置 Socket 接收消息缓冲区(SO_RECEBUF)的大小,默认为 32 KB。如果设置为 -1,则使用操作系统的默认值。

send.buffer.bytes

这个参数用来设置 Socket 发送消息缓冲区(SO_RECEBUF)的大小,默认为 128 KB。如果设置为 -1,则使用操作系统的默认值。

request.timeout.ms

还有 0% 的精彩内容
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
支付 ¥1.00 继续阅读
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355

推荐阅读更多精彩内容