RxJava之timer和interval操作符源码解析

转载请以链接形式标明出处:
本文出自:103style的博客

timer 操作符

  • timer 操作符实际上返回的是一个 ObservableTimer对象。两个参数的方法默认在 Schedulers.computation()中工作。

     public static Observable<Long> timer(long delay, TimeUnit unit) {
         return timer(delay, unit, Schedulers.computation());
     }
     public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) {
         return RxJavaPlugins.onAssembly(new ObservableTimer(Math.max(delay, 0L), unit, scheduler));
     }
    
  • ObservableTimer 源码:

    • 构建了 TimerObserver 对象。
    • 执行 观察者 的 onSubscribe 方法。
    • 通过scheduler.scheduleDirect(ios, delay, unit) 返回一个 Disposable 对象。
    • 将返回的 Disposable 对象传给 TimerObserver 对象的 setResource 方法
    public final class ObservableTimer extends Observable<Long> {
        final Scheduler scheduler;
        final long delay;
        final TimeUnit unit;
        public ObservableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
            this.delay = delay;
            this.unit = unit;
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(Observer<? super Long> observer) {
            TimerObserver ios = new TimerObserver(observer);
            observer.onSubscribe(ios);
            Disposable d = scheduler.scheduleDirect(ios, delay, unit);
            ios.setResource(d);
        }
        ...
    }
    
  • TimerObserver对象源码:

    static final class TimerObserver extends AtomicReference<Disposable>
    implements Disposable, Runnable {
    
        final Observer<? super Long> downstream;
    
        TimerObserver(Observer<? super Long> downstream) {
            this.downstream = downstream;
        }
        ...
        @Override
        public void run() {
            if (!isDisposed()) {
                downstream.onNext(0L);
                lazySet(EmptyDisposable.INSTANCE);
                downstream.onComplete();
            }
        }
    
        public void setResource(Disposable d) {
            DisposableHelper.trySet(this, d);
        }
    }
    
  • 首先看 TimerObserversetResource(Disposable d)方法 里的 DisposableHelper.trySet(this, d);

    public static boolean trySet(AtomicReference<Disposable> field, Disposable d) {
        if (!field.compareAndSet(null, d)) {
            if (field.get() == DISPOSED) {
                d.dispose();
            }
            return false;
        }
        return true;
    }
    
    • d 不为 null,直接 return true;否则判断 是否为 DISPOSED 状态,是的话调用传进来的 Disposable 对象(也就是之前 Scheduler 构建的 DisposeTask 对象)的 dispose 方法。
  • scheduler.scheduleDirect(ios, delay, unit) 方法:

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }
    
    • 首先创建了一个 Worker,因为默认是 Schedulers.computation()中工作,查看源码可知 实际调用的是 ComputationSchedulercreateWorker 方法 。
      Schedulers

      ...
      static final class ComputationHolder {
          static final Scheduler DEFAULT = new ComputationScheduler();
      }
      ...
      static {
          COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
          ...
      }
      
      static final class ComputationTask implements Callable<Scheduler> {
          @Override
          public Scheduler call() throws Exception {
              return ComputationHolder.DEFAULT;
          }
      }
      

      RxJavaPlugins

      public static Scheduler initComputationScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
          ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
          Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitComputationHandler;
          if (f == null) {
              return callRequireNonNull(defaultScheduler);
          }
          return applyRequireNonNull(f, defaultScheduler); // JIT will skip this
      }
      
      static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
          try {
              return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
          } catch (Throwable ex) {
              throw ExceptionHelper.wrapOrThrow(ex);
          }
      }
      

      f 默认为 null,所以返回的是 callRequireNonNull(defaultScheduler),然后实际调用的是 ComputationTaskcall 方法。返回的即为 ComputationScheduler 对象。

    • ComputationSchedulercreateWorker 方法 。

      public ComputationScheduler() {
          this(THREAD_FACTORY);
      }
      
      public ComputationScheduler(ThreadFactory threadFactory) {
          this.threadFactory = threadFactory;
          this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
          start();
      }
      
      public Worker createWorker() {
          return new EventLoopWorker(pool.get().getEventLoop());
      }
      
      • pool.get()通过构造函数我们可知返回的为 NONE = new FixedSchedulerPool(0, THREAD_FACTORY); 所以 pool.get().getEventLoop() 返回的为 SHUTDOWN_WORKER = new PoolWorker(new RxThreadFactory("RxComputationShutdown"));。实际上是创建了一个 executorExecutors.newScheduledThreadPool(1, factory) ,即 factoryRxThreadFactory("RxComputationShutdown")单线程线程池对象PoolWorker对象
        FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
            this.cores = maxThreads;
            this.eventLoops = new PoolWorker[maxThreads];
            for (int i = 0; i < maxThreads; i++) {
                this.eventLoops[i] = new PoolWorker(threadFactory);
            }
        }
        public PoolWorker getEventLoop() {
            int c = cores;
            if (c == 0) {
                return SHUTDOWN_WORKER;
            }
            return eventLoops[(int)(n++ % c)];
        }
        
      • 所以createWorker 返回的是:poolWorkerfactoryRxThreadFactory("RxComputationShutdown")单线程线程池对象PoolWorker对象
        static final class EventLoopWorker extends Scheduler.Worker {
            private final ListCompositeDisposable serial;
            private final CompositeDisposable timed;
            private final ListCompositeDisposable both;
            private final PoolWorker poolWorker;
        
            volatile boolean disposed;
        
            EventLoopWorker(PoolWorker poolWorker) {
                this.poolWorker = poolWorker;
                this.serial = new ListCompositeDisposable();
                this.timed = new CompositeDisposable();
                this.both = new ListCompositeDisposable();
                this.both.add(serial);
                this.both.add(timed);
            }
        ...
        
    • decoratedRun 即为 TimerObserver 对象。

      public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
          final Worker w = createWorker();
          final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
          DisposeTask task = new DisposeTask(decoratedRun, w);
          w.schedule(task, delay, unit);
          return task;
      }
      
    • 然后构建了一个 DisposeTask 对象。

      static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
          final Runnable decoratedRun;
          final Worker w;
          Thread runner;
      
          DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
              this.decoratedRun = decoratedRun;
              this.w = w;
          }
      
          @Override
          public void run() {
              runner = Thread.currentThread();
              try {
                  decoratedRun.run();
              } finally {
                  dispose();
                  runner = null;
              }
          }
          ...
      }
      
    • createWorker 返回的 poolWorkerfactoryRxThreadFactory("RxComputationShutdown")单线程线程池对象PoolWorker对象,并执行 schedule 方法。
      实际上是执行了 单线程线程池对象 Executors.newScheduledThreadPool(1, factory)schedule(task, delayTime, unit)方法,并将返回值 Future 对象 传给ScheduledRunnablesetFuture 方法。

      public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
          if (disposed) {
              return EmptyDisposable.INSTANCE;
          }
          return scheduleActual(action, delayTime, unit, null);
      }
      
      public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
          Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
          ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
          ...
          Future<?> f;
          try {
              if (delayTime <= 0) {
                  f = executor.submit((Callable<Object>)sr);
              } else {
                  f = executor.schedule((Callable<Object>)sr, delayTime, unit);
              }
              sr.setFuture(f);
          } catch (RejectedExecutionException ex) {
              ...
              RxJavaPlugins.onError(ex);
          }
          return sr;
      }
      
  • 线程池的schedule(task, delayTime, unit) 方法实际时延时 delayTime 执行 taskrun 方法。即为 执行 TimerObserver 对象的 run 方法。

    public void subscribeActual(Observer<? super Long> observer) {
        TimerObserver ios = new TimerObserver(observer);
        observer.onSubscribe(ios);
        Disposable d = scheduler.scheduleDirect(ios, delay, unit);
        ios.setResource(d);
    }
    
  • TimerObserver 对象的 run 方法: 即执行了 观察者onNext(0L)onComplete()

    public void run() {
        if (!isDisposed()) {
            downstream.onNext(0L);
            lazySet(EmptyDisposable.INSTANCE);
            downstream.onComplete();
        }
    }
    

