


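// Imports assumed: rx.Observable, rx.Subscriber, android.util.Log

// Create the Observable (the data source)
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // An OnSubscribe object is passed to create() as its argument. It is stored in the returned
        // Observable and acts like a schedule: when the Observable is subscribed to, OnSubscribe.call()
        // is invoked automatically and the events fire in the order written here (for this code, the
        // Subscriber's onNext() is called twice). Because the Observable invokes the Subscriber's
        // callbacks, events flow from the Observable to the observer, which is the observer pattern.
        Log.i("@@@", "call");
        subscriber.onNext("Hello ");
        subscriber.onNext("World !");
    }
});

// Create the observer
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i("@@@", "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.i("@@@", "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i("@@@", "onNext : " + s);
    }
};

// Connect the two; "subscribe" is the more accurate word.
observable.subscribe(subscriber);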



public class Observable<T> {

    final OnSubscribe<T> onSubscribe;

    /**
     * Creates an Observable with a Function to execute when it is subscribed to.
     * <p>
     * <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
     * unless you specifically have a need for inheritance.
     * @param f
     *            {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
     */
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    public static <T> Observable<T> create(OnSubscribe<T> f) {
        // Calls the constructor above to create an Observable. What does RxJavaHooks.onCreate(f) do?
        // We read its source below; for now it is enough to know that, by default, it returns f itself.
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        // The onSubscribe field of the Observable is used here
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            // The onSubscribe field of the Observable is used here as well
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // ... (error handling, shown in full later in this article)
        }
    }
}


/**
 * Invoked when Observable.subscribe is called.
 * @param <T> the output value type
 */
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    // cover for generics insanity
}

/**
 * A one-argument action.
 * @param <T> the first argument type
 */
public interface Action1<T> extends Action {
    void call(T t);
}


public final class RxJavaHooks {
    static volatile Func1<Observable.OnSubscribe, Observable.OnSubscribe> onObservableCreate;

    /** Initialize with the default delegation to the original RxJavaPlugins. */
    // Note the static initializer block
    static {
        init();
    }

    static void init() {
        // ...
        initCreate();
    }

    static void initCreate() {
        onObservableCreate = new Func1<Observable.OnSubscribe, Observable.OnSubscribe>() {
            @Override
            public Observable.OnSubscribe call(Observable.OnSubscribe f) {
                return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
            }
        };

        onSingleCreate = new Func1<rx.Single.OnSubscribe, rx.Single.OnSubscribe>() {
            @Override
            public rx.Single.OnSubscribe call(rx.Single.OnSubscribe f) {
                return RxJavaPlugins.getInstance().getSingleExecutionHook().onCreate(f);
            }
        };

        onCompletableCreate = new Func1<Completable.OnSubscribe, Completable.OnSubscribe>() {
            @Override
            public Completable.OnSubscribe call(Completable.OnSubscribe f) {
                return RxJavaPlugins.getInstance().getCompletableExecutionHook().onCreate(f);
            }
        };
    }

    /**
     * Hook to call when an Observable is created.
     * @param <T> the value type
     * @param onSubscribe the original OnSubscribe logic
     * @return the original or replacement OnSubscribe instance
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
        // onObservableCreate is assigned only in initCreate(), which is called by init(),
        // which in turn runs from the static initializer block.
        Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
        if (f != null) {
            // So what is returned here is the OnSubscribe produced by the onObservableCreate function above.
            // With the default setup you can simply think of it as returning the onSubscribe argument unchanged.
            return f.call(onSubscribe);
        }
        return onSubscribe;
    }

    public static void clear() {
        if (lockdown) {
            return;
        }
        onError = null;
        // ...
        onObservableCreate = null;
        // ...
    }
}

// Default hook implementation: returns f unchanged
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
    return f;
}
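
The hook mechanism above can be hard to see through the generics, so here is a minimal sketch of the same idea in plain Java. It assumes nothing from RxJava: `HookDemo`, `Transform`, and `onCreateHook` are hypothetical names invented for this illustration. The point is only that a static, replaceable function sits between the caller and the value, and that when no hook is installed (or the hook is an identity function, as with the default `onCreate(f)`), the original object comes back unchanged.

```java
// Hypothetical sketch of a replaceable static hook; these are NOT RxJava classes.
final class HookDemo {

    /** A function from T to T, playing the role Func1 plays in the article. */
    interface Transform<T> {
        T apply(T input);
    }

    // volatile so that a hook installed by one thread is visible to other threads
    static volatile Transform<Runnable> onCreateHook;

    /** Returns the hook's result if a hook is installed, otherwise the original. */
    static Runnable onCreate(Runnable original) {
        Transform<Runnable> f = onCreateHook; // read the field once
        if (f != null) {
            return f.apply(original);
        }
        return original;
    }

    /** With no hook installed, the very same instance is returned. */
    static boolean defaultReturnsSameInstance() {
        onCreateHook = null;
        Runnable r = new Runnable() {
            @Override
            public void run() { }
        };
        return onCreate(r) == r;
    }

    /** With a decorating hook installed, extra work runs before the original. */
    static String decoratedOutput() {
        final StringBuilder out = new StringBuilder();
        onCreateHook = new Transform<Runnable>() {
            @Override
            public Runnable apply(final Runnable input) {
                return new Runnable() {
                    @Override
                    public void run() {
                        out.append("before;");
                        input.run();
                    }
                };
            }
        };
        try {
            onCreate(new Runnable() {
                @Override
                public void run() {
                    out.append("work;");
                }
            }).run();
        } finally {
            onCreateHook = null; // restore the default so later callers are unaffected
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(defaultReturnsSameInstance());
        System.out.println(decoratedOutput());
    }
}
```

This is why, when reading the subscription path, it is reasonable to treat `RxJavaHooks.onCreate(f)` as `f` unless a custom hook has been registered.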


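Internally, Observable.create(OnSubscribe f) simply creates a new Observable and assigns the OnSubscribe object passed in as the argument to that Observable's onSubscribe field.
You will naturally wonder what the point of storing the create(OnSubscribe f) argument in the onSubscribe field is. It is there so that it can be used later, when Observable.subscribe() is called.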
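
To make the "store now, call later" relationship concrete, here is a minimal sketch that mirrors the shape of `create()` and `subscribe()` without depending on RxJava. All type names (`MiniObservable`, `MiniOnSubscribe`, `MiniSubscriber`) are hypothetical stand-ins made up for this illustration, and hooks, error handling, and unsubscription are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for illustration; these are NOT the RxJava classes.
final class MiniObservableDemo {

    interface MiniSubscriber<T> {
        void onNext(T value);
        void onCompleted();
    }

    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<? super T> subscriber);
    }

    static final class MiniObservable<T> {
        // Stored at creation time, used only when subscribe() is called.
        private final MiniOnSubscribe<T> onSubscribe;

        private MiniObservable(MiniOnSubscribe<T> f) {
            this.onSubscribe = f;
        }

        static <T> MiniObservable<T> create(MiniOnSubscribe<T> f) {
            return new MiniObservable<T>(f);
        }

        void subscribe(MiniSubscriber<? super T> subscriber) {
            // The stored function is handed the subscriber and drives its callbacks.
            onSubscribe.call(subscriber);
        }
    }

    /** Records every step so that the order of the calls is visible. */
    static List<String> run() {
        final List<String> log = new ArrayList<String>();
        MiniObservable<String> observable = MiniObservable.create(new MiniOnSubscribe<String>() {
            @Override
            public void call(MiniSubscriber<? super String> subscriber) {
                log.add("call");
                subscriber.onNext("Hello ");
                subscriber.onNext("World !");
                subscriber.onCompleted();
            }
        });
        log.add("created"); // nothing has been emitted yet at this point
        observable.subscribe(new MiniSubscriber<String>() {
            @Override
            public void onNext(String value) {
                log.add("onNext : " + value);
            }

            @Override
            public void onCompleted() {
                log.add("onCompleted");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

In this sketch "created" is logged before "call", which shows that creating the observable only stores the function; the function body runs when `subscribe()` is invoked.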



/**
 * Provides a mechanism for receiving push-based notifications from Observables, and permits manual
 * unsubscribing from these Observables.
 */
public abstract class Subscriber<T> implements Observer<T>, Subscription {

    // represents requested not set yet
    private static final long NOT_SET = Long.MIN_VALUE;
    // Subscription that represents a group of Subscriptions that are unsubscribed together.
    // Note: this list holds the Subscriptions (resources) tied to this Subscriber, not the emitted events.
    private final SubscriptionList subscriptions;
    private final Subscriber<?> subscriber;
    /*
     * Interface that establishes a request-channel between an Observable and a Subscriber and allows
     * the Subscriber to request a certain amount of items from the Observable (otherwise known as
     * backpressure).
     */
    private Producer producer;

    protected Subscriber() {
        this(null, false);
    }

    protected Subscriber(Subscriber<?> subscriber) {
        this(subscriber, true);
    }

    /**
     * Construct a Subscriber by using another Subscriber for backpressure and
     * optionally for holding the subscription list (if
     * <code>shareSubscriptions</code> is <code>true</code> then when
     * <code>this.add(sub)</code> is called this will in fact call
     * <code>subscriber.add(sub)</code>).
     * <p>
     * To retain the chaining of subscribers when setting
     * <code>shareSubscriptions</code> to <code>false</code>, add the created
     * instance to {@code subscriber} via {@link #add}.
     * @param subscriber
     *            the other Subscriber
     * @param shareSubscriptions
     *            {@code true} to share the subscription list in {@code subscriber} with
     *            this instance
     * @since 1.0.6
     */
    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }

    /**
     * Adds a {@link Subscription} to this Subscriber's list of subscriptions if this list is not marked as
     * unsubscribed. If the list <em>is</em> marked as unsubscribed, {@code add} will indicate this by
     * explicitly unsubscribing the new {@code Subscription} as well.
     * @param s
     *            the {@code Subscription} to add
     */
    public final void add(Subscription s) {
        subscriptions.add(s);
    }

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    /**
     * This method is invoked when the Subscriber and Observable have been connected but the Observable has
     * not yet begun to emit items or send notifications to the Subscriber. Override this method to add any
     * useful initialization to your subscription, for instance to initiate backpressure.
     */
    public void onStart() {
        // do nothing by default
    }

    // ...
}

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}
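
The Javadoc of `add` describes a small but important rule: subscriptions added to a list are released together, and anything added after the list has been unsubscribed is released immediately. The sketch below illustrates that rule in plain Java under simplifying assumptions. `Handle`, `FlagHandle`, and `HandleList` are hypothetical names for this illustration, the code is single-threaded, and it is not the actual `SubscriptionList` implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "unsubscribe together" semantics; NOT the RxJava SubscriptionList.
final class SubscriptionListDemo {

    interface Handle {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** A trivial handle that only remembers whether it has been released. */
    static final class FlagHandle implements Handle {
        private boolean unsubscribed;

        @Override
        public void unsubscribe() {
            unsubscribed = true;
        }

        @Override
        public boolean isUnsubscribed() {
            return unsubscribed;
        }
    }

    /** A group of handles that are released as one unit (single-threaded sketch). */
    static final class HandleList implements Handle {
        private final List<Handle> children = new ArrayList<Handle>();
        private boolean unsubscribed;

        void add(Handle h) {
            if (unsubscribed) {
                h.unsubscribe(); // the list is already closed: release the newcomer right away
                return;
            }
            children.add(h);
        }

        @Override
        public void unsubscribe() {
            if (unsubscribed) {
                return; // releasing twice has no further effect
            }
            unsubscribed = true;
            for (Handle h : children) {
                h.unsubscribe();
            }
            children.clear();
        }

        @Override
        public boolean isUnsubscribed() {
            return unsubscribed;
        }
    }

    /** Returns {early released?, late released?, list released?}. */
    static boolean[] run() {
        HandleList list = new HandleList();
        FlagHandle early = new FlagHandle();
        list.add(early);
        list.unsubscribe();              // releases "early"
        FlagHandle late = new FlagHandle();
        list.add(late);                  // added too late, so released immediately
        return new boolean[] { early.isUnsubscribed(), late.isUnsubscribed(), list.isUnsubscribed() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

Sharing one such list between two subscribers (the `shareSubscriptions` flag in the constructor above) means that unsubscribing either of them releases the resources of both.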



public class Observable<T> {
    final OnSubscribe<T> onSubscribe;

    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }

        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // new Subscriber so onStart it
        // 1. First call Subscriber.onStart(); as we saw above, its default implementation does nothing
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        // Wrap the current subscriber in a SafeSubscriber. SafeSubscriber is a wrapper class that performs
        // safety checks: it guarantees that only one of onCompleted and onError runs, and only once,
        // and that onNext is no longer delivered after either of them has run.
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            // This is the core of the whole method
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);

            // With the default hooks, the two statements above can be simplified to:
            // (observable.onSubscribe).call(subscriber);
            // return subscriber;

        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }
}
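
The guarantee attributed to SafeSubscriber in the comments above (at most one terminal event, and no `onNext` after it) can be illustrated with a small wrapper. The sketch below is an assumption-laden simplification: `Sink` and `GuardedSink` are hypothetical names, and it covers only the terminal-event guard. The real SafeSubscriber does more (for example, it also deals with exceptions thrown by the wrapped subscriber), which is not modelled here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a terminal-event guard; NOT the RxJava SafeSubscriber.
final class SafeGuardDemo {

    interface Sink<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onCompleted();
    }

    /** Forwards events to the wrapped sink until the first terminal event. */
    static final class GuardedSink<T> implements Sink<T> {
        private final Sink<? super T> actual;
        private boolean done; // set once onError or onCompleted has been forwarded

        GuardedSink(Sink<? super T> actual) {
            this.actual = actual;
        }

        @Override
        public void onNext(T value) {
            if (!done) {
                actual.onNext(value);
            }
        }

        @Override
        public void onError(Throwable e) {
            if (!done) {
                done = true;
                actual.onError(e);
            }
        }

        @Override
        public void onCompleted() {
            if (!done) {
                done = true;
                actual.onCompleted();
            }
        }
    }

    /** Sends a deliberately misbehaving event sequence through the guard. */
    static List<String> run() {
        final List<String> log = new ArrayList<String>();
        Sink<String> guarded = new GuardedSink<String>(new Sink<String>() {
            @Override
            public void onNext(String value) {
                log.add("onNext : " + value);
            }

            @Override
            public void onError(Throwable e) {
                log.add("onError : " + e.getMessage());
            }

            @Override
            public void onCompleted() {
                log.add("onCompleted");
            }
        });
        guarded.onNext("a");
        guarded.onCompleted();
        guarded.onNext("b");                            // ignored: already completed
        guarded.onError(new RuntimeException("late"));  // ignored: already completed
        guarded.onCompleted();                          // ignored: only one terminal event
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Only the first `onNext` and the first `onCompleted` reach the wrapped sink; everything sent after the terminal event is dropped.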


RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);



public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
    // Does this look familiar? RxJavaHooks.onCreate(), discussed earlier, is implemented the same way;
    // there the field was onObservableCreate and here it is onObservableStart.
    // With the default hooks you can simply think of this as returning the onSubscribe argument.
    Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
    if (f != null) {
        return f.call(instance, onSubscribe);
    }
    return onSubscribe;
}

/**
 * Hook to call before the Observable.subscribe() method is about to return a Subscription.
 * @param subscription the original subscription
 * @return the original or alternative subscription that will be returned
 */
public static Subscription onObservableReturn(Subscription subscription) {
    Func1<Subscription, Subscription> f = onObservableReturn;
    if (f != null) {
        return f.call(subscription);
    }
    return subscription;
}

static void init() {
    // ...
    onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
        @Override
        public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
            return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
        }
    };

    onObservableReturn = new Func1<Subscription, Subscription>() {
        @Override
        public Subscription call(Subscription f) {
            return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(f);
        }
    };
    // ...
}

observable.onSubscribe.call(subscriber);
return subscriber;

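So what is observable.onSubscribe here? It is the field that, as described earlier, Observable.create(OnSubscribe<T> f) assigned its argument to. In other words, it is our implementation of the OnSubscribe interface, and whenever we call create() we implement its call() method.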

/**
 * Invoked when Observable.subscribe is called.
 * @param <T> the output value type
 */
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    // cover for generics insanity
}



// Create the Observable (the data source)
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello ");
        subscriber.onNext("World !");
    }
});

// Create the observer
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i("@@@", "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.i("@@@", "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i("@@@", "onNext : " + s);
    }
};

// Subscribe
observable.subscribe(subscriber);


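  • First we call Observable.create(OnSubscribe f) to create an Observable, passing an implementation of the OnSubscribe interface as the argument and overriding its call() method. Inside call() we invoke the subscriber's onNext(), onCompleted(), and onError() methods. The subscriber object received by call() is the one we later pass to observable.subscribe(subscriber) (wrapped in a SafeSubscriber by subscribe()).
  • Then we call new Subscriber() to create an instance of the Subscriber class.
  • Finally, observable.subscribe(subscriber) binds the Observable and the Subscriber together, so that our own Subscriber is subscribed to the Observable.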


For more articles, see AndroidNote on GitHub. Stars are welcome.

