RxAndroid入门

响应式编程

RxJava提供了响应式编码规范,而RxAndroid是专供Android平台的RxJava(只是针对平台增加了少量类),一般Android开发者口中的RxJava指的便是RxAndroid。关于响应式编程的概念这里不多说,网上一大堆,我就简单说明下响应式编程与传统编程的区别。如果逻辑B依赖于逻辑A,逻辑C依赖于逻辑B,那么:

  1. 传统编程中的作法是在A执行完成后去执行B,B执行完毕后执行C,除了A、B、C本身的逻辑外,这部分由依赖产生的先后执行逻辑也是由开发者编写。
  2. 响应式编程中,会先将A、B、C做个依赖绑定,A->B->C,就像水流一样,从A流到B,再从B流到C,依赖产生的先后执行逻辑由语言或者库提供支持,开发者只需要编写A、B、C本身的逻辑以及告诉提供方三者间的关系即可。换句话说,就是B会自动响应A的执行结果,C会自动响应B的执行结果。

响应式编程可以很优雅地处理任务间(或者业务间)的依赖关系,而这关系就像流一样,并且多用于异步场景,且由于大家熟知的响应式编程库RxJava的主要应用场景也是异步,主要应用手段是流,因此不少文章简单粗暴地将响应式编程等同是异步+流,这个是比较片面的。

示例

我们来看个具体的例子,加深下理解。本例中需要处理三个任务,分别为A、B、C,这三个任务都需要在工作线程中执行,A执行完以后延迟1S执行B,B执行完以后延迟1S执行C。

以下为公共代码,定义了A、B、C三个任务以及定时器:

public class TaskA implements Runnable {
    @Override
    public void run() {
        // do something
    }
}

public class TaskB implements Runnable {
    @Override
    public void run() {
        // do something
    }
}

public class TaskC implements Runnable {
    @Override
    public void run() {
        // do something
    }
}

public class Timer {
    private static final Timer INSTANCE = new Timer();
    private Handler mHandler;

    private Timer() {
        HandlerThread thread = new HandlerThread("timer");
        thread.start();
        mHandler = new Handler(thread.getLooper());
    }

    public static Timer get() {
        return INSTANCE;
    }

    public void post(Runnable runnable) {
        mHandler.post(runnable);
    }

    public void postDelayed(Runnable runnable, long delay) {
        mHandler.postDelayed(runnable, delay);
    }
}

传统编程

Timer.get().post(new Runnable() {                          
       @Override                                            
       public void run() {                                  
           new TaskA().run();                               
           Timer.get().postDelayed(new Runnable() {         
               @Override                                    
               public void run() {                          
                   new TaskB().run();                       
                   Timer.get().postDelayed(new Runnable() { 
                       @Override                            
                       public void run() {                  
                           new TaskC().run();               
                       }                                    
                   }, 1000);                                
               }                                            
           }, 1000);                                        
       }                                                    
   });                                  

传统编程方式的问题非常突出,嵌入层次太多,随着依赖关系的增多以及复杂化,这样的代码会变得极其臃肿且不易阅读和维护。

响应式编程

首先我们需要编写一个支持响应式编程规范的库,代码如下(这个类很多情况没有考虑到,仅仅是为了演示用):

public class StreamTimer implements Runnable {
    private List<Task> mTasks = new LinkedList<>();

    public StreamTimer() {

    }

    public StreamTimer next(Runnable runnable) {
        return next(runnable, 0);
    }

    public StreamTimer next(Runnable runnable, long delay) {
        Task task = new Task(runnable, delay);
        mTasks.add(task);
        return this;
    }

    public void startup() {
        startNextTimer();
    }

    private void startNextTimer() {
        if(mTasks.isEmpty()) {
            return;
        }
        Task task = mTasks.get(0);
        Timer.get().postDelayed(this, task.delay);
    }

    @Override
    public void run() {
        Task task = mTasks.remove(0);
        task.runnable.run();
        startNextTimer();
    }

