RxJava源码学习

个人主页:https://chengang.plus/

文章将会同步到个人微信公众号:Android部落格

1 RxJava使用

RxJava是响应式数据流驱动框架,Retrokit提供了对RxJava的支持。

1.1 接入

在app module的build.gradle中添加依赖:

implementation "com.squareup.retrofit2:adapter-rxjava2:2.3.0"
implementation "io.reactivex.rxjava2:rxjava:2.0.6"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

接下来在Retrofit构建的时候添加RxJava CallFactory:

Retrofit retrofit = new Retrofit.Builder()
        .baseUrl("https://api.github.com/")
        .callbackExecutor(Executors.newSingleThreadExecutor())
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
        .addConverterFactory(GsonConverterFactory.create())
        .build();

CallAdapter.Factory是典型的工厂模式,这个类的一般实现模式是:

public interface CallAdapter<R, T> {
    Type responseType();
    T adapt(Call<R> call);
    abstract class Factory {
        public abstract @Nullable CallAdapter<?, ?> get(
        Type returnType, Annotation[] annotations, Retrofit retrofit);
    }
}

先调用get方法返回CallAdapter,然后调用adapt方法获取请求Call执行请求。

在正式添加RxJava代码之前需要先看看RxJava的源码。

可以从官方的文档中获取相关知识。

http://reactivex.io/RxJava/3.x/javadoc/overview-summary.html

2 基本类

2.1 io.reactivex.rxjava3.core.Flowable

0..N flows, supporting Reactive-Streams and backpressure

0到多个流,支持响应流和背压。

http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html

The Flowable class that implements the Reactive Streams Publisher Pattern and offers factory methods, intermediate operators and the ability to consume reactive dataflows.

Flowable类实现了响应流发布模式,并提供工厂方法,中间运算符实现,以及有能力消费响应数据流。

他实现了Publisher接口:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    /**
     * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
     * <p>
     * No data will start flowing until {@link Subscription#request(long)} is invoked.
     * <p>
     * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
     * <p>
     * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
     * 
     * @param s
     *            {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    public void onSubscribe(Subscription s);

    /**
     * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
     * 
     * @param t the element signaled
     */
    public void onNext(T t);

    /**
     * Failed terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
     *
     * @param t the throwable signaled
     */
    public void onError(Throwable t);

    /**
     * Successful terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
     */
    public void onComplete();
}

可以认为Flowable是一个发布者,数据流从这里开始发起请求。

subscribe是一个工厂方法,可以被调用多次,每次调用都会产生一次订阅,每次订阅对应一个Subscriber(订阅者)。每个订阅者每次只能订阅一个发布者,就是Publisher的各种实现类。

如果发布者拒绝订阅或因为其他原因导致失败,会回调一个错误信号给订阅者(Subscriber.onError)。

图片来自http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html

image

2.2 io.reactivex.rxjava3.core.Observable

0..N flows, no backpressure。

0到多个数据流,没有背压。

The Observable class is the non-backpressured, optionally multi-valued base reactive class that offers factory methods, intermediate operators and the ability to consume synchronous and/or asynchronous reactive dataflows.

Many operators in the class accept ObservableSource(s), the base reactive interface for such non-backpressured flows, which Observable itself implements as well.

Observable类是没有背压,可选多值的响应类,这个类能够提供工厂方法,中间运算符实现以及有能力消费同步和/或异步响应数据流。

该类中的许多运算符都实现ObservableSource(s),它是此类非背压流的基本响应接口,Observable本身也实现了该接口。

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}

图片来自http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html

image

2.3 io.reactivex.rxjava3.core.Single

a flow of exactly 1 item or an error。

一个数据流正好对应一个item返回或一个错误回调。

The Single class implements the Reactive Pattern for a single value response.

Single behaves similarly to Observable except that it can only emit either a single successful value or an error (there is no onComplete notification as there is for an Observable).

The Single class implements the SingleSource base interface and the default consumer type it interacts with is the SingleObserver via the subscribe(SingleObserver) method.

Single类实现了单个数据返回的响应模式。

Single类的行为类似Observable,除了他只能发送一个成功的数据或一个错误(也没有一个onComplete回调通知,这在Observable中是存在的)。

