RxAndroid简单分析

先来个RxAndroid的github地址

https://github.com/ReactiveX/RxAndroid


官方例子

Observable.just("one", "two", "three", "four", "five")
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())吧
    .subscribe(/* an Observer */);

- - - 
###简化例子
```java
Observer<String> observer = new Observer<String>() {
         @Override
         public void onSubscribe(Disposable d) {
             Log.i(LOG_TAG,"[onSubscribe]  " + Thread.currentThread().getId());
         }

         @Override
         public void onNext(String value) {
             Log.i(LOG_TAG,"[onNext]  "+value + Thread.currentThread().getId());
         }

         @Override
         public void onError(Throwable e) {
             Log.i(LOG_TAG,"[onError]  "+e);
         }

         @Override
         public void onComplete() {
             Log.i(LOG_TAG,"[onComplete]  "+Thread.currentThread().getId());
         }
     };
Observable.just("next -- >  1","next  -->  2")
    .subscribe(observer);
  • 接下来看看Observable.just()方法的实现
  public static <T> Observable<T> just(T item1, T item2) {
      ObjectHelper.requireNonNull(item1, "The first item is null");
      ObjectHelper.requireNonNull(item2, "The second item is null");
      /*
          ObjectHelper.requireNonNull(item1, "The first item is null");
          这个方法仅仅是判断item1是不是null 
          public static <T> T requireNonNull(T object, String message) {
              if (object == null) {
                  throw new NullPointerException(message);
               }
            return object;
          }
      */
      return fromArray(item1, item2);
  }
  • 可以看到just方法最后调用了 fromArray() 方法 接下来看看fromArray方法的实现
      public static <T> Observable<T> fromArray(T... items) {
          ObjectHelper.requireNonNull(items, "items is null");
          if (items.length == 0) {
              return return RxJavaPlugins.onAssembly((Observable<T>) ObservableEmpty.INSTANCE);
          } else
          if (items.length == 1) {
              return return RxJavaPlugins.onAssembly(new ObservableJust<T>(items[0]));
          }
          return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
       }
    
    • 可以看到fromArray方法就是调用了RxJavaPlugins.onAssembly这个方法,根据items的长度不同传递了不同的参数
    • 先分析items.length == 1的情况,这个情况下传入的实例是:new ObservableJust<T>(items[0])),对应上面的例子就是new ObservableJust<String>("one"));
    public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    
      private final T value;
      public ObservableJust(final T value) {
          this.value = value;
      }
    
      @Override
      protected void subscribeActual(Observer<? super T> s) {
          ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
          s.onSubscribe(sd);
          sd.run();
      }
    
      @Override
      public T call() {
          return value;
      }
    

}

  - 这里有些疑问。subscribeActual()这个方法是干什么的?以及里面的**ScalarDisposable**又是做什么的?先放一放后面再说

- 再来看看**RxJavaPlugins.onAssembly**这个方法
 ```java
public static <T> Observable<T> onAssembly(Observable<T> source) {
      Function<Observable, Observable> f = onObservableAssembly;
      if (f != null) {
          return apply(f, source);
      }
      return source;
  }
- 这里有一个非空的判断(以后再说这个),可以先理解成就直接吧source返回
  • 小结一哈just方法,首先是判断传入的item是不是空,如果不是空就调用了fromArray方法,在fromArray里面构造了一个Observable对象,然后直接返回。

  • just方法构造完了以后就调用了subscribe()方法并传入了一个Observer对象。看看suubscribe的核心代码(就两句话)
      public final void subscribe(Observer<? super T> observer) {
              observer = RxJavaPlugins.onSubscribe(this, observer);
              subscribeActual(observer);
      }
    
    • 先来看看RxJavaPlugins.onSubscribe(this, observer)这个究竟做了什么事~~
    public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
          BiFunction<Observable, Observer, Observer> f = onObservableSubscribe;
          if (f != null) {
              return apply(f, source, observer);
          }
          return observer;
      }
    
    • 这里面又有一个非空的判断,不管这个非空判断,也就是直接返回了传入的observer对象
    • 现在代码就比较清晰了,其实就是直接调用了subscribeActual(observer)这个方法
  • 小结一哈,Observable.just()根据参数的长度构造了一个特定的Observable对象并返回,然后调用了该对象的subscribeActual方法并传入observer
  • 接下来再来看前面留下的问题,fromArray方法里面有根据items的长度进行实例化不同的Observable
    • item长度为1的时候 --> ObservableJust
      • 它的subscribeActual()方法
      protected void subscribeActual(Observer<? super T> s) {
      ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
      s.onSubscribe(sd);
      sd.run();
      }
      - 这里又引入了一个新的类**ScalarDisposable**,来看看这个又是做什么的
      ```java
      public static final class ScalarDisposable<T>
      extends AtomicInteger //这个类是用来原子操作的类,java里面i++都不是线程安全的~~
      implements QueueDisposable<T>, Runnable {
      
        private static final long serialVersionUID = 3880992722410194083L;
      
        final Observer<? super T> observer;
      
        final T value;
      
        static final int START = 0;
        static final int FUSED = 1;
        static final int ON_NEXT = 2;
        static final int ON_COMPLETE = 3;
      
        public ScalarDisposable(Observer<? super T> observer, T value) {
            this.observer = observer;
            this.value = value;
        }
        //中间省略了一大堆方法~~
        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                //上面就是比较和赋值原子操作
                observer.onNext(value);//在这里可以看到调用了onNext()
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }
      }
      

    • 当items的长度大于1的时候 --> ObservableFromArray
      • 它的subscribeActual()方法
       public void subscribeActual(Observer<? super T> s) {
          FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
          s.onSubscribe(d);
          if (d.fusionMode) {
              return;
          }
          d.run();
      } 
      
      • 这里又出现了一个新的Disposable --> FromArrayDisposable,但是不管怎么样儿,最后都调用了d.run()方法

static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {

    final Observer<? super T> actual;

    final T[] array;

    int index;

    boolean fusionMode;

    volatile boolean disposed;

    FromArrayDisposable(Observer<? super T> actual, T[] array) {
        this.actual = actual;
        this.array = array;
    }
    void run() {
        T[] a = array;
        int n = a.length;

        for (int i = 0; i < n && !isDisposed(); i++) {//for循环调用撒~~
            T value = a[i];
            if (value == null) {
                actual.onError(new NullPointerException("The " + i + "th element is null"));
                return;
            }
            actual.onNext(value);//调用onNext方法
        }
        if (!isDisposed()) {
            actual.onComplete();
        }
    }
}
    ```

