简单介绍一下RxJava 2.0的多种流:
1.Completable
特性:这个流没有数据,只会收到error或者complete
示例:
Completable.complete()
.subscribe(() -> printThread("on complete 1"));
Completable.error(new Callable<Throwable>() {
@Override
public Throwable call() throws Exception {
// TODO Auto-generated method stub
return new NullPointerException();
}
})
.subscribe(() -> printThread("on complete 2"),
(e) -> printThread("on error 2 [" + e.getMessage() + "]"));
输出:
on complete 1[main]
on error 2 [null][main]
2.Single
特性:这个流只会收到一个数据或者一个error,也就是要不然执行onSuccess要不然就执行onError
示例:
Single.just(1)
.subscribe(i -> printThread(String.valueOf(i)));
System.out.println("==============");
Single.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
// TODO Auto-generated method stub
return 1 / 0;
}
}).subscribe((i, error) -> {
printThread(String.valueOf(i));
printThread(String.valueOf(error));
});
输出:
1[main]
==============
null[main]
java.lang.ArithmeticException: / by zero[main]
可以看到,正常数据下,收到了数据1,出错的时候,只会收到一个error。
我们看Single的订阅接口SingleObserver,如下:
public interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable e);
}
只会存在2种回调,符合我们的打印输出。(上面示例代码仅仅调用的是简单的单个情况订阅,查看源码,最终都封装成了SingleObserver)
3.Maybe
特性:和Single类似正常流程也是只执行onSuccess,但在出现错误的时候,可以选择是执行onError还是onComplete
示例(正常流程):
Maybe.just(1)
.subscribe(i -> printThread("success " + i),
(e) -> printThread("error " + e),
() -> printThread("complete"));
输出:
success 1[main]
示例(错误):
Maybe.fromCallable(() -> {
return 1 / 0;
}).subscribe(i -> printThread("success " + i),
(e) -> printThread("error " + e),
() -> printThread("complete"));
输出:
error java.lang.ArithmeticException: / by zero[main]
我们调用onErrorComplete干预:
Maybe.fromCallable(() -> {
return 1 / 0;
})
.onErrorComplete()
.subscribe(i -> printThread("success " + i),
(e) -> printThread("error " + e),
() -> printThread("complete"));
输出:
complete[main]
4.Flowable
和Observable功能几乎一模一样,区别在于:
1.定义的类功能不一样
Observable 的订阅者是:
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T value);
void onError(Throwable e);
void onComplete();
}
public interface Disposable {
void dispose();
boolean isDisposed();
}
Flowable的订阅者是:
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
2.Flowable可以通过Subscription对象,调用request(n),响应式拉取数据,来支持背压特性
示例代码:
private static int count = 1;
private static boolean isDataEnd() {
return count > 1000;
}
private static void test() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
while (!isDataEnd() && !e.isCancelled()) {// 生产数据条件
while (e.requested() <= 0) {// 如果e.request值是0,说明消费者还没有消费完毕,我们就休息
Thread.sleep(1000);
}
printThread(String.format("OUT生产数据[%d]", count));
e.onNext(count++);
}
e.onComplete();
}
}, BackpressureStrategy.BUFFER)
.subscribe(new Subscriber<Integer>() {
private Subscription mSub;
@Override
public void onComplete() {
// TODO Auto-generated method stub
printThread("消费完毕");
unloopMain();
}
@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub
printThread(arg0.getMessage());
unloopMain();
}
@Override
public void onNext(Integer value) {
// TODO Auto-generated method stub
try {
Thread.sleep(100);
printThread(String.format("IN消费数据[%d]", value));
mSub.request(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void onSubscribe(Subscription arg0) {
// TODO Auto-generated method stub
mSub = arg0;
mSub.request(1);
}
});
loopMain();
}
private static void loopMain() {
do {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (needBreak)
break;
} while (true);
System.out.println("END");
}
private static void unloopMain() {
needBreak = true;
}
输出如下:
OUT生产数据[1][main]
IN消费数据[1][main]
OUT生产数据[2][main]
IN消费数据[2][main]
OUT生产数据[3][main]
IN消费数据[3][main]
OUT生产数据[4][main]
IN消费数据[4][main]
OUT生产数据[5][main]
流程分析:
1.调用Flowable.create生产我们的数据流,里面有个新的类型,如下:
public interface FlowableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable s);
void setCancellable(Cancellable c);
long requested();
boolean isCancelled();
FlowableEmitter<T> serialize();
}
其他方法和Observable发射器的功能类似,我们主要需要requested()方法,获取当前请求个数,如果为0代表还在消费数据,不需要新的数据,我们就休息。
2.BackpressureStrategy,代表支持背压的策略,如下:
public enum BackpressureStrategy {
//结合onBackpressureXXX()才会生效
MISSING,//在onSubcription时候,s.request(Long.MAX_VALUE);设置最大值。导致一直生产数据。
//直接抛出异常,如果数据溢出
ERROR,
//所有数据会保存到缓存里面
BUFFER,
//丢弃最新的
DROP,
//保存最新的,覆盖老的
LATEST
}
3.背压策略,一定要在线程变换之前去调用,线程变换后,收到的订阅者发生了变化,不是同一个。上面代码,因为是同一个线程,调用request(1),将直接设置上游数据生产者FlowableEmitter的值为1,但如果是切换线程了,将无法直接影响,增加代码如下:
Flowable.create(...)
.observeOn(Schedulers.computation())//切换线程
.subscribe(...);
打印如下:
...
OUT生产数据[127][requested = 2][main]
OUT生产数据[128][requested = 1][main]
IN消费数据[1][RxComputationThreadPool-1]
IN消费数据[2][RxComputationThreadPool-1]
IN消费数据[3][RxComputationThreadPool-1]
IN消费数据[4][RxComputationThreadPool-1]
...
我们可以看到,下游的request(1)并不会影响上游的值,上游使用了默认值128的缓存大小。先生产了128个数据,再开始消费。两个疑问解答:
1.128怎么来的,在调用BackpressureStrategy.Buffer时候,生成的FlowableEmitter实际类型是BufferAsyncEmitter,它默认值就是128
2.我们怎么去控制这个值呢,既然下游影响不到这个大小,可以通过如下代码:
...
.observeOn(Schedulers.computation(), false, 4)
...
打印输出,如下:
...
OUT生产数据[1][requested = 4][main]
OUT生产数据[2][requested = 3][main]
OUT生产数据[3][requested = 2][main]
OUT生产数据[4][requested = 1][main]
IN消费数据[1][RxComputationThreadPool-1]
IN消费数据[2][RxComputationThreadPool-1]
IN消费数据[3][RxComputationThreadPool-1]
IN消费数据[4][RxComputationThreadPool-1]
OUT生产数据[5][requested = 3][main]
OUT生产数据[6][requested = 2][main]
OUT生产数据[7][requested = 1][main]
IN消费数据[5][RxComputationThreadPool-1]
IN消费数据[6][RxComputationThreadPool-1]
IN消费数据[7][RxComputationThreadPool-1]
...
这里看到,我们只生产了4个数据,就消费了,以后就是生产3个,这是因为在我们调用observeOn()生成的内部对象FlowableObserveOn里面有个limit = prefetch - (prefetch >> 2); prefetch实际就是传入的buffersize。如下:
void runAsync(){
...
e++;
if (e == limit) {
if (r != Long.MAX_VALUE) {
r = requested.addAndGet(-e);
}
s.request(e);
e = 0L;
}
}
这个方法在内部类 FlowableObserveOn$ObserveOnSubscriber,我们异步调用sub.request(n)将最终触发到runAsync(),它会去设置requested也就是上游的数据,所以这个n将无法直接反应到上游,而同步的n是直接设置给上游了
分析线程变换:
使用最简单的Single来进行探究,代码如下:
private static void syncRx(){
Single.fromCallable(() -> {
printThread("生产数据");
return "s";
})
.subscribe((s) -> {
printThread("消费数据");
});
}
private static void printThread(String msg){
System.out.println(String.format("[%s][%s]", msg, Thread.currentThread().getName()));
}
打印输出:
[生产数据][main]
[消费数据][main]
查看源码,Single.fromCallable 生成的就是SingleFromCallable对象,订阅表达式生成的是ConsumerSingleObserver对象,代码变换如下:
private static void syncChangeRx() {
new SingleFromCallable<String>(() -> {
printThread("生产数据");
return "s";
}).subscribe(new ConsumerSingleObserver<>((s) -> {
printThread("消费数据");
}, Functions.ERROR_CONSUMER));
}
输出打印和上面一模一样,所以点开方法subscribe(),如下:
public final void subscribe(SingleObserver<? super T> subscriber) {
...
try {
subscribeActual(subscriber);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
...
}
}
最终调用的是抽象方法 subscribeActual(),而我们知道我们来源于SingleFromCallable,所以实际实现在SingleFromCallable.subscribeActual()方法里面,如下:
@Override
protected void subscribeActual(SingleObserver<? super T> s) {
s.onSubscribe(EmptyDisposable.INSTANCE);
try {
T v = callable.call();
if (v != null) {
s.onSuccess(v);
} else {
s.onError(new NullPointerException("The callable returned a null value"));
}
} catch (Throwable e) {
...
}
}
里面也很简单,参数 s,就是我们自己定义的后生产的ConsumerSingleObserver对象,callable就是我们定义的生产对象,所以下游的订阅动作,如下:
1.订阅触发流程
2.进入真实SingleFromCallable.subscribeActual()
3.调用callable.call()生产数据或者异常
4.回调给ConsumerSingleObserver,我们自己的消费者
5.完成结束
以上就是最简单的单线程调用了,在以上的基础上,我们增加一个线程切换,如下:
private static void asyncChangeRx() {
new SingleFromCallable<String>(() -> {
printThread("生产数据");
return "s";
})
.observeOn(Schedulers.computation())
.subscribe(new ConsumerSingleObserver<>((s) -> {
printThread("消费数据");
}, Functions.ERROR_CONSUMER));
}
打印输出:
[生产数据][main]
[消费数据][RxComputationThreadPool-1]
查看源码,我们可以知道,observeOn也生产了一个新的包装流SingleObserveOn,变换,如下:
private static void asyncChangeRx() {
SingleSource<String> producer = new SingleFromCallable<>(() -> {
printThread("生产数据");
return "s";
});
SingleObserver<String> consumer = new ConsumerSingleObserver<>((s) -> {
printThread("消费数据");
}, Functions.ERROR_CONSUMER);
new SingleObserveOn<>(producer, Schedulers.computation())
.subscribe(consumer);
}
这样也就是具有线程变换功能的SingleObserveOn,包裹起了原始的生产者SingleFromCallable,其他不变,因此我们先认为是单一线程模型可以大概推出:
consumer订阅 --->
SingleObserveOn.subscribeActual() --->
SingleFromCallable.subscribeActual()
所以真正线程变换就在SingleObserveOn.subscribeActual()里面,实现如下:
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
}
这里SingleObserver<? super T> s就是我们自己的 consumer, 而source就是我们的 producer,一定要注意,上面这个订阅过程会导致上游产生数据,因此将触发ObserveOnSingleObserver.onSuccess(T t),我们再看具体实现代码,如下:
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
...
final SingleObserver<? super T> actual;//我们实际的订阅者
final Scheduler scheduler;//我们设定的线程执行类
T value;//保存上游的数据
Throwable error;//保存上游的错误
@Override
public void onSuccess(T value) {
this.value = value;//拿到上游数据
Disposable d = scheduler.scheduleDirect(this);//要求线程执行自己
DisposableHelper.replace(this, d);
}
@Override
public void onError(Throwable e) {
this.error = e;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {//这里异步执行原有的调用流程
Throwable ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onSuccess(value);
}
}
...
}
生产数据后,ObserveOnSingleObserver本身继承了Runnable,将同步调用的流程封装在了run()方法里面,再叫scheduler去执行自己,完成了线程切换。
线程切换最简单的整个流程,就是以上调用,如果加上生产者也要切换线程,也是一样的,它有个对象,SingleSubscribeOn来包装流,包装过程伪代码如下:
从调用链最后开始,往上包装:
SingleObserveOn --包含-->
SingleSubscribeOn --包含-->
producer
订阅过程,又最后往上 被包装:
consumer --被包含-->
ObserveOnSingleObserver--被包含--> 负责消费者切换
SubscribeOnObserver 负责生产者切换
SubscribeOnObserver 里面也很简单,也是run方法,如下:
@Override
public void run() {
source.subscribe(this);//订阅就是生产数据
}
线程变换总结:
1.订阅将会触发生产
2.将上游的订阅过程,封装到runnable,再交由scheduler去执行
3.将下游的消费过程,封装到runnable,再交由scheduler去执行
其他思考:
多次线程变换,生产数据会在哪次里面?,而消费过程会在哪次里面?
示例代码:
public static void main(String[] args) {
asyncChangeRx2();
loopMain();
}
private static void asyncChangeRx2() {
Single.fromCallable(() -> {
printThread("生产数据");
return "s";
})
.subscribeOn(myScheduler("生产包装线程1"))
.subscribeOn(myScheduler("生产包装线程2"))
.observeOn(myScheduler("消费包装线程1"))
.observeOn(myScheduler("消费包装线程2"))
.subscribe((s) -> {
printThread("消费数据");
unloopMain();
});
}
private static Scheduler myScheduler(String name) {
return new Scheduler() {
@Override
public Worker createWorker() {
// TODO Auto-generated method stub
return new NewThreadWorker(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
// TODO Auto-generated method stub
return new Thread(new HookRun(r), name);
}
});
}
};
}
//简单打印活动线程
private static class HookRun implements Runnable {
private Runnable mRealRun;
public HookRun(Runnable r) {
// TODO Auto-generated constructor stub
mRealRun = r;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println(String.format("执行Run[%s]", Thread.currentThread().getName()));
mRealRun.run();
}
}
private static void printThread(String msg) {
System.out.println(String.format("[%s][%s]", msg, Thread.currentThread().getName()));
}
private static void loopMain() {
do {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (needBreak)
break;
} while (true);
System.out.println("END");
}
private static void unloopMain() {
needBreak = true;
}
private static boolean needBreak;
}
输出:
执行Run[生产包装线程2]
执行Run[生产包装线程1]
[生产数据][生产包装线程1]
执行Run[消费包装线程1]
执行Run[消费包装线程2]
[消费数据][消费包装线程2]
END
说明生产最终在第一次包装里面,消费在最后一次包装里面,符合我们刚才分析的包装过程伪代码的方向。离活动(生产或者消费)最近的一次线程切换包装负责执行