此篇内容均是来自书籍《RxJava响应式编程》李衍顺 著
3.5 错误处理操作符
RxJava对错误的处理很方便,当有错误的时候就会调用Subscriber的onError方法将错误分发出去,由Subscriber自己来处理错误。这种处理错误的方式虽然方便,但也有缺点,就是每个Subscriber都要去定义如何处理错误,如果有100个Subscriber,就要定义100遍。这节来学习如何用错误处理操作符来集中处理错误。
3.5.1 onErrorReturn
onErrorReturn 操作符可在发生错误时,让Observable发送一个预先定义好的数据并停止继续发送数据。
/**
* onErrorReturn
*/
private Observable<String> createObserver() {
return Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
for(int i= 1; i<=6; i++){
if(i<3){
subscriber.onNext("onNext:" + i);
} else {
subscriber.onError(new Throwable("Throw error"));
}
}
}
});
}
private Observable<String> onErrorReturnObserver(){
return createObserver().onErrorReturn(new Func1<Throwable, String>() {
@Override
public String call(Throwable throwable) {
return "onErrorReturn";
}
});
}
private void onErrorReturnTest(){
onErrorReturnObserver().subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("onErrorReturn-onCompleted");
}
@Override
public void onError(Throwable e) {
log("onErrorReturn-onError:" + e.getMessage());
}
@Override
public void onNext(String s) {
log("onErrorReturn-onNext:" + s);
}
});
}
输出结果:
在发送出两个数据之后, onErrorReturn捕捉到了抛出的异常并返回了指定的字符串给订阅者,然后整个Observable发送数据的过程结束。
onErrorReturn-onNext:onNext:1
onErrorReturn-onNext:onNext:2
onErrorReturn-onNext:onErrorReturn
onErrorReturn-onCompleted
3.5.2 onErrorResumeNext
onErrorReturn操作符在有错误事件的时候回让Observable停止发送数据;而onErrorResumeNext在有错误发送的时候,会创建另外一个Observable来代替当前的Observable并继续发送数据,就好像错误并没有发生一样。
private Observable<String> createObserver() {
return Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
for(int i= 1; i<=6; i++){
if(i<3){
subscriber.onNext("onNext:" + i);
} else {
subscriber.onError(new Throwable("Throw error"));
}
}
}
});
}
private Observable<String> onErrorReturnObserver(){
return createObserver().onErrorReturn(new Func1<Throwable, String>() {
@Override
public String call(Throwable throwable) {
return "onErrorReturn";
}
});
}
private void onErrorReturnTest(){
onErrorReturnObserver().subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("onErrorReturn-onCompleted");
}
@Override
public void onError(Throwable e) {
log("onErrorReturn-onError:" + e.getMessage());
}
@Override
public void onNext(String s) {
log("onErrorReturn-onNext:" + s);
}
});
}
订阅输出结果如下,在正常发送两个数据后产生错误,新创建的Observable继续发送7、8、9三个数字,然后整个数据发送过程结束
onErrorResumeNext-onNext:onNext:1
onErrorResumeNext-onNext:onNext:2
onErrorResumeNext-onNext:7
onErrorResumeNext-onNext:8
onErrorResumeNext-onNext:9
onErrorResumeNext-onCompleted
3.5.3 onExceptionResumeNext
onExceptionResumeNext操作符类似于onErrorResumeNext,不同之处在于其会对onError抛出的数据类型做判断--如果是Exception, 就会使用另一个Observable代替原Observable继续发送数据,否则将错误分发给Subscriber。
/**
* onExceptionResumeNext
*/
private Observable<String> createObserver(final Boolean createException) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for(int i=1; i<=6; i++){
if(i<3){
subscriber.onNext("onNext:" + i);
} else if(createException){
subscriber.onError(new Exception("Exception"));
} else {
subscriber.onError(new Throwable("Throw error"));
}
}
}
});
}
private Observable<String> onExceptionResumeObserver(boolean isException) {
return createObserver(isException).onExceptionResumeNext(Observable.just("7","8","9"));
}
private void onExceptionResumeNextTest(){
onExceptionResumeObserver(true).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("onException-true-onCompleted");
}
@Override
public void onError(Throwable e) {
log("onException-true-onError:" + e.getMessage());
}
@Override
public void onNext(String s) {
log("onException-true-onNext:" + s);
}
});
onExceptionResumeObserver(false).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
log("onException-false-onCompleted");
}
@Override
public void onError(Throwable e) {
log("onException-false-onError:" + e.getMessage());
}
@Override
public void onNext(String s) {
log("onException-false-onNext:" + s);
}
});
}
输出的结果如下,当错误的类型是Exception时, 在发送完1和2之后,onExceptionResumeNext会捕获到错误事件并且用新的Observable来继续发送数据7,8,9,最后整个过程正常结束;而当错误类型不是Exception时,onExceptionResumeNext将不会起任何作用,错误事件被直接分发给订阅者,整个过程非正常结束
onException-true-onNext:onNext:1
onException-true-onNext:onNext:2
onException-true-onNext:7
onException-true-onNext:8
onException-true-onNext:9
onException-true-onCompleted
onException-false-onNext:onNext:1
onException-false-onNext:onNext:2
onException-false-onError:Throw error
3.5.4 retry
retry操作符在发送错误时会重新进行订阅, 而且可以重复多次,所以发送的数据可能会产生重复。但是有可能每次retry都会发送错误,从而造成不断订阅不断retry的死循环,这种情况下可以指定最大重复次数。如果retry达到了最大重复次数还有错误的话,就将错误返回给观察者。
/**
* retry
*/
private Observable<Integer> createRetryObserver(){
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
log("subscribe");
for(int i=0; i<3; i++){
if(i==2){
subscriber.onError(new Exception("Exception"));
} else {
subscriber.onNext(i);
}
}
}
});
}
private void retryTest(){
createRetryObserver().retry(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
log("retry-onCompleted");
}
@Override
public void onError(Throwable e) {
log("retry-onError:"+ e.getMessage());
}
@Override
public void onNext(Integer integer) {
log("retry-onNext:" + integer);
}
});
}
输出结果如下,Observable每次发送两个数据后会发送错误事件,从而导致重新订阅,在重新订阅了两次后, 错误事件最终被分发给了订阅者。
subscribe
retry-onNext:0
retry-onNext:1
subscribe
retry-onNext:0
retry-onNext:1
subscribe
retry-onNext:0
retry-onNext:1
retry-onError:Exception