小结:

调用just的时候构造了一个Observable对象,并根据不同的参数实例化不同的Observable,不同的Observable有不同的subscribeActual()方法实现,subscribeActual方法里面都有一个Disposable对象,最后都调用了Disposable的run(该方法调用了onNext()方法)方法,最后在subscribe的时候实际上就是调用了Observable的subscribeActual方法。


线程切换分析

eg:

Observable.just("next -- >  1","next  -->  2")
              .subscribeOn(AndroidSchedulers.mainThread())
              .observeOn(Schedulers.newThread())
              .subscribe(observer);
  • subscribeOn
    public final Observable<T> subscribeOn(Scheduler scheduler) {
          ObjectHelper.requireNonNull(scheduler, "scheduler is null");
          return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
      }
    
    • 可以看到这里也是调用了RxJavaPlugins.onAsswmbly()方法,只是这里的参数变成了ObservableSubscribeOn的实例。
    • ObservableSubscribeOn
      public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
      final Scheduler scheduler;
    
      public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
          super(source);
          this.scheduler = scheduler;
      }
    
      @Override
      public void subscribeActual(final Observer<? super T> s) {
          final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
          s.onSubscribe(parent);
    
          parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
              @Override
              public void run() {
                  source.subscribe(parent);
              }
          }));
        }
        ....省略其他代码
    
    - 可以看到里面主要是调用了,Scheduler的schedulerDirect()方法,并在这个里面调用了,source.subscribe()
    
    • 这里我们就仅仅去看看Scheduler.newThread()的实现
     public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
    

//省略部分代码....
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
return sr;
}
```
- 这里就可以看出来实际上就是用的线程池来做的~~

  • observeOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
  • 这个也是调用了RxJavaPluginsonAssembly方法,传入的对象是ObservableObserveOn的实例。
    • ObservableObserveOn
     //仅仅提出了核心代码哈
     protected void subscribeActual(Observer<? super T> observer) {
       if (scheduler instanceof TrampolineScheduler) {
           source.subscribe(observer);
       } else {
           Scheduler.Worker w = scheduler.createWorker();
    
           source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
       }
    }
    
    • 这里对传入的Scheduler进行了判断,如果是TrampolineScheduler类型就直接调用了,Source的subscribe方法,这个Scource其实就是调用observeOn方法的Observable
    • 先来看看当Scheduler是newThreadScheduler的时候,可以看到实例化了一个ObserveOnObserver
      static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
    
      private static final long serialVersionUID = 6576896619930983584L;
      final Observer<? super T> actual;
      final Scheduler.Worker worker;
      final boolean delayError;
      final int bufferSize;
    
      SimpleQueue<T> queue;
    
      Disposable s;
    
      Throwable error;
      volatile boolean done;
    
      volatile boolean cancelled;
    
      int sourceMode;
    
      boolean outputFused;
    
      ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
          this.actual = actual;
          this.worker = worker;
          this.delayError = delayError;
          this.bufferSize = bufferSize;
      }
      //省略了很多代码....
      @Override
      public void onNext(T t) {
          if (done) {
              return;
          }
    
          if (sourceMode != QueueDisposable.ASYNC) {
              queue.offer(t);
          }
          schedule();
      }
        ```
      - 从源码可以看出来,**ObserveOnObserver**其实就是对Observer的一个包装
      - 在**onNext**方法中可以看到线程切换的代码
    
    

小结:

其实搞了半天就是一个线程池在里面切换,对对象的各种包装。subscribeOn就是对Observable的包装,切换了线程来调用source.subscribe()方法,而observeOn则是对Observer的包装,并重写了里面的回调方法,在回调的时候会自动切换线程。


AndroidSchedulers.mainThread()这个Scheduler的分析

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}
  • 可以看到最后都是实例化了HandlerScheduler,不同的是Looper的不同,
  • 再来看看HandlerScheduler的实现(仅仅贴出了主要的两个方法)

@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");

    run = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
    return scheduled;
}

@Override
public Worker createWorker() {
    return new HandlerWorker(handler);
}

private static final class HandlerWorker extends Worker {
    private final Handler handler;

    private volatile boolean disposed;

    HandlerWorker(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        if (disposed) {
            return Disposables.disposed();
        }

        run = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.

        handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }

        return scheduled;
    }
```
  - 最后都是用的handler.postDelayed方法来做的线程切换,so android上面的Schulder其实就是用了,Handler机制~~

RxAndroid使用不当会有内存泄漏的哦~~


Nothing is certain in this life. The only thing i know for sure is that. I love you and my life. That is the only thing i know. have a good day

:)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容

  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,464评论 7 62
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,608评论 18 399
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,646评论 18 139
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 9,162评论 6 151
  • 除了梦中有你,翻看别处,再找不出你的半点消息,可我仍然知晓,你的存在,她人无可代替,但愿我们不在谋面,彼此...
    张暴少阅读 197评论 0 1