Rxjava 创造型操作符

关于操作符的介绍,官网说明的还是非常清楚的,还配有事件流向图。



上面就是事件上游(被观察者)

  • 箭头表示 Observable 发射消息的时间线;
  • 花花绿绿,形形状状的就是发射的数据;
  • 最后有竖线,就是结束的意思,对应着时间的onComplete,× 对应着 onError

下面是事件下游(观察者)

  • 时间线是观察者接收消息的时间线
  • 花花绿绿,形形状状的就是接收的数据;

中间那部分就是功能强大操作符了
在这里面连接了上游和下游,使他们建立了订阅关系。

最基础的就是创造型操作符,顾名思义,它们的作用就是创建 Observable,并发送事件

基本创建

create

创建一个被观察者对象,使用者自己使用发射器的 onNext(), onError(), 和 onCompleted() 发送对应消息,下游可以接收。

有意思的是,这里就没有上游线,因为创造型操作符本身就是创建一个上游,并允许发送消息。

为了示例看起来更简洁,先写一个创建观察者的方法

private Observer createObserver() {
    return new Observer<Integer>() {
        // 下游 Observer 观察者 处理事件
        @Override
        public void onSubscribe(Disposable d) {
        }
        @Override
        public void onNext(Integer integer) {
            log("下游处理事件 onNext " + integer);
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onComplete() {
            log("下游处理事件 onComplete");
        }
    };
}
 /** Create 基本創建
  *  ObservableEmitter是事件的发送器,可以发送多个onNext()方法;一旦发送 onComplete(),onError() 事件之后,后续的事件将不会再发送
 */
public void rx_create() {
    // 上游 Observable 被观察者
    Observable.create(new ObservableOnSubscribe<Integer>() {
        // 发射器 发射事件
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            log("上游发射事件");
            // 发射事件
            emitter.onNext(1);
            emitter.onNext(2);
            log("上游发射完成");
        }
    }).subscribe(createObserver());
}

// 结果
上游发射事件
下游处理事件 onNext 1
下游处理事件 onNext 2
上游发射完成

快速创建

create 是使用自己的发射器发送的,RxJava 中也提供了更加快速的创建方式

Just

内部会自己发射数据,发射之后会再发一个 onComplete, 这里面支持发送多个数据
// just 会依次发射,最多发送 10 个
public void rx_just() {
    // 上游 Observable 被观察者
    // 内部会先发射 1, 再发射 2
    Observable.just(1, 2).subscribe(createObserver());
}
// 结果
下游处理事件 onNext 1
下游处理事件 onNext 2
下游处理事件 onComplete

from: fromArray/fromIterable

内部自己发射的,数集对象/迭代器
public void rx_formArray() {
    // 上游 Observable 被观察者
    Integer[] array = {1, 2, 3, 4, 5};
    Observable.fromArray(array).subscribe(createObserver());
}

// 结果
下游处理事件 onNext 1
下游处理事件 onNext 2
下游处理事件 onNext 3
下游处理事件 onNext 4
下游处理事件 onNext 5
下游处理事件 onComplete

range

发射一个范围内的有序数列,可以指定范围的起始和长度
public void rx_range() {
    // 从 1 开始加 数量 5个 (1,2,3,4,5)
    Observable.range(1,5).subscribe(createObserver());
}

// 结果
下游处理事件 onNext 1
下游处理事件 onNext 2
下游处理事件 onNext 3
下游处理事件 onNext 4
下游处理事件 onNext 5
下游处理事件 onComplete

empty / never / error

下游默认是 Object,无法发出有值的事件,创建后只会发射 onComplete 创建后,什么都不做 创建后会发射一个 Error 时间,通知异常
/**
 * 上游没有发射任何事件 无法指定类型,默认 Object Rxjava 泛型默认类型 == Object
 * 做一个耗时操作,不需要任何数据来刷新 UI 只会调用 onComplete
 */
public void rx_empty() {
    // 不会发射有值的事件
    Observable.empty().subscribe(new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable d) {
        }
        @Override
        public void onNext(Object o) {
            // 没有事件可接收
            log("empty -- > onNext");
        }
        @Override
        public void onError(Throwable e) {
            // 如果是 .error 会在这接收
        }
        @Override
        public void onComplete() {
            // 内部一定会调用 onComplete 事件
            log("empty --- > onComplete");
        }
    });
}
// 结果
empty --- > onComplete

延迟创建

除了以上创建方法,我们可以为事件的发送添加延迟,支持延迟创建的有下几个操作符。

timer

快速创建后,延迟指定时间后,发送1个数值0(Long类型)

interval

按照固定时间发射一个无限递增的整数序列。发送的事件序列 = 从0开始、无限递增1的的整数序列,也可以为它添加第一发射数据前的延时时间
public void rx_interval() {
    log("------ Start ------");
    Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    log("下游处理事件 onNext " + aLong);
                }
            });
}

结果:每个一秒钟会发射一个数据,从 0 开始 加1 递增

TIPS
这里用到了个 新的类 Consumer 其实也是个观察者类,可以看做是 Observer 的简化版,里面只有一个方法 accpet 相当于 Observer 中的 onNext

intervalRange

每隔指定时间就发送事件,可指定发送数据的数量,与 interval 类似,但可以指定发送数据的数量。它们都支持设置第一次延时时间

public void rx_intervalRange() {
    log("------ Start ------");
    // start 开始累积, count 累积多少个数量, initialDelay 开始等待的时间, period 每隔多久执行, TimeUnit 时间单位
    Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    log("accept: " + aLong);
                }
            });
}

结果,数据从 1 开始 发送 5 个就是 1,2, 3, 4, 5,最后单位是 秒,开始延迟1s, 每个 2s 发送一次数据。

defer

直到有观察者(Observer)订阅时,才动态创建被观察者对象及发送事件

int i = 1;
public void rx_defer() {
    // 上游 被观察者
    Observable observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> call() throws Exception {
            return Observable.just(i);
        }
    });
    // 再赋值
    i = 10;
    // 订阅
    // 此时,才会调用 defer()创建被观察者对象
    observable.subscribe(createObserver());
}

// 结果
下游处理事件 onNext 10
下游处理事件 onComplete

defer 操作符的入参是一个 Callable 的接口实现,里面的 call 方法返回的是一个 ObservableSource,看过之前手写框架就就已经知道,其实就是返回一个被观察者。通过之前的学习和经验应该能推断出,只有在订阅的时候,才会使用 call 方法,创建一个被观察者对象,然后再用这个对象订阅。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352