Single类实现了SingleSource基本接口,与之交互的默认消费者类型是SingleObserver,通过SingleObserver的subscribe方法产生关联。

public interface SingleSource<T> {

    /**
     * Subscribes the given SingleObserver to this SingleSource instance.
     * @param observer the SingleObserver, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull SingleObserver<? super T> observer);
}

public interface SingleObserver<T> {

    /**
     * Provides the SingleObserver with the means of cancelling (disposing) the
     * connection (channel) with the Single in both
     * synchronous (from within {@code onSubscribe(Disposable)} itself) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Notifies the SingleObserver with a single item and that the {@link Single} has finished sending
     * push-based notifications.
     * <p>
     * The {@link Single} will not call this method if it calls {@link #onError}.
     *
     * @param t
     *          the item emitted by the Single
     */
    void onSuccess(@NonNull T t);

    /**
     * Notifies the SingleObserver that the {@link Single} has experienced an error condition.
     * <p>
     * If the {@link Single} calls this method, it will not thereafter call {@link #onSuccess}.
     *
     * @param e
     *          the exception encountered by the Single
     */
    void onError(@NonNull Throwable e);
}

图片来自:http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Single.html

image

2.4 io.reactivex.rxjava3.core.Completable

a flow without items but only a completion or error signal。

没有数据回调返回的数据流,但是只有完成或错误回调。

The Completable class represents a deferred computation without any value but only indication for completion or exception.

Completable behaves similarly to Observable except that it can only emit either a completion or error signal (there is no onNext or onSuccess as with the other reactive types).

The Completable class implements the CompletableSource base interface and the default consumer type it interacts with is the CompletableObserver via the subscribe(CompletableObserver) method.

Completable类意味着一个延迟的没有值返回的计算,但是只有完成或异常返回回调。

Completable的表现与Observable类似,除了他只能传递一个完成或错误异常。

Completable类实现了CompletableSource基本接口,默认的消费者类型是CompletableObserver,通过CompletableObserver.subscribe方法产生交互。

public interface CompletableSource {

    /**
     * Subscribes the given {@link CompletableObserver} to this {@code CompletableSource} instance.
     * @param observer the {@code CompletableObserver}, not {@code null}
     * @throws NullPointerException if {@code observer} is {@code null}
     */
    void subscribe(@NonNull CompletableObserver observer);
}

public interface CompletableObserver {
    /**
     * Called once by the {@link Completable} to set a {@link Disposable} on this instance which
     * then can be used to cancel the subscription at any time.
     * @param d the {@code Disposable} instance to call dispose on for cancellation, not null
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Called once the deferred computation completes normally.
     */
    void onComplete();

    /**
     * Called once if the deferred computation 'throws' an exception.
     * @param e the exception, not {@code null}.
     */
    void onError(@NonNull Throwable e);
}

图片来自:http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Completable.html

[图片上传失败...(image-e5a126-1597284795024)]

2.5 io.reactivex.rxjava3.core.Maybe

a flow with no items, exactly one item or an error.

一个没有反馈项的数据流,准确的说应该是反馈一个item或错误。

The Maybe class represents a deferred computation and emission of a single value, no value at all or an exception.

The Maybe class implements the MaybeSource base interface and the default consumer type it interacts with is the MaybeObserver via the subscribe(MaybeObserver) method.

Maybe类表示单个值(完全没有值或异常)的延迟计算和发射。

Maybe类实现了基类借口MaybeSource,与之交互的默认消费者类型是MaybeObserver,通过MaybeObserver.subscribe()实现交互。

public interface MaybeSource<T> {

    /**
     * Subscribes the given MaybeObserver to this MaybeSource instance.
     * @param observer the MaybeObserver, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull MaybeObserver<? super T> observer);
}

public interface MaybeObserver<T> {

    /**
     * Provides the MaybeObserver with the means of cancelling (disposing) the
     * connection (channel) with the Maybe in both
     * synchronous (from within {@code onSubscribe(Disposable)} itself) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Notifies the MaybeObserver with one item and that the {@link Maybe} has finished sending
     * push-based notifications.
     * <p>
     * The {@link Maybe} will not call this method if it calls {@link #onError}.
     *
     * @param t
     *          the item emitted by the Maybe
     */
    void onSuccess(@NonNull T t);

