RxJava基础一-创建操作符

此篇内容均是来自书籍《RxJava响应式编程》李衍顺 著

一、什么是RxJava?

RxJava是一个非常著名的开源库,是ReactiveX(Reactive Extensions)的一种Java实现。ReactiveX是一种响应式扩展框架,有很多种实现,如RxAndroid, RxJS,RxSwift,RxRuby,RxCpp,RxGo等。目前RxJava有1.x和2.x两个主要分支,分别代表RxJava1和RxJava2。RxJava1发布的时间比较早, 使用也更广泛,所以先来了解下RxJava1。

RxJava可看作由Observable、Subscriber和Scheduler组成的。Subscriber订阅到Observable,Obervable会在默认或者指定的Scheduler上工作并产生数据流返回给Subscriber,Subscriber也会在默认或者指定的Scheduler上接收Observable发送过来的数据流。Scheduler是对线程的一种抽象,不同的Scheduler代表 了不同的线程。


image.png

1.1 Observable和Subscriber

Observable提供了subscribe方法, 当有Subscriber通过subscribe方法订阅到Observable时, Observable就可以向Subscriber发送数据流。响应式编程中的事件分为三类:普通事件 ,错误事件和结束事件,在Subscriber中有三个方法与这三件事一一对应,Observable会通过调用Subscriber的这三个方法来发送对应的事件。

  1. onNext:当Observable要发送普通事件时,就会调用这个方法。这个方法可被调用0~N次。
  2. onError: 当在Observable内部有异常或者错误产生时,就可以调用这个方法来向Subscriber发送错误事件。这个方法只能被调用一次。
  3. onComplete: 如果Observable已经发送完所有数据,并且没有发生错误,这时就需要调用这个方法来向Subscriber发送结束事件。这个方法只能调用一次,而且和onError是互斥的关系,也就是说调用了onError后就不能调用onComplete 反之亦然。在onError或者onComplete被调用之后, Observable就失去了作用, 不能再调用onNext来发送数据了。

Subscriber还提供了unsubscribe方法, 当Subscriber订阅到Observable之后, 可以随时调用这个方法来终止对Observable的订阅。

二、在Android工程中引入RxJava

只需要在Gradle配置文件中加入对RxJava的依赖。但是Android开发中一般会添加对RxAndroid的依赖,而RxAndroid已经依赖于RxJava,且一般难以及时更新到最新版的RxJava,所以如果想使用RxAndroid和最新版本的RxJava,则可以通过下面的配置来导入依赖。

api ('io.reactivex.rxjava2:rxandroid:2.1.0'){
        exclude module:'rxjava2'
    }
api 'io.reactivex.rxjava2:rxjava:2.2.1'

三、RxJava中的操作符

3.1 创建Observable的操作符

3.1.1 range

range操作符创建的Observable将会发送一个范围内的数据

Observable.range(0,4)
                .subscribe(new Action1<Integer>(){
                    @Override
                    public void call(Integer integer) {
                        System.out.println(integer);
                    }
                });

结果是:

0 1 2 3

3.1.2 defer和just

defer操作符,只有当Subscriber来订阅的时候才会创建一个新的Observable对象, 也就是说每次订阅都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的。
just操作符接收某个对象作为输入,然后会创建一个发送该对象的Observable。这个对象可以是一个数字,一个字符串,数组,Iterate对象等。just是一种非常快捷的创建Observable对象的方法,在后面的例子中会大量使用。
我们通过这两个操作符创建两个Observable,这两个Observable都会将Android系统中的当前时间作为数据发送。我们将这两个Observable分别保存在变量deferObservable和justObservable中。

Observable<Long> deferObservable = getDefer();
    Observable<Long> justObservable = getJust();
    private Observable<Long> getJust() {
        return Observable.just(System.currentTimeMillis());
    }
    private Observable<Long> getDefer() {
        return Observable.defer(new Func0<Observable<Long>>() {
            @Override
            public Observable<Long> call() {
                return getJust();
            }
        });
    }

现在分别对这两个Observable进行订阅

private void Just_Defer(){
        for(int i = 0; i<3;i++) {
            deferObservable.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    System.out.println("defer: " + aLong);
                }
            });
            justObservable.subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    System.out.println("just: " + aLong);
                }
            });

        }
    }

