现在有这么个逻辑需求, 并行请求两个网络请求 A和B , 但是要确保先处理A的结果, 在处理B的结果.
实现这个需求其实很简单, 只要定义一个全局变量去判断就可以了, 或者利用挂起线程等等操作, 我们这里使用Rxjava的操作符来实现一下.
先看一下A方案
public static <T> ObservableTransformer<T, T> rxSchedulerHelper() { //compose简化线程
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
Observable<String> a; // A请求
a.compose(RxUtil.<String>rxSchedulerHelper());
Observable<Integer> b; // B请求
b.compose(RxUtil.<Integer>rxSchedulerHelper());
Observable.zip(a, b, new BiFunction<String, Integer, Object>(){
@Override
public Object apply(@NonNull String s, @NonNull Integer integer) throws Exception {
// dosomething with A结果
// dosomething with B结果
return new Object();
}
}).subscribe();
看完这个A方案, A和B会并行在各自的子线程当中, 并且会合并到 apply()方法中.
这里可以先处理A的处理结果s, 后处理B的处理结果integer.
弊端: 效率低
根据上面的写法apply()方法, 必然是两个请求结果都收到之后才会调用的方法. 如果A请求先结束, 那么完全可以先处理A的请求结果, 而不去等待B的请求结果
B方案, 提高效率
Observable<String> a; // A请求
a.compose(RxUtil.<String>rxSchedulerHelper());
a.doOnEach(new Consumer<Notification<String>>() {
@Override
public void accept(@NonNull Notification<String> stringNotification) throws Exception {
//do something with A结果
}
});
Observable<Integer> b; // B请求
b.compose(RxUtil.<Integer>rxSchedulerHelper());
Observable.zip(a, b, new BiFunction<String, Integer, Object>(){
@Override
public Object apply(@NonNull String s, @NonNull Integer integer) throws Exception {
// dosomething with B结果
return new Object();
}
}).subscribe();
如果A优先请求结束, 那么会执行 doOnEach() 中的方法, 接着在执行 apply() 中的方法.
如果B优先请求结束, 也是相同的结果.
弊端: 没有考虑到两个请求出错的情况.
如果B请求时出错, 但是这是A请求还没有完成, 那么A的请求就会被中断, 但是我的业务逻辑是A请求不应该不受到B请求的影响, 也就是说B请求就算出错了, A请求也需要继续请求, 并处理结果.
为了实现这个需求我们需要用到 zip() 操作符中的 delayError 功能
也就是延迟报错. 我们先看一下 zip() 的源码, 看看 delayError 是如何工作的
// Observable
public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper,
boolean delayError, int bufferSize, ObservableSource<? extends T>... sources) {
//...
return RxJavaPlugins.onAssembly(new ObservableZip<T, R>(sources, null, zipper, bufferSize, delayError));
}
// ObservableZip
public ObservableZip(ObservableSource<? extends T>[] sources,
Iterable<? extends ObservableSource<? extends T>> sourcesIterable,
Function<? super Object[], ? extends R> zipper,
int bufferSize,
boolean delayError) {
this.sources = sources; // 请求A, B 的数组
this.sourcesIterable = sourcesIterable; // null
this.zipper = zipper; // apply() 方法
this.bufferSize = bufferSize; // 因为数据量小, 这个可以忽略
this.delayError = delayError; // 是否延迟加载
}
public void subscribeActual(Observer<? super R> s) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
// source 不为空
} else {
count = sources.length;
}
ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(s, zipper, count, delayError);
zc.subscribe(sources, bufferSize);
}
ZipCoordinator(Observer<? super R> actual,
Function<? super Object[], ? extends R> zipper,
int count, boolean delayError) {
// 观察者
this.actual = actual;
// 转换方法
this.zipper = zipper;
// 包装观察者数组
this.observers = new ZipObserver[count];
// 每个被观察者产生数据后保存在该数组
this.row = (T[])new Object[count];
// 如果上面的容器中有数据时, 延迟发送error
this.delayError = delayError;
}
public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
// 给 包装观察者数组元素赋值
ZipObserver<T, R>[] s = observers;
int len = s.length;
for (int i = 0; i < len; i++) {
s[i] = new ZipObserver<T, R>(this, bufferSize);
}
this.lazySet(0);
actual.onSubscribe(this);
for (int i = 0; i < len; i++) {
if (cancelled) {
return;
}
// 订阅 被观察者
// 这里研究被观察者为异步时的情况
sources[i].subscribe(s[i]);
}
}
// 包装后的观察者
ZipObserver(ZipCoordinator<T, R> parent, int bufferSize) {
// 上面的ZipCoordinator
this.parent = parent;
// 队列, 存储 被观察者发送的数据
this.queue = new SpscLinkedArrayQueue<T>(bufferSize);
}
@Override
public void onNext(T t) {
// 队列添加元素
queue.offer(t);
parent.drain();
}
@Override
public void onError(Throwable t) {
error = t;
// done为true 说明结束事件
done = true;
parent.drain();
}
@Override
public void onComplete() {
done = true;
parent.drain();
}
// 先简单看一下取消事件的方法
void clear() {
for (ZipObserver<?, ?> zs : observers) {
// 清空每个包装观察者的队列
zs.queue.clear();
}
}
// 取消方法
void cancel() {
clear();
cancelSources();
}
void cancelSources() {
for (ZipObserver<?, ?> zs : observers) {
// 调用dispose() 终止事件
zs.dispose();
}
}
上面事件中 都调用了 parent.drain()方法
public void drain() {
if (getAndIncrement() != 0) {
return;
}
int missing = 1;
final ZipObserver<T, R>[] zs = observers;
final Observer<? super R> a = actual;
final T[] os = row;
final boolean delayError = this.delayError;
for (;;) {
for (;;) {
int i = 0;
int emptyCount = 0;
for (ZipObserver<T, R> z : zs) {
if (os[i] == null) {
// 观察者是否结束
boolean d = z.done;
// 从观察者队列中取出元素
T v = z.queue.poll();
// 刚刚取出的元素是否为空
boolean empty = v == null;
// 如果 d = true 并且 队列中没有元素, 那么必然为true
// 如果 d = true 并且 队列中有元素, 延迟发送Error, 那么为false
// 只要 d = true 那么 return 后不是cancel, 就是 onComplete
if (checkTerminated(d, empty, a, delayError, z)) {
return;
}
if (!empty) {
// 不为空 数组添加数据
os[i] = v;
} else {
emptyCount++;
}
} else {
// 当 os容器中有数据并且不延迟发送Error 并且 error不为空, 终止事件
if (z.done && !delayError) {
Throwable ex = z.error;
if (ex != null) {
cancel();
a.onError(ex);
return;
}
}
}
i++;
}
// emptyCount 不等于0表示有个观察者中的队列还没有数据
// 如果要执行下面的 onNext() 必须要所有观察者都提供数据才能接着执行下去
if (emptyCount != 0) {
break;
}
R v;
try {
v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancel();
a.onError(ex);
return;
}
a.onNext(v);
Arrays.fill(os, null);
}
missing = addAndGet(-missing);
if (missing == 0) {
return;
}
}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super R> a, boolean delayError, ZipObserver<?, ?> source) {
// 判断是否结束
if (cancelled) {
cancel();
return true;
}
// d 是观察者的 done
if (d) {
// 如果是延迟发送Error
if (delayError) {
// 如果队列取出的元素为空
if (empty) {
Throwable e = source.error;
// 取消事件
cancel();
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
return true;
}
// 结论: 如果延迟发送Error并且队列中有数据, 就不会取消事件
} else {
// 不延迟发送消息
Throwable e = source.error;
if (e != null) {
cancel();
a.onError(e);
return true;
} else
if (empty) {
cancel();
a.onComplete();
return true;
}
// 结论: 不延迟发送Error时, 如果有 error或者队列为空 都会取消事件
}
}
return false;
}
根据 drain() 方法可以知道, 想要延迟报错, 那么 delayError 为true是必须的. 并且观察者队列中的数据不能为空. 这样就算出错了, 他也会继续执行.
方案B中如果B请求出错了, 它的观察者队列中没有数据, 所以会直接取消所有事件
根据这个点出发实现方案C
// 不放实例代码了, 直接贴项目中使用的代码
public static <T, K> ObservableTransformer<T, Object> concurrentRequestOrderRespond(
final Observable<K> source2
, final Consumer<T> onNext
, final Consumer<Throwable> onError
, final Action onComplete
, final Consumer<K> source2Exc){
return new ObservableTransformer<T, Object>() {
@Override
public ObservableSource<Object> apply(@NonNull Observable<T> upstream) {
final Object o1 = new Object();
final Object o2 = new Object();
return Observable.zip(
// 这里不能用 merge() 因为 merge() 不保证顺序
// 使用 concat() 保证顺序, 与一个事件合并, 该事件为了队列有数据
Observable.concat(upstream
.compose(RxUtil.rxSchedulerHelper())
.doOnEach(new Consumer<Notification<T>>() {
@Override
public void accept(@NonNull Notification<T> tNotification) throws Exception {
if (tNotification.isOnNext()) {
// 执行onNext()和onComplete()
// 在请求B出错的情况下, 可能不执行onComplete()
// 所以onNext()完 执行 onComplete() 也是一样的
onNext.accept(tNotification.getValue());
onComplete.run();
} else if (tNotification.isOnError()) {
onError.accept(tNotification.getError());
}
}
})
, Observable.just(o1)) // 到这里为请求A
, Observable.concat(Observable.just(o2), source2
.compose(RxUtil.<K>rxSchedulerHelper())) // 到这里为请求B
, new BiFunction<Object, Object, Object>() {
@Override
public Object apply(@NonNull Object t, @NonNull Object k) throws Exception {
if (t == o1){ //次流
source2Exc.accept((K) k); //执行请求B的结果
}
return new Object();
}
}, true); // delayError 设置为true
}
};
}
请求A队列的事件依次为 A请求, 空事件
请求B队列的事件依次为 空事件, B请求
zip() 的 apply() 方法需要配对, 也就是说如果请求A队列为一个事件, 请求B队列为两个事件, 那么apply(), 只会执行一次.
现在该方法, 已经实现了我的业务逻辑了.
- 并行请求A和B
- A先处理结果, B在处理结果
- B出错, A还没有请求完成, A依然继续请求, 并处理结果
- 如果A请求出错, 那么不管B比A先完成还是后完成的请求都不会接着执行下去
照着这个思路, 也可以继续拓展.