    private class Task {
        Runnable runnable;
        long delay;

        Task(Runnable runnable, long delay) {
            this.runnable = runnable;
            this.delay = delay;
        }
    }
}

执行A、B、C任务的代码如下:

new StreamTimer()           
   .next(new TaskA())       
   .next(new TaskB(), 1000) 
   .next(new TaskC(), 1000) 
   .startup();              

next表示新增一个任务且需要在上一次任务执行完毕后才执行。以上代码非常简洁且可读性很强,任务间的依赖关系非常清晰,拓展也非常简单,新增任务时只需要在合适的地方插入next方法即可。

比起上述例子,RxJava的功能要强大得多,也复杂得多,上述例子只是为了让新手能快速掌握响应式编程及流式调用(也称为链式调用),接下去我们开始讲解RxJava。

RxAndroid基本使用

使用RxAndroid需要在build.gradle中加入如下依赖:

compile 'io.reactivex.rxjava2:rxandroid:2.1.1'
compile 'io.reactivex.rxjava2:rxjava:2.2.7'

RxAndroid也直接去https://github.com/ReactiveX/RxAndroid下载源码,里面就不到10个类。

Observable和Observer

  1. Observable:被观察者,用来处理事件的派发。
  2. Observer:观察者,观察目标为Observable,Observable派发出来的事件将被它处理,一个Observable可以有多个Observer。当Observable有变化时,Observer能够立即响应这些变化。

熟悉观察者模式的同学应该对这两者有非常深刻的认识了,它们是RxJava中最基础的东西,RxJava中其他的对象、方法、操作符都是围绕这二者进行或拓展的。来看下最简单的例子:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");
        emitter.onNext("B");
        emitter.onNext("C");
        emitter.onComplete();
    }
});
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        log("onSubscribe");
    }
    @Override
    public void onNext(String s) {
        log("onNext:" + s);
    }
    @Override
    public void onError(Throwable e) {
        error("onError", e);
    }
    @Override
    public void onComplete() {
        log("onComplete");
    }
};
observable.subscribe(observer);

日志输出如下:

onSubscribe
onNext:A
onNext:B
onNext:C
onComplete


来看下上述例子中Observable相关的对象和方法:

  1. 使用Observable.create创建Observable对象。
  2. ObservableEmitter为发射器,Observable使用它发射事件给所有Observer
  3. 使用Observable.subscribe添加一个Observer


接着来看下Observer的几个方法:

  1. onSubscribe:在订阅observable时回调,可以在这里调用Disposable.dispose取消订阅或者将Disposable对象保存起来以便在后续某个时刻取消订阅。
  2. onNext:在ObservableEmitter.onNext执行后回调,onNext表示的是整个响应链中的一环,在这里处理响应链中的其中一个任务,可以多次调用。
  3. onComplete:在ObservableEmitter.onComplete执行后回调,表示任务已全部完成,可以在这里做收尾工作。
  4. onError:在ObservableEmitter.onError执行后或者链中任一环节出现异常时回调,表示任务执行失败。

除了onSubscribe,其它几个方法有如下特点:

  1. ObservableEmitter中的同名方法一一对应,在ObservableEmitter的同名方法执行后回调。
  2. onCompleteonError互斥,两者只能触发其中之一,且触发后onNext便不会再触发。
  3. onComplete触发后,后续ObservableEmitter调用任何方法都不会再生效。
  4. onError触发后,如果ObservableEmitter再调用onError或者onComplete,RxJava会抛出异常,开发者需要自行保证唯一性。(处理方式为什么不同onComplete?)

需要特别注意的,ObservableOnSubscribe.subscribe方法在每次有新的Observer加入时,都会在Observer.onSubscribe回调之后触发,这就保证了所有的Observer都能接收到事件。

subscribe

在上面提到了通过subscribeObservableObserver建立了绑定关系,我们来看下方法原型:

void subscribe(Observer<? super T> observer)

除此之外,subscribe还有很多重载方法,我们来看下所有的方法原型:

Disposable subscribe();
Disposable subscribe(Consumer<? super T> onNext);
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete);
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(ls);
    return ls;
}

以上方法最终都会调用到参数最多的那个方法,而该方法内部又调用了subscribe(Observer<? super T> observer),这些方法相对来说更为简洁易用。可以发现新增了两个类,分别是ConsumerAction,其中ObserveronNextonErroronSubscribe方法被Consumer.accept方法取代,onComplete方法被Action.run方法取代。因此,如果我们仅仅只关心Observer的其中一个或多个回调,那么便可以通过ConsumerAction来代替Observer注册到Observable中。如上述的例子中,如果我们只关心onNext,那么可以这么使用:

Observable<String> observable = Observable.create(new ObservableOnSubscribe
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("A");
        emitter.onNext("B");
        emitter.onNext("C");
        emitter.onComplete();
    }
});
observable.subscribe(
        new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                log("onNext:" + s);
            }
            }, Functions.<Throwable>emptyConsumer(),
        new Action() {
            @Override
            public void run() throws Exception {
                log("onComplete");
            }
        }
);

使用lambda表达式之后的代码如下:

Observable<String> observable = Observable.create(
        (ObservableEmitter<String> emitter) -> {
            emitter.onNext("A");
            emitter.onNext("B");
            emitter.onNext("C");
            emitter.onComplete();
        }
    );
observable.subscribe(
        (String s) -> log("onNext:" + s),
        Functions.<Throwable>emptyConsumer(),
        () -> log("onComplete")
);

代码显得简洁优雅易读得多,实际上RxJava是为响应式编程和函数式编程而生,因此使用lambda表达式才能完美的使用RxJava。

Android使用lambda表达式需要使用jdk1.8,且gradle版本需要4.0以上(也可能是3.x某个版本,记不住了,4.0以上准没错),在build.gradle中加入如下代码:

android {
    compileOptions {
        sourceCompatibility > JavaVersion.VERSION_1_8
        targetCompatibility > JavaVersion.VERSION_1_8
    }
}

线程调度

以下称Observer.onXXX回调时所在线程为Observer工作线程,Observable发射事件时所在线程为Observable工作线程。默认情况下,ObserverObservable工作线程是同一个,该线程即调用Observable.subscribe时所在的线程。然而,异步调用才是RxJava的核心应用场景,下面我们来看下如何改变ObserverObservable的工作线程。使用Observable.subscribeOn配置Observable工作线程,使用Observable.observeOn配置Observer工作线程。很多情况下,置Observable工作线程位于子线程中,因为可能存在网络请求、数据存取等耗时操作;而Observer工作线程位于主线程,因为接收到事件后需要刷新UI。下面来看下这种场景下的应用示例:

Observable<String> observable = Observable.create(
        (ObservableEmitter<String> emitter) -> {
            log("emit thread=" + Thread.currentThread().getName());
            emitter.onNext("A");
            emitter.onNext("B");
            emitter.onNext("C");
            emitter.onComplete();
        }
);
observable.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
                (String s) -> log("onNext:" + s + " thread=" + Thread.currentThread().getName()),
                Functions.<Throwable>emptyConsumer(),
                () -> log("onComplete thread=" + Thread.currentThread().getName())
        );

日志输出如下:

emit thread=RxNewThreadScheduler-2
onNext:A thread=main
onNext:B thread=main
onNext:C thread=main
onComplete thread=main

可以看到线程调度确实如我们所期望的了。需要特别强调的subscribeOnobserveOn返回的不是原本的Observable对象,因此如果没有采用链式调用,在调用这两个方法之后必须重新赋值给Observable对象,如:

// 错误调用
observable.subscribeOn(xxx)
    .observeOn(xxx);
observable.subscribe(xxx);