interval 系列操作符

  • interval系列 包含 intervalintervalRange两个操作符,包含以下 6 个方法:

    • interval(long period, TimeUnit unit)
    • interval(long initialDelay, long period, TimeUnit unit)
    • interval(long period, TimeUnit unit, Scheduler scheduler)
    • interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
    • intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
    • intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

    分别返回的是 ObservableIntervalObservableIntervalRange 对象,默认的 SchedulerSchedulers.computation()

public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) {
    return interval(initialDelay, period, unit, Schedulers.computation());
}
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
    return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) {
    return intervalRange(start, count, initialDelay, period, unit, Schedulers.computation());
}
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
    return RxJavaPlugins.onAssembly(new ObservableIntervalRange(start, end, Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}
  • ObservableInterval 源码:

    • 构建了 IntervalObserver 对象。
    • 因为默认Schedulers.computation() 所以 sch instanceof TrampolineScheduler不成立,除非我们手动传参 SchedulerSchedulers.trampoline()
    • 和前面的 ObservableTimer类似, 即为调用 ObservableIntervalrun 方法。只是返回的为PeriodicDirectTask对象。
    • setResourceObservableTimer类似,就不再赘述了。
    public final class ObservableInterval extends Observable<Long> {
        final Scheduler scheduler;
        final long initialDelay;
        final long period;
        final TimeUnit unit;
    
        public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
            this.initialDelay = initialDelay;
            this.period = period;
            this.unit = unit;
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(Observer<? super Long> observer) {
            IntervalObserver is = new IntervalObserver(observer);
            observer.onSubscribe(is);
    
            Scheduler sch = scheduler;
            if (sch instanceof TrampolineScheduler) {
                Worker worker = sch.createWorker();
                is.setResource(worker);
                worker.schedulePeriodically(is, initialDelay, period, unit);
            } else {
                Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
                is.setResource(d);
            }
        }
        ...
    }
    
  • sch.schedulePeriodicallyDirect(is, initialDelay, period, unit) 实际调用的为 schedulePeriodically方法:

    • interval 的间隔时间转化为 Nanoseconds
    • 然后设置 第一次的 响应时间为 当前时间+ 间隔时间 的 纳秒数。
    • 里面又将 PeriodicDirectTask对象 包装成 PeriodicTask 对象。
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }
        return periodicTask;
    }
    
    public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
        final SequentialDisposable first = new SequentialDisposable();
        final SequentialDisposable sd = new SequentialDisposable(first);
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        final long periodInNanoseconds = unit.toNanos(period);
        final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
        final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
    
        Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
                periodInNanoseconds), initialDelay, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }
        first.replace(d);
        return sd;
    }
    
  • PeriodicTask 对象的 run 方法

    • decoratedRun.run(); 又调用了 PeriodicDirectTask对象的 run 方法.
    • run 方法的最后 sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS)); 这里又 重复执行 这个任务,直到 IntervalObserver对象 isDisposed()true
     final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
        final Runnable decoratedRun;
        final SequentialDisposable sd;
        final long periodInNanoseconds;
        long count;
        long lastNowNanoseconds;
        long startInNanoseconds;
    
        PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
                long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
            this.decoratedRun = decoratedRun;
            this.sd = sd;
            this.periodInNanoseconds = periodInNanoseconds;
            lastNowNanoseconds = firstNowNanoseconds;
            startInNanoseconds = firstStartInNanoseconds;
        }
    
        @Override
        public void run() {
            decoratedRun.run();
            if (!sd.isDisposed()) {
                long nextTick;
                long nowNanoseconds = now(TimeUnit.NANOSECONDS);
                // If the clock moved in a direction quite a bit, rebase the repetition period
                if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
                        || nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
                    nextTick = nowNanoseconds + periodInNanoseconds;
                    /*
                     * Shift the start point back by the drift as if the whole thing
                     * started count periods ago.
                     */
                    startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
                } else {
                    nextTick = startInNanoseconds + (++count * periodInNanoseconds);
                }
                lastNowNanoseconds = nowNanoseconds;
    
                long delay = nextTick - nowNanoseconds;
                sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
            }
        }
        ...
    }
    
  • PeriodicDirectTaskrun 方法:

    • 实际调用的即为IntervalObserverrun()
    static final class PeriodicDirectTask
    implements Disposable, Runnable, SchedulerRunnableIntrospection {
        final Runnable run;
        final Worker worker;
        volatile boolean disposed;
    
        PeriodicDirectTask(@NonNull Runnable run, @NonNull Worker worker) {
            this.run = run;
            this.worker = worker;
        }
    
        @Override
        public void run() {
            if (!disposed) {
                try {
                    run.run();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    worker.dispose();
                    throw ExceptionHelper.wrapOrThrow(ex);
                }
            }
        }
        ...
    }
    
  • IntervalObserverrun()

    • 调用 观察者onNext 方法
    static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {
        final Observer<? super Long> downstream;
        long count;
    
        IntervalObserver(Observer<? super Long> downstream) {
            this.downstream = downstream;
        }
    
        @Override
        public void run() {
            if (get() != DisposableHelper.DISPOSED) {
                downstream.onNext(count++);
            }
        }
    }
    
  • 然后直到我们直接调用 dispose() 方法结束流程。

以上

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,042评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 89,996评论 2 384
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,674评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,340评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,404评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,749评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,902评论 3 405
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,662评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,110评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,451评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,577评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,258评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,848评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,726评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,952评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,271评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,452评论 2 348

推荐阅读更多精彩内容

  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    BrotherChen阅读 1,601评论 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    无求_95dd阅读 2,991评论 0 21
  • RxJava 博大精深,想要入门和进阶,操作符是一个切入点。 所以,我们希望寻找一种可以把操作符写得比较爽,同时可...
    geniusmart阅读 6,338评论 3 32
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    测天测地测空气阅读 626评论 0 1
  • 一、Retrofit详解 ·Retrofit的官网地址为 : http://square.github.io/re...
    余生_d630阅读 1,828评论 0 5