RxJava和Retrofit配合使用过程中的具体流程是怎么样的呢?
1.如何创建一个请求
2.具体的请求线程是什么
3.各种线程之间的回调是怎样的
1.如何使用Retrofit创建一个 请求
首先我们需要定义一个接口 格式如下:
public interface Api {
@GET("api/data/福利/{pageCount}/{pageIndex}")
Call<BaseModel<ArrayList<Benefit>>> defaultBenefits(
@Path("pageCount") int pageCount,
@Path("pageIndex") int pageIndex
);
@GET("api/data/福利/{pageCount}/{pageIndex}")
Observable<BaseModel<ArrayList<Benefit>>> rxBenefits(
@Path("pageCount") int pageCount,
@Path("pageIndex") int pageIndex
);
}
返回值类型为Call的是纯Retrofit格式的调用,
而下面返回值类型为Observable的就是RxJava + Retrofit的格式。我们今天就通过第二种方式来介绍
一般来说,我们使用Retrofit进行网络 请求要先创建一个Retrofit对象,而这个对象的创建是通过Builder模式来创建的 如下格式:
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://gank.io/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
那么我们就来看看这个Retrofit对象的创建都做了些什么事情?
Builder是Retrofit的内部类 直接看代码:
public Builder() {
this(Platform.get());
}
继续调用内部带参数的构造函数
Builder(Platform platform) {
this.platform = platform;
// Add the built-in converter factory first. This prevents overriding its behavior but also
// ensures correct behavior when using converters that consume all types.
converterFactories.add(new BuiltInConverters());
}
只是对成员platform以及converterFactroises进行初始化
这里platform的实际类型是
static class Android extends Platform {
@Override public Executor defaultCallbackExecutor() {
return new MainThreadExecutor();
}
@Override CallAdapter.Factory defaultCallAdapterFactory(Executor callbackExecutor) {
return new ExecutorCallAdapterFactory(callbackExecutor);
}
static class MainThreadExecutor implements Executor {
private final Handler handler = new Handler(Looper.getMainLooper());
@Override public void execute(Runnable r) {
handler.post(r);
}
}
}
而第二个成员converterFactories只是增加了一个成员而已 至于他有什么用,我们暂且不说,因为我分析完源码也没发现这个成员变量的作用,到时候我们用到时候再进行分析。
此时Builder已经构建完毕,接下来就是对该对象所需要的各种参数进行添加
baseUrl("http://gank.io/")
这句代码就是对HttpUrl进行赋值,不过这里面需要注意的点是:baseUrl后边那个/不能省略 不然拼接URL地址的时候就会出现错误,具体的解释 请看这个方法的注释 ,列举了各种写法以及结果,如果在这里出现问题,请对号入座。
然后是
addConverterFactory(GsonConverterFactory.create())
这里面主要是添加了一个转换工厂,因为目前我们使用的服务器返回的数据格式都是使用JSON格式,所以这里添加了一个Gson转换工厂
重点看这里
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
这里就是使用RxJava的适配器来封装CallAdapter,具体是如何使用的 我们后边遇到了再说
最后调用build方法创建一个Retrofit对象 ,然后返回。
我们俩看看build方法中做了些什么事情
public Retrofit build() {
//对url进行非空检查
if (baseUrl == null) {
throw new IllegalStateException("Base URL required.");
}
/ /因为初始化时候,并未对callFactory赋值 此时为null
okhttp3.Call.Factory callFactory = this.callFactory;
if (callFactory == null) {
进入这里,将callFactory赋值为OkHttpClient
callFactory = new OkHttpClient();
}
Executor callbackExecutor = this.callbackExecutor;
if (callbackExecutor == null) {
//platform我们在上面已经知道是Android平台 最后返回的实际类型为MainThreadExecutor
callbackExecutor = platform.defaultCallbackExecutor();
/*
@Override public Executor defaultCallbackExecutor() {
return new MainThreadExecutor();
}
*/
}
// Make a defensive copy of the adapters and add the default Call adapter.
List<CallAdapter.Factory> adapterFactories = new ArrayList<>(this.adapterFactories);
adapterFactories.add(platform.defaultCallAdapterFactory(callbackExecutor));
/*
@Override CallAdapter.Factory defaultCallAdapterFactory(Executor callbackExecutor) {
return new ExecutorCallAdapterFactory(callbackExecutor);
}
在这里adapterFactories里面含有俩个变量
ExecutorCallAdapterFactory 和 RxJavaCallAdapterFactory
*/
// Make a defensive copy of the converters.
List<Converter.Factory> converterFactories = new ArrayList<>(this.converterFactories);
此时该集合也有两个值
GsonConverterFactory 和 BuiltInConverters(基本上没什么用好像)
return new Retrofit(callFactory, baseUrl, converterFactories, adapterFactories,
callbackExecutor, validateEagerly);
}
}
注释已经写的很清楚,就不多说了。最后通过已经初始化的成员变量对Retrofit的成员变量进行初始化。
此时赋值之后如下:
Retrofit(okhttp3.Call.Factory callFactory, HttpUrl baseUrl,
List<Converter.Factory> converterFactories, List<CallAdapter.Factory> adapterFactories,
Executor callbackExecutor, boolean validateEagerly) {
this.callFactory = callFactory;//OkHttpClient
this.baseUrl = baseUrl;//http://gank.io/
this.converterFactories = unmodifiableList(converterFactories); // Defensive copy at call site.
this.adapterFactories = unmodifiableList(adapterFactories); // Defensive copy at call site.
this.callbackExecutor = callbackExecutor;
//class retrofit2.Platform$Android$MainThreadExecutor
this.validateEagerly = validateEagerly;//false
}
接下来就是重要的一步 ,就是使用我们刚才创建的Retrofit对象来构建一个服务(就是我么定义的接口)的实现类
这里使用动态代理的方式 构建一个实现类并且返回 ,
当我们使用返回的对象进行函数调用得时候 比如通过api对象调用函数rxBenefits
Api api = retrofit.create(Api.class);api.rxBenefits(20, page++)
接下来我们具体分析:
public <T> T create(final Class<T> service) {
Utils.validateServiceInterface(service);
if (validateEagerly) {
eagerlyValidateMethods(service);
}
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
private final Platform platform = Platform.get();
@Override public Object invoke(Object proxy, Method method, Object... args)
throws Throwable {
//此时method就是我们接口中定义的 参数就是传递进来的两个参数
//public abstract rx.Observable com.stay4it.request.Api.rxBenefits(int,int)
// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
ServiceMethod serviceMethod = loadServiceMethod(method);
OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
}
});
}
这里面其实重要的有两步,第一步就是ServiceMethod的构建
然后使用构建的ServiceMethod方法的adapt适配一个Observable对象返回。接下来我们就捡重要的说:
loadServiceMethod(method);这句代码实现了什么功能?
1.从缓存中取出以method为key的ServiceMethod
如果没有 创建并且放进缓存对象
2.创建ServiceMethod的过程
public Builder(Retrofit retrofit, Method method) {
this.retrofit = retrofit;
this.method = method;
//method上只有一个注解GET注解
this.methodAnnotations = method.getAnnotations();
//参数类型为[int ,int ]
this.parameterTypes = method.getGenericParameterTypes();
//参数上的注解为[Path,Path]
this.parameterAnnotationsArray = method.getParameterAnnotations();
}
然后调用build创建一个ServiceMethod对象,其实就是对各种成员变量进行初始化
1.callAdapter的初始化
2.responseType的初始化
3.responseConverter的初始化
4.解析方法上的注解
首先来看callAdapter的初始化
其实就是通过方法的返回值和方法上的注解 来返回我们之前已经注册过的CallAdapter方法 我们具体来分析一下:
public CallAdapter<?> nextCallAdapter(CallAdapter.Factory skipPast, Type returnType,
Annotation[] annotations) {
checkNotNull(returnType, "returnType == null");
checkNotNull(annotations, "annotations == null");
int start = adapterFactories.indexOf(skipPast) + 1;
for (int i = start, count = adapterFactories.size(); i < count; i++) {
CallAdapter<?> adapter = adapterFactories.get(i).get(returnType, annotations, this);
if (adapter != null) {
return adapter;
}
}
看代码 印证了我们的猜想,这时有人会有疑问了,我们这个集合adapterFactories里面目前有两个成员,RxJavaCallAdapterFactory 和 ExecutorCallAdapterFactory 那么到底是返回哪一个呢?
当然是我们在构建Retrofit的时候我们通过addCallAdapterFactory添加进来的那个为先了 如果他匹配了 那么就直接返回 最后返回的类型是
SimpleCallAdapter 类型 为什么是这个类型呢?
其实看代码就能明白一切,但是我这里还是来解释一下吧
CallAdapter<?> adapter = adapterFactories.get(i).get(returnType, annotations, this);
这行代码中我们上面分析 adapterFactories.get(i)的类型是RxJavaCallAdapterFactory 那么就会调用这个类兄的get方法 来获取实际的类型 ,并且传递的参数是
@Override
public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
//rawType = Observable<BaseModel<ArrayList<Benefit>>>其实就是方法的返回值
Class<?> rawType = getRawType(returnType);
//canonicalName = rx.Observable
String canonicalName = rawType.getCanonicalName();
//isSingle = false;
boolean isSingle = "rx.Single".equals(canonicalName);
//isCompletable = false;
boolean isCompletable = "rx.Completable".equals(canonicalName);
if (rawType != Observable.class && !isSingle && !isCompletable) {
return null;
}
//returnType实际类型是ParameterizedTypeImpl类型
//所以!(returnType instanceof ParameterizedType)这个条件是!true = false
if (!isCompletable && !(returnType instanceof ParameterizedType)) {
String name = isSingle ? "Single" : "Observable";
throw new IllegalStateException(name + " return type must be parameterized"
+ " as " + name + "<Foo> or " + name + "<? extends Foo>");
}
if (isCompletable) {
// Add Completable-converter wrapper from a separate class. This defers classloading such that
// regular Observable operation can be leveraged without relying on this unstable RxJava API.
// Note that this has to be done separately since Completable doesn't have a parametrized
// type.
return CompletableHelper.createCallAdapter(scheduler);
}
//最后执行到了这里 调用getCallAdapter方法将返回值和调度scheduler此时为null
CallAdapter<Observable<?>> callAdapter = getCallAdapter(returnType, scheduler);
if (isSingle) {
// Add Single-converter wrapper from a separate class. This defers classloading such that
// regular Observable operation can be leveraged without relying on this unstable RxJava API.
return SingleHelper.makeSingle(callAdapter);
}
return callAdapter;
}
最后调用了getCallAdapter方法并且参数为
returnType---->rx.Observable<com.stay4it.model.BaseModel<java.util.ArrayList<com.stay4it.model.Benefit>>>
scheduler----->null
private CallAdapter<Observable<?>> getCallAdapter(Type returnType, Scheduler scheduler) {
Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
//rawObservableType 其实解析出来是class com.stay4it.model.BaseModel的类型
Class<?> rawObservableType = getRawType(observableType);
if (rawObservableType == Response.class) {
if (!(observableType instanceof ParameterizedType)) {
throw new IllegalStateException("Response must be parameterized"
+ " as Response<Foo> or Response<? extends Foo>");
}
Type responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
return new ResponseCallAdapter(responseType, scheduler);
}
if (rawObservableType == Result.class) {
if (!(observableType instanceof ParameterizedType)) {
throw new IllegalStateException("Result must be parameterized"
+ " as Result<Foo> or Result<? extends Foo>");
}
Type responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
return new ResultCallAdapter(responseType, scheduler);
}
//所以最后会走这里 创建一个SimpleCallAdapter类型的对象返回
return new SimpleCallAdapter(observableType, scheduler);
}
经过千难万阻我们才执行完ServiceMethod创建路上的第一步,不过这也是最重要的一步,接下来都是在这步的基础上进行赋值的 我们接着往下看
responseType = callAdapter.responseType() 其实值是
com.stay4it.model.BaseModel<java.util.ArrayList<com.stay4it.model.Benefit>> 带泛型的返回值类型 就是T的实际类型
然后接着是responseConverter = createResponseConverter();
实际类型 GsonResponseBodyConverter 这个分析方法跟上面的adapterFactory分析方法一致 读者自行分析。
接下来是解析方法上的注解
for (Annotation annotation : methodAnnotations) { parseMethodAnnotation(annotation);}
其实就是将方法的的注解 以及注解里面的占位符给提取出来 在我们这个例子当中
@GET("api/data/福利/{pageCount}/{pageIndex}")
其实就是将这个注解上的信息提取出来
比如
this.httpMethod = httpMethod;//GET
this.hasBody = hasBody;//fasle
this.relativeUrl = value;
//api/data/福利/{pageCount}/{pageIndex}
this.relativeUrlParamNames = parsePathParameters(value);
其实就是set集合{pageCount,pageIndex}
接着是将参数中的注解 进行解析
ParameterHandler<?>[] parameterHandlers 就是它的初始化操作 这个有点麻烦 不过逻辑比较清晰
在我们这个例子当中 最后类型为
class ParameterHandler.Path<T>
到这一步后基本上ServiceMethod的构造已经完成了
估计大家也被绕晕了吧 ,不过Rxtrofit的代码就是这种风格,大家最好是对源码进行DEBUG 然后逐步了解每一步是如何执行以及如何赋值的 。
接下来我们继续分析 构建完成ServiceMethod 我们接下来分析如何返回返回一个Observable对象的
OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
以ServiceMethod和args为参数 创建一个OkHttpCall ,这个对象实际上是对OkHttpClient的封装
然后通过
return serviceMethod.callAdapter.adapt(okHttpCall);
这里的callAdapter我们上面已经分析,它的实际类型是
SimpleCallAdapter 最后调用这个类的adapt方法
@Override public <R> Observable<R> adapt(Call<R> call) {
Observable<R> observable = Observable.create(new CallOnSubscribe<>(call)) //
.lift(OperatorMapResponseToBodyOrError.<R>instance());
if (scheduler != null) {
return observable.subscribeOn(scheduler);
}
return observable;
}
走到这里 终于看到了熟悉的Observable了 通过调用他的静态方法 来创建一个Observable 返回 我们来看下 这个对象的构建
在这里给大家推荐一个博客 对RxJava的分析 非常详细
https://gank.io/post/560e15be2dca930e00da1083
我们接着分析
:
Observable.create方法做了哪些事情:
1.创建了一个CallOnSubscribe对象 并且将我们刚才创建的OkHttpCall对象赋值为CallOnSubscribe的成员变量originalCall
1.创建了一个Observable对象 以上面创建的CallOnSubscribe对象为参数 并且保存到成员变量
this.onSubscribe = f中去。
最后将这个Observable对象返回 。
这样我们的被观察者 也就是我们的Subject已经创建好了
接下来就是接受订阅了
RxJava订阅是通过函数subscribe()进行的 我们来看我们这个例子中
api.rxBenefits(20, page++)
.subscribeOn(Schedulers.io())//在IO线程中执行耗时操作
.observeOn(AndroidSchedulers.mainThread())//在主线程中更新UI
.subscribe(new Action1<BaseModel<ArrayList<Benefit>>>() {
@Override
public void call(BaseModel<ArrayList<Benefit>> model) {
if (action == PullRecycler.ACTION_PULL_TO_REFRESH) {
mDataList.clear();
}
if (model.results == null || model.results.size() == 0) {
recycler.enableLoadMore(false);
} else {
recycler.enableLoadMore(true);
mDataList.addAll(model.results);
adapter.notifyDataSetChanged();
}
recycler.onRefreshCompleted();
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
recycler.onRefreshCompleted();
}
})//然后订阅 将观察者 和被观察者绑定起来
;
这个具体是如何关联的呢 ,我觉得还是看源码吧
当我们调用方法subscribe方法时 会传递两个Action1对象 用来执行回调
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
return subscribe(new Subscriber<T>() {
@Override
public final void onCompleted() {
// do nothing
}
@Override
public final void onError(Throwable e) {
onError.call(e);
}
@Override
public final void onNext(T args) {
onNext.call(args);
}
});
}```
然后封装一个Subscriber对象 然后调用Observable类的subscribe方法完成订阅
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed 这个肯定不为空 这是刚才我们创建的对象
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
//这个类型我们之前创建Observable的时候分析过 实际类型是CallOnSubscribe
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
.......此处略过部分代码........
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
//主要操作是这个 其实这个方法里面做的就是只是返回了observable.onSubscribe 这个对象 其实就是调用了CallOnSubscribe的call方法 并将subscriber传递进去
return hook.onSubscribeReturn(subscriber);
.......此处略过部分代码........
}
@Override public void call(final Subscriber<? super Response<T>> subscriber) {
// Since Call is a one-shot type, clone it for each new subscriber.
Call<T> call = originalCall.clone();
//此时call实际类型为我们之前创建的OkHttpCall对象的拷贝
// Wrap the call in a helper which handles both unsubscription and backpressure.
//然后对OkHttpCall和subscribe 进行封装 以便之后的解除订阅
RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
subscriber.add(Subscriptions.create(requestArbiter));
subscriber.setProducer(requestArbiter);
}
最后返回的实际上还是我们subcribe方法中new的那个对象
具体是如何触发请求的执行的呢?
public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
producer = p;
if (subscriber != null) {
// middle operator ... we pass through unless a request has been made
if (toRequest == NOT_SET) {
// we pass through to the next producer as nothing has been requested
passToSubscriber = true;
}
}
}
// do after releasing lock
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}
其实最后调用了我们传进去的Producer对象的request方法 这应该就是真正执行请求的方法了吧 我们进去看看
@Override public void request(long n) {
if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
if (n == 0) return; // Nothing to do when requesting 0.
if (!compareAndSet(false, true)) return; // Request was already triggered.
try {
// call实际类型OkHttpCall 调用OkHttpCall的execute方法 然后返回Response
Response<T> response = call.execute();
if (!subscriber.isUnsubscribed()) {
//这里的subsriber就是我们前边构建的匿名内部类对象 调用他的OnNext方法 将会调用注册的Action11对象的onNext方法 将结果回调给UI线程进行界面处理
subscriber.onNext(response);
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
return;
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
我们来看OkHttpCall的execute方法
@Override public Response<T> execute() throws IOException {
okhttp3.Call call;
synchronized (this) {
if (executed) throw new IllegalStateException("Already executed.");
executed = true;
if (creationFailure != null) {
if (creationFailure instanceof IOException) {
throw (IOException) creationFailure;
} else {
throw (RuntimeException) creationFailure;
}
}
call = rawCall;
if (call == null) {
try {
## call = rawCall = createRawCall();//主要看这里 这里真正创建了真实的请求 返回值类型为RealCall
/*
private okhttp3.Call createRawCall() throws IOException {
Request request = serviceMethod.toRequest(args);//这里是对请求进行封装
//serviceMethod.callFactory 类型为OkHttpClient这是我们创建Retrofit对象时候创建的
okhttp3.Call call = serviceMethod.callFactory.newCall(request);
if (call == null) {
t hrow new NullPointerException("Call.Factory returned null.");
}
return call;
}
*/
} catch (IOException | RuntimeException e) {
creationFailure = e;
throw e;
}
}
}
if (canceled) {
call.cancel();
}
//最后执行call的execute方法 并将结果 传递给parseResponse方法 进行解析
return parseResponse(call.execute());
}
接着往下走
Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
ResponseBody rawBody = rawResponse.body();
// Remove the body's source (the only stateful object) so we can pass the response along.
rawResponse = rawResponse.newBuilder()
.body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
.build();
int code = rawResponse.code();
if (code < 200 || code >= 300) {
.........
}
if (code == 204 || code == 205) {
...........
}
ExceptionCatchingRequestBody catchingBody = new ExceptionCatchingRequestBody(rawBody);
try {
###T body = serviceMethod.toResponse(catchingBody); //到这里将我们得到打的body转换成我们的T类型 在我们这个例子里面就是BaseModel对象 具体的解析操作 肯定是交给我们的解析工厂了 直接贴代码
/*
//GsonResponseBodyConverter的convert方法
@Override public T convert(ResponseBody value) throws IOException { JsonReader jsonReader = gson.newJsonReader(value.charStream()); try { return adapter.read(jsonReader); } finally { value.close(); }}
*/
return Response.success(body, rawResponse);
} catch (RuntimeException e) {
// If the underlying source threw an exception, propagate that rather than indicating it was
// a runtime exception.
catchingBody.throwIfCaught();
throw e;
}
}
最后将解析的结果 封装成 一个Response对象 返回 注意这里的Response是retrofit2包下的
最后 回到我们调用处 ,也就是RequestAribre方法的request方法
try { Response<T> response = call.execute(); if (!subscriber.isUnsubscribed()) { subscriber.onNext(response); }} catch (Throwable t) { Exceptions.throwIfFatal(t); if (!subscriber.isUnsubscribed()) { subscriber.onError(t); } return;}if (!subscriber.isUnsubscribed()) { subscriber.onCompleted();}
前边我们说过 此时这里的subscriber对象是我们订阅时候创建的匿名内部类对象 里面会回调用户自定义的onnext方法
我们来看下:
.subscribe(new Action1<BaseModel<ArrayList<Benefit>>>() {
@Override
public void call(BaseModel<ArrayList<Benefit>> model) {
if (action == PullRecycler.ACTION_PULL_TO_REFRESH) {
mDataList.clear();
}
if (model.results == null || model.results.size() == 0) {
recycler.enableLoadMore(false);
} else {
recycler.enableLoadMore(true);
mDataList.addAll(model.results);
adapter.notifyDataSetChanged();
}
recycler.onRefreshCompleted();
}
}
经过重重回调 终于回来了 最后到了最重要的一步 拿到数据了 那么就是该填充适配器 来显示到界面上了
这里就是我们所说的UI线程 main线程了 。