    /**
     * Notifies the MaybeObserver that the {@link Maybe} has experienced an error condition.
     * <p>
     * If the {@link Maybe} calls this method, it will not thereafter call {@link #onSuccess}.
     *
     * @param e
     *          the exception encountered by the Maybe
     */
    void onError(@NonNull Throwable e);

    /**
     * Called once the deferred computation completes normally.
     */
    void onComplete();
}

图片来自http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Maybe.html

image

3 RxJava中的线程

3.1 IO

在Schedulers类中通过静态调用获取IO线程,

public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

io方法中,有一个IO静态变量,在static块中被初始化:

static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

    IO = RxJavaPlugins.initIoScheduler(new IOTask());

    TRAMPOLINE = TrampolineScheduler.instance();

    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}

可以看到这里不止IO线程,还有NEW_THREAD等线程,这里以IO为例说明。

Schedulers

IO = RxJavaPlugins.initIoScheduler(new IOTask());

static final class IOTask implements Supplier<Scheduler> {
    @Override
    public Scheduler get() {
        return IoHolder.DEFAULT;
    }
}

static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

IoScheduler

public final class IoScheduler extends Scheduler {}

public IoScheduler() {
    this(WORKER_THREAD_FACTORY);
}

public IoScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<>(NONE);
    start();
}

    @Override
public void start() {
    CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}
    
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
    this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
    this.expiringWorkerQueue = new ConcurrentLinkedQueue<>();
    this.allWorkers = new CompositeDisposable();
    this.threadFactory = threadFactory;
    
    ScheduledExecutorService evictor = null;
    Future<?> task = null;
    if (unit != null) {
        evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
        task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
    }
    evictorService = evictor;
    evictorTask = task;
}

EVICTOR_THREAD_FACTORY是一个RxThreadFactory对象。

RxThreadFactory

public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
    @Override
    public Thread newThread(@NonNull Runnable r) {
        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }
    }
}

可以看到IO其实对应的是IoScheduler,中间包含了一个线程池执行对应的任务。

3.2 .mainThread

一般调用方法是:AndroidSchedulers.mainThread()。

AndroidSchedulers

public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
    
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
    new Callable<Scheduler>() {
        @Override public Scheduler call() throws Exception {
            return MainHolder.DEFAULT;
        }
    });
            
private static final class MainHolder {
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}

主线程的切换使用Handler MainLooper。

4 整体把握

Observable<String> observable = service.listRepos1("cg22983677");
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        
    }

    @Override
    public void onNext(String s) {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

4.1 subscribeOn

这两个方法创建了数据流被执行的环境。

Observable

    public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
    }

ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;

        final AtomicReference<Disposable> upstream;

        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<>();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }

        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
}

4.2 observeOn

Observable

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }
    
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
                SimpleQueue<T> queue;

        Disposable upstream;
        
        @Override
        public void onSubscribe(Disposable d) {
        }
        
        @Override
        public void onNext(T t) {
            queue.offer(t);
            schedule();
        }
        
        @Override
        public void onError(Throwable t) {
            done = true;
            schedule();
        }
        
        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }
        
        @Override
        public void dispose() {
            if (!disposed) {
                disposed = true;
                upstream.dispose();
                worker.dispose();
                if (!outputFused && getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }
        
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
        
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
    }
}

4.3 subscribe

一句话,在这里启动数据流。

5 流程图

Observable<String> observable = service.listRepos1("cg22983677");
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {});
image
image

6 总结

第5节中的流程图是Observable相关的,其实另外四种基础类都是一样的流程。

每个基础类中都有不同的操作符,每一个操作符就是一个Observable,意思就是可被观察的,内部又实现了观察者,用于接收上一步的数据流,处理完毕之后,将数据流往下传递,最终到了消费者。

处理数据的过程所处的线程,依赖于最初设置的Scheduler,他里面的Worker是具体实施者,并在最初设定的线程中运行。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,723评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,485评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,998评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,323评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,355评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,079评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,389评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,019评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,519评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,971评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,100评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,738评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,293评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,289评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,517评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,547评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,834评论 2 345