RxJava系列文章目录导读:
一、RxJava create操作符的用法和源码分析
二、RxJava map操作符用法详解
三、RxJava flatMap操作符用法详解
四、RxJava concatMap操作符用法详解
五、RxJava onErrorResumeNext操作符实现app与服务器间token机制
六、RxJava retryWhen操作符实现错误重试机制
七、RxJava 使用debounce操作符优化app搜索功能
八、RxJava concat操作处理多数据源
九、RxJava zip操作符在Android中的实际使用场景
十、RxJava switchIfEmpty操作符实现Android检查本地缓存逻辑判断
十一、RxJava defer操作符实现代码支持链式调用
十二、combineLatest操作符的高级使用
十三、RxJava导致Fragment Activity内存泄漏问题
十四、interval、takeWhile操作符实现获取验证码功能
一、需求场景:
在开发App的时候, 很多公司的提api接口, 请求的的时候都需要带有token, 该token在用户第一次启动app或者登陆的时候去获取. 以后的所有请求都需要带该Token 如果token过期, 服务器将返回401, 这时候就需要去请求获取token的接口, 如果获取成功接着在请求原来的接口. 这个时候就两个回调的嵌套了. 实现起来比较费劲, 而且也不够优雅. 代码的可维护性变得很差. 可以使用 onErrorResumeNext
来处理这样的业务逻辑.
例如:请求一个用户信息接口,如果token没有过期,返回用户信息,如果token过期,服务器返回401,客户端发一个获取新token的请求,成功后,再去请求用户信息接口。
二、如何使用onErrorResumeNext解决
使用Retrofit来访问服务器
private static RestAdapter restAdapter = new RestAdapter
.Builder()
.setLogLevel(RestAdapter.LogLevel.FULL)
.setEndpoint(BASE_URL)
.setErrorHandler(new NetWorkErrorHandler())
.setRequestInterceptor(requestInterceptor)
.build();
public static <S> S createService(Class<S> serviceClazz) {
return restAdapter.create(serviceClazz);
}
如果服务器返回401,我们要去请求新的token,下面来判断错误类型:
NetWorkErrorHandler
private static class NetWorkErrorHandler implements ErrorHandler {
@Override
public Throwable handleError(RetrofitError error) {
retrofit.client.Response r = error.getResponse();
if (r != null && r.getStatus() == 401) {
Log.e("ErrorHandler", "---------> access deny code=401");
// User Custom Exception
return new AccessDenyException(error.getMessage());
}
return error.getCause();
}
}
UserApi
public interface UserApi {
@GET("/token")
AuthToken refreshToken();
}
服务器端代码逻辑
服务器端使用Java web+Tomcat来实现的. 如果需要可以把服务器部署在你的本地机器上, github地址
服务器端的基本逻辑:客户端请服务器api,服务器判断客户端带过来的token,如果过期则返回401,提示没有权限访问;如果是请求token接口,则返回token,有效期为10s。
客户端App的实现
以一个请求用户信息接口为例
Observable<Response> observable = userApi.getUserInfo();
observable.onErrorResumeNext(refreshTokenAndRetry(observable))//also use retryWhen to implement it
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Response>() {
@Override
public void onCompleted() {
loading = false;
appendText(tvLogs, "task completed-----");
//hideLoadingDialog();
}
@Override
public void onError(Throwable t) {
//hideLoadingDialog();
t.printStackTrace();
loading = false;
appendText(tvLogs, t.getClass().getName() + "\n" + t.getMessage());
NetErrorType.ErrorType error = NetErrorType.getErrorType(t);
appendText(tvLogs, error.msg);
}
public void onNext(Response response) {
String content = new String(((TypedByteArray) response.getBody()).getBytes());
appendText(tvLogs, "receiver data: " + content);
}
});
核心代码
private <T> Func1<Throwable, ? extends Observable<? extends T>> refreshTokenAndRetry(final Observable<T> toBeResumed) {
return new Func1<Throwable, Observable<? extends T>>() {
@Override
public Observable<? extends T> call(Throwable throwable) {
throwable.printStackTrace();
// Here check if the error thrown really is a 401
if (isHttp401Error(throwable)) {
return createTokenObvervable().flatMap(new Func1<AuthToken, Observable<? extends T>>() {
@Override
public Observable<? extends T> call(AuthToken token) {
appendText(tvLogs, "refresh token success,token's validity is 10s\nResume last request");
return toBeResumed;
}
});
}
// re-throw this error because it's not recoverable from here
return Observable.error(throwable);
}
public boolean isHttp401Error(Throwable throwable) {
return throwable instanceof AccessDenyException;
}
};
}
请求token api 的Observable
public Observable<AuthToken> createTokenObvervable() {
return Observable.create(new Observable.OnSubscribe<AuthToken>() {
@Override
public void call(Subscriber<? super AuthToken> observer) {
try {
if (!observer.isUnsubscribed()) {
appendText(tvLogs, "God!!! Token is out of date. \nstart refresh token......");
observer.onNext(userApi.refreshToken());
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
运行效果:
本文的例子放在github上https://github.com/chiclaim/android-sample/tree/master/rxjava