在《使用 UT 高效地玩转 RxJava 的操作符》一文中,笔者介绍了一种学习 RxJava 操作符的方式,除了文中提到的操作符之外,还有几个细节较多,弹珠图不能�完全诠释操作符含义的,在这篇文章里继续来讲解。
defer
defer
是创建型的操作符,字面上有「推迟」的意思,推迟创建数据流的规则是:一开始不会马上创建 Observable
,直到有订阅者订阅时才会创建,且每次都创建全新的 Observable
。
跟上一篇文章一样,自顶向下来看这张弹珠图:
- 操作符:这个长框内有很多数据流,要表达的含义是:每次都创建全新的数据流
Observable
。 - 输入:图中产生了两条全新的数据流,且发送的数据可能不一样(弹珠颜色不一样)
- 输出:创建型的操作符基本上都没有输出的图示,根据对操作符的大概理解,为了验证输入,需要订阅两次。
- 实现思路:
defer
在每次产生Observable
时,都保存起来,最终验证这些数据流不会相等。代码如下:
@Test
public void defer1() {
List<Observable<Integer>> list = new ArrayList<>();
Observable<Integer> deferObservable = Observable.defer(() -> {
Observable<Integer> observable = Observable.just(1, 2, 3);
list.add(observable);
return observable;
});
// 两次订阅,每次都将产生全新的Observable
deferObservable.subscribe();
deferObservable.subscribe();
assertNotSame(list.get(0), list.get(1));
}
写完这个测试用例后,仍然觉得不过瘾,虽然验证了每次都创建全新的数据流 Observable
,但是操作符本身所代表的「推迟」的能力尚未体现,我们需要更多的资料来了解这个能力。
官方文章告诉我们可以查阅这篇文章:Deferring Observable code until subscription in RxJava ,国内也有相关的译文。仔细阅读发完这篇文章后,笔者用 UT 来表达文中的一些观点,这个测试用例的思路有以下两点:
- 按照这样的流程来实现:使用
defer
创建数据流->订阅一次->改变数据流的数据->再订阅一次,由于defer
可以推迟创建数据流,第二次订阅时创建的数据流与第一次是不一样的,因此订阅到数据也将不一样。 - 使用一个普通的创建型操作符,如
just
,按照第1点的方式,对比和defer
的区别。完整的代码实现如下:
@Test
public void defer2() {
class Person {
public String name = "nobody";
public Observable<String> getJustObservable() {
//创建的时候便获取name值
return Observable.just(name);
}
public Observable<String> getDeferObservable() {
//订阅的时候才获取name值
return Observable.defer(this::getJustObservable);
}
}
Person person = new Person();
Observable<String> justObservable = person.getJustObservable();
Observable<String> deferObservable = person.getDeferObservable();
// 数据改变之前
justObservable.subscribe(mList::add);
assertEquals(mList, Collections.singletonList("nobody"));
mList.clear();
deferObservable.subscribe(mList::add);
assertEquals(mList, Collections.singletonList("nobody"));
person.name = "geniusmart";
// 数据改变之后
mList.clear();
justObservable.subscribe(mList::add);
assertEquals(mList, Collections.singletonList("nobody"));
mList.clear();
deferObservable.subscribe(mList::add);
assertEquals(mList, Collections.singletonList("geniusmart"));
}
通过这个例子我所要表达的意思是:弹珠图本身包含了很多细节,有些细节并没办法完整诠释,此时我们可以通过阅读更多的文章,通过 UT 的形式来验证观点,深入学习每一个操作符。
retry
retry
和 retryWhen
是错误处理型的操作符,当数据流发送了错误的数据时,将根据既定的规则发起重新订阅。
有了之前的铺垫,实现这张弹珠图并不复杂:数据流第一次发送了一个 Error
数据,retry
执行,订阅者重新发起订阅,数据流第二次发送正常的数据。具体代码实现如下:
@Test
public void retry() {
final Integer[] arrays = {0};
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3 / arrays[0]++);
subscriber.onCompleted();
}
})
.retry()
.subscribe(mList::add);
assertEquals(mList, Arrays.asList(1, 2, 1, 2, 3));
}
retryWhen
retry
只是小试牛刀,接下来看看 retryWhen
。
这张图很难理解,既有错误重试,还有延时策略,实在无从下手,我们需要查阅更多的文章,幸运是刚刚 defer
篇的那位作者写了相关的另外一篇文章 RxJava's repeatWhen and retryWhen, explained,也有相应的译文 。仔细阅读之后,梳理下 retryWhen
的套路,当错误重试需要延时策略时,实现流程大概是这样子的:
理清楚这个流程后,实现起来就比较轻松了,代码如下:
@Test
public void retryWhen_flatMap_timer() {
Observable.create(subscriber -> {
System.out.println("subscribing");
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onError(new RuntimeException("RuntimeException"));
})
.retryWhen(observable ->
observable.flatMap(
(Func1<Throwable, Observable<?>>) throwable ->
//延迟5s重新订阅
Observable.timer(5, TimeUnit.SECONDS, mTestScheduler)
)
)
.subscribe(num -> {
System.out.println(num);
mList.add(num);
});
//时间提前10s,将发生1次订阅+2次重新订阅
mTestScheduler.advanceTimeBy(10, TimeUnit.SECONDS);
assertEquals(mList, Arrays.asList(1, 2, 1, 2, 1, 2));
}
除此之外,文中还介绍了其他一些经验之谈,如不能破坏数据流,如何实现限制次数的延时错误重试等,这里分别用 UT 来实现。
破坏数据流
如果 retryWhen
的输入 Observable<Throwable>
,被粗暴的直接返回一个普通的数据流,则链式结构将被打断,如下代码:
@Test
public void retryWhen_break_sequence() {
// 错误的做法:破坏数据流,打断链式结构
Observable.just(1, 2, 3)
.retryWhen(throwableObservable -> Observable.just(1, 1, 1))
.subscribe(mList::add);
//数据流被打断,订阅不到数据
assertTrue(mList.isEmpty());
// 正确的做法:至少将throwableObservable作为返回结果,此时的retryWhen()等价于retry()
Observable.just(1, 2, 3)
.retryWhen(throwableObservable -> throwableObservable).
subscribe(mList::add);
//此处的数据流不会触发error,因此正常输出1,2,3的数列
assertEquals(mList, Arrays.asList(1, 2, 3));
}
限制次数的延时错误重试
- 当数据流产生错误的数据时,会触发
retryWhen
,并输入Observable<Throwable> error
。 - 将
Observable<Throwable> error
与Observable.range(1, 3)
做zip
聚合,range
作为创建型的操作符,将产生1,2,3
的数据流,因此前3次 error 将会正常配对并调用onCompleted()
,不再接收第四次的 error。
具体的代码实现如下:
@Test
public void retryWhen_zip_range_timer() {
Observable.create((Subscriber<? super Integer> subscriber) -> {
System.out.println("subscribing");
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onError(new RuntimeException("always fails"));
})
.retryWhen(observable ->
observable.zipWith(
Observable.range(1, 3),
(Func2<Throwable, Integer, Integer>) (throwable, num) -> num
)
.flatMap((Func1<Integer, Observable<?>>) num -> {
System.out.println("delay retry by " + num + " second(s)");
return Observable.timer(num, TimeUnit.SECONDS);
}))
.doOnNext(System.out::println)
.doOnCompleted(() -> System.out.println("completed"))
.toBlocking()
.forEach(mList::add);
//正常订阅一次,重新订阅3次
assertEquals(mList, Arrays.asList(1, 2, 1, 2, 1, 2, 1, 2));
}
总结
使用 UT 来实现弹珠图(marble diagrams),过瘾而且高效,研究操作符时事半功倍,对于一些弹珠图无法完整诠释的,可以多查阅一些文章,并将文中的观点用 UT 来实现。总而言之,这是一种很好的学习方式,强烈推荐大家使用。
参考文章
http://blog.danlew.net/2015/07/23/deferring-observable-code-until-subscription-in-rxjava/
http://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/