手把手教你使用 RxJava 2.0(二)

本篇文章主要介绍线程调度器,通过对线程调度器的了解,方便我们更好的处理异步操作,在合适的场景选择合适的线程。同时,结合上篇文章,我们就初步掌握了 RxJava 2.x的基本操作并可以应用在我们的项目中。在本篇文章的后半部分,会具体展示RxJava 2.x的使用。

Scheduler简介

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。
在RxJava 中,Scheduler,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景。

Scheduler 的 API

● Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

●Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

●Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

**●Schedulers.computation(): **计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

● Android 还有一个专用的** AndroidSchedulers.mainThread()**,它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。subscribeOn(): 指定Observable(被观察者)所在的线程,或者叫做事件产生的线程。 * observeOn(): 指定 Observer(观察者)所运行在的线程,或者叫做事件消费的线程。
下面用代码展示下线程调度的使用:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d("所在的线程:",Thread.currentThread().getName());
                Log.d("发送的数据:", 1+"");
                e.onNext(1);
            }
        }).subscribeOn(Schedulers.io()) 
                .observeOn(AndroidSchedulers.mainThread()) /
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("所在的线程:",Thread.currentThread().getName());
                        Log.d("接收到的数据:", "integer:" + integer);
                    }
                });

01-19 10:06:38.275 27734-27783/? D/所在的线程:: RxCachedThreadScheduler-1
01-19 10:06:38.275 27734-27783/? D/发送的数据:: 1
01-19 10:06:38.285 27734-27734/? D/所在的线程:: main
01-19 10:06:38.285 27734-27734/? D/接收到的数据:: integer:1

可以看到,Observable(被观察者)发送事件的线程的确改变了, 是在一个叫 RxCachedThreadScheduler-1的线程中发送的事件, 而Observer(观察者)仍然在主线程中接收事件。由此我们实现了线程调度的操作,可以在此基础上尽情的进行异步操作。

下面来介绍一个具体的使用场景。

RxJava 2.x 网络请求使用

Android中有多种网络请求库, Retrofit便是其中的佼佼者,它的优势之一便是它支持RxJava的方式来调用。我们便以Retrofit进行网络请求,RxJava进行异步处理,两者结合来讲解RxJava在网络请求中的具体使用。

本例中 我们使用聚合数据中的全国天气数据,获得城市信息。
接口url:http://v.juhe.cn/weather/citys?key=.... 其中key是你申请时聚合数据给你的密钥。
具体请求的返回数据形式如下:

下面以上述数据简单讲解一下Retrofit的基本用法。

要使用Retrofit,先在Gradle中添加配置:

//Retrofit
    compile 'com.squareup.retrofit2:retrofit:2.1.0'
//Gson converter
    compile 'com.squareup.retrofit2:converter-gson:2.1.0'
//Okhttp
    compile 'com.squareup.okhttp3:okhttp:3.5.0'
//RxJava adapter
    compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'

定义Api接口:

public interface Api {
    @GET("citys")
    Observable<AllCity> getAllCity(@Query("key") String key);
}

创建一个Retrofit客户端:

private static Retrofit create() {
        OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
        builder.readTimeout(10, TimeUnit.SECONDS);
        builder.connectTimeout(9, TimeUnit.SECONDS);

        return new Retrofit.Builder().baseUrl(baseUrl)
                .client(builder.build())
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();
    }

接下来就可以进行网络请求:

Retrofit retrofit = create();
        Api api = retrofit.create(Api.class);
        Observable<AllCity> observable = api.getAllCity(appkey);
        observable.subscribeOn(Schedulers.io())
                .flatMap(new Function<AllCity, ObservableSource<City>>() {
                    @Override
                    public ObservableSource<City> apply(AllCity city) throws Exception {
                        ArrayList<City> result = city.getResult();
                        return Observable.fromIterable(result);
                    }
                })
                .filter(new Predicate<City>() {
                    @Override
                    public boolean test(City city) throws Exception {
                        String id = city.getId();
                        if(Integer.parseInt(id)<5){
                            return true;
                        }
                        return false;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<City>() {
                    @Override
                    public void accept(City city) throws Exception {
                        System.out.println(city);
                    }
                });

01-19 13:28:56.952 13218-13218/com.lvr.rxjavalearning I/System.out: City{id='1', province='北京', city='北京', district='北京'}
01-19 13:28:56.952 13218-13218/com.lvr.rxjavalearning I/System.out: City{id='2', province='北京', city='北京', district='海淀'}
01-19 13:28:56.952 13218-13218/com.lvr.rxjavalearning I/System.out: City{id='3', province='北京', city='北京', district='朝阳'}
01-19 13:28:56.952 13218-13218/com.lvr.rxjavalearning I/System.out: City{id='4', province='北京', city='北京', district='顺义'}

调用Api接口方法,返回一个Observable(被观察者)对象,然后当subscribe()订阅后,就可以在IO线程中执行网络 请求操作,然后进行转换过滤,最终Observer(观察者)对象在UI线程中获得城市id在1-4之间的城市信息。
其中请求返回的数据是json形式,AllCity类包含所有的返回数据,具体代码如下:

public class AllCity {
    private String error_code;
    private String reason;
    private String resultcode;
   private ArrayList<City> result;
//省略getter,setter方法
}

ArrayList集合中封装了所有城市的信息,City类包含城市详细信息,具体代码如下:


public class City {

   /**
    * id : 1
    * province : 北京
    * city : 北京
    * district : 北京
    */

   private String id;
   private String province;
   private String city;
   private String district;
   //省略getter,setter,toString方法
}

本例中,我们假设Observer(观察者)需要id号在1-4之间的城市信息,我们就可以先使用flatMap()操作符先将封装所有信息的AllCity中提取出城市信息集合,然后转换成一个新的Observable(被观察者)进行传递,然后使用filter()进行过滤,过滤出符合要求的城市信息,最终传递给Observer(观察者),让其在UI线程接收数据,然后更新UI。整个过程完成了网络请求,同时进行异步操作,防止阻塞UI线程。
以上仅仅以实例介绍RxJava的基础使用,RxJava的功能远不止于此。不过掌握了以上的技能,我们已经可以在我们的项目中应用RxJava进行异步操作了。关于一些RxJava中的细节及其他相关技术还需要慢慢积累。

下面我们另一个重要的概念Disposable。当Observer(观察者)与Observable(被观察者)通过subscribe()建立连接后,事件可以进行传递。当发生一些其他情况,不得不断开两者之间的连接时,该怎么操作?这个时候就该Disposable上场了。

Disposable简介及使用

Disposable简介

Disposable, 这个单词的字面意思是一次性用品,用完即可丢弃的。在RxJava中,用它来切断Observer(观察者)与Observable(被观察者)之间的连接,当调用它的dispose()方法时, 它就会将Observer(观察者)与Observable(被观察者)之间的连接切断, 从而导致Observer(观察者)收不到事件。
下面我们就该考虑如何来获得Disposable对象?
Disposable的作用是切断连接,确切地讲是将Observer(观察者)切断,不再接收来自被观察者的事件,而被观察者的事件却仍在继续执行。
因此Disposable的对象通过观察者获得,具体分为两种方式。

Disposable对象的获得

1.Observer接口

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

通过创建Observer接口,当订阅后,建立与Observable的联系,onSubscribe(Disposable d)中便可以获得Disposable对象。
2.Consumer等其他函数式接口

Disposable disposable = Observable.just("你好").subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {

            }
        });

当subscribe()后直接返回一个Disposable 对象
获得了Disposable对象后,我们便可以调用dispose()方法,在恰当的时机,断开连接,停止接收Observable(被观察者)发送的事件。

注意:当切断被观察者与观察者之间的联系,Observable(被观察者)的事件却仍在继续执行。

另外,补充一下onNext()、onComplete()和onError()事件的发送规则。
具体规则:

Observable(被观察者)可以发送无限个onNext, Observer(观察者)也可以接收无限个onNext.

当Observable(被观察者)发送了一个onComplete后, Observable(被观察者)中onComplete之后的事件将会继续发送, 而Observer(观察者)收到onComplete事件之后将不再继续接收事件.

当Observable(被观察者)发送了一个onError后, Observable(被观察者)中onError之后的事件将继续发送, 而Observer(观察者)收到onError事件之后将不再继续接收事件.

Observable(被观察者)可以不发送onComplete或onError.

最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然

注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, 并不一定会导致程序崩溃. 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃。

以上就是本篇文章的全部内容,结合上一篇文章,已经可以灵活使用RxJava了。在下篇文章中,将会介绍RxJava中新增加的内容:Flowable及backpressure。

手把手教你使用 RxJava 2.0(一)
手把手教你使用 RxJava 2.0(三)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容