// 正确调用
observable = observable.subscribeOn(xxx)
    .observeOn(xxx);
observable.subscribe(xxx);

如果重复调用subscribeOnobserveOn会怎么样呢,我们来看段代码:

Observable<String> observable = Observable.create(
        (ObservableEmitter<String> emitter) -> {
            log("emit thread=" + Thread.currentThread().getName());
            emitter.onNext("A");
        }
);
observable = observable.subscribeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.single())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.single());
observable.subscribe((String s) -> log("onNext1:" + s + " thread=" + Thread.currentThread().getName()));

日志输出如下:

emit thread=main
onNext1:A thread=RxSingleScheduler-1

可以得出如下临时结论:

  1. subscribeOn以第一次调用为准。
  2. observeOn以最后一次调用为准。

我们再来看下更复杂的例子:

Observable<String> observable = Observable.create(
        (ObservableEmitter<String> emitter) -> {
            log("emit thread=" + Thread.currentThread().getName());
            emitter.onNext("A");
        }
);
observable = observable.subscribeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.single())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.single());
observable.subscribe((String s) -> log("onNext1:" + s + " thread=" + Thread.currentThread().getName()));
observable = observable.subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io()) ;
observable.subscribe((String s) -> log("onNext2:" + s + " thread=" + Thread.currentThread().getName()));

日志输出如下:

emit thread=main
emit thread=main
onNext1:A thread=RxSingleScheduler-1
onNext2:A thread=RxCachedThreadScheduler-2

现在,我们可以得出最终结论了:

  1. subscribeOn以第一次调用为准。
  2. observeOn以调用subscribe前的最后一次调用为准,每个subscribe单独计算。

由此可知,我们可以让不同的Observer在不同线程中调度。

RxJava使用Scheduler来表示线程调度,上面提到的Schedulers.newThread()AndroidSchedulers.mainThread()都是由RxJava提供的Scheduler实现类。一般我们不需要手动去实现Scheduler,而是通过Schedulers或者AndroidSchedulers(Android专用)获取。下面分别来看下二者所提供的创建能力。

Schedulers

newThread

大致等同于new Thread(runnable).start();,线程数没有上限,除了测试场景一般不会用到它。

io

用于I/O操作场景,线程数没有上限。与newThread比较相似,区别在于该调度器的内部使用了一个无数量上限的线程池,可以复用空闲的线程,因此效率更高。

computation

用于计算场景,计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,因此不要把I/O操作放在这里。该类型的Scheduler使用固定数量的线程池,数量为处理器核数。除了阻塞(包括I/O操作、wait等)外,其他操作都可以使用该调度器,不过通常用处理事件循环,大数据运算等。

single

单线程调度,所有任务都需要排队依次运行。

trampoline

任务在当前线程运行。

from(Executor executor)

使用指定的线程池调度。

AndroidSchedulers

mainThread

任务在主线程上运行。

from(Looper looper)

任务在指定Looper上调度。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,454评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,553评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,921评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,648评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,770评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,950评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,090评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,817评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,275评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,592评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,724评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,409评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,052评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,815评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,043评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,503评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,627评论 2 350

推荐阅读更多精彩内容

  • 转一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong阅读 910评论 0 2
  • 转载自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657阅读 2,016评论 1 9
  • 作者寄语 很久之前就想写一个专题,专写Android开发框架,专题的名字叫 XXX 从入门到放弃 ,沉淀了这么久,...
    戴定康阅读 7,619评论 13 85
  • 《小王子》是一本童话故事,却是一本写给成人的童话故事,如果你还感到彷徨,如果你感到孤独,不妨读一读小王子的故事。 ...
    陈艳_3645阅读 631评论 0 0
  • 今天看了一篇文章,一个90后的女孩,青春洋溢活力四射。她说:只要够车费和住宿钱就可以去旅行!不用管那么多,因为可以...
    深小彤阅读 285评论 0 2