输出结果可以看到defer每次订阅都会得到Observable发送的一个全新的当前时间, 而just创建的操作符即使订阅多次也都会发出和首次订阅一样的数据。

defer: 1554532535942
just: 1554532535905
defer: 1554532535954
just: 1554532535905
defer: 1554532535954
just: 1554532535905

3.1.3 from

from操作符接收一个对象作为参数来创建Observable,这个参数对象可以是Iterable、Callable、Future和数组等。 from操作符创建的Observable将发送参数对象里的数据, 其创建方式类似于just操作符,但是just操作符创建的Observable会将整个参数对象作为数据一下子发送出去。 比如说参数对象是一个含有10个数字的数组,使用from创建的Observable就会发送10次, 每次发送一个数字,而使用just创建的Observable会一次就将整个数组发送出去。
首先创建一个数组和一个List, 分别存储0~5的整数,然后使用from操作符分别以数组和List作为输入参数创建两个Observable对象。之后分别订阅,会输出什么结果呢?

Integer[] arrays = {0,1,2,3,4,5};
    List<Integer> list = new ArrayList<>();
    private Observable<Integer> FromArray() {
        return Observable.from(arrays);
    }
    private Observable<Integer> FromIterable() {
        for(int i = 0;i <= 5; i++) {
            list.add(i);
        }
        return Observable.from(list);
    }
    private void From(){
        FromArray().subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("FromArray: " + integer);
            }
        });
        FromIterable().subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println("FromIterable: " + integer);
            }
        });
    }
FromArray: 0
FromArray: 1
FromArray: 2
FromArray: 3
FromArray: 4
FromArray: 5
FromIterable: 0
FromIterable: 1
FromIterable: 2
FromIterable: 3
FromIterable: 4
FromIterable: 5

3.1.4 interval

interval所创建的Observable对象会从0开始, 每隔固定的时间发送一个数字。需要注意的是, 这个对象时运行在computation Scheduler中的,所以在Android开发中 ,如果需要在UI中显示结果, 则需要在主线程中订阅。
代码通过interval操作符创建了一个Observable, 创建的Observable将会以1秒为间隔不断地发送数据,因为我们需要更新UI,所以在主线程中进行订阅。这里的AndroidSchedulers.mainThread()属于RxAndroid库, RxAndroid是Jake Wharton在Android平台上开发的一个对RxJava的扩展。最后创建一个Subscriber对象对这个Observable对象进行订阅, 就会每秒输出一个从0开始递增的数据。

private void interval(){
        Subscriber<Long> subscriber = new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError: " + e.getMessage());
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("onNext:" + aLong);
            }
        };

        Observable.interval(0,10, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subscriber);
    }

最后结果是,原因不知道

onError: null

最后,由于interval操作符创建的Observable对象会不停地发送数据,所以当我们不再需要它的数据时要调用unSubscribe方法进行反订阅,反订阅后Observable将会停止发送数据

subscriber.unsubscribe();

3.1.5 repeat和timer

repeat操作符可以让Observable对象发送的数据重复发送N次, 我们可以指定其发送的次数。timer操作符创建的Observable会在指定时间后发送一个数字0, 注意其默认也是运行在computation scheduler 上。
下面我们来创建一个发送整数1,2,3的源Observable, 并使用repeat操作符在源Observable的基础上创建一个新的Observable,使其重复发送数据3次。另外使用timer创建一个会在1秒后发送数据的Observable。

private void Repeat_Timer() {
        Observable.just(1,2,3).repeat(3)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println("repeat: " + integer);
                    }
                });

        Observable.timer(1, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        System.out.println("timer: " + aLong);
                    }
                });
    }

结果呢,后面的这个报空指针异常,还没找到原因;正常timer操作符创建的Observable在订阅1秒后发送一个数据0.

repeat: 1
repeat: 2
repeat: 3
repeat: 1
repeat: 2
repeat: 3
repeat: 1
repeat: 2
repeat: 3

另外还有几个非常简单的创建操作符,如naver、empty、throw等,请移步查看官方文档。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容

  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    BrotherChen阅读 1,606评论 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    无求_95dd阅读 3,048评论 0 21
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    测天测地测空气阅读 630评论 0 1
  • 一、Retrofit详解 ·Retrofit的官网地址为 : http://square.github.io/re...
    余生_d630阅读 1,844评论 0 5
  • 转一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong阅读 911评论 0 2