最近项目中把rxjava 切换到2.0 所以相对应的一些都要做出改变 新版本的 独立出来一个Flowable 来处理背压事件.
下面就是修改过的 适用于Rxjava2.0 版本的 rxbus
/**
* Created by storm on 2017/9/20.
* <p>
* Rxbus
*/
public class RxBus {
private static final String TAG = RxBus.class.getSimpleName();
private static volatile RxBus mInstance;
/**
* 默认 bus ;
*/
private Subject<Object> _mBus;
/**
* 背压
*/
private FlowableProcessor<Object> _mBackPressureBus;
private Map<Object, CompositeDisposable> mSubscription;
private RxBus() {
_mBus = PublishSubject.create().toSerialized();
_mBackPressureBus = PublishProcessor.create().toSerialized();
}
public static RxBus getInstance() {
if (mInstance == null) {
synchronized (RxBus.class) {
if (mInstance == null) {
mInstance = new RxBus();
}
}
}
return mInstance;
}
/**
* 发送普通事件
*/
public void send(Object event) {
_mBus.onNext(event);
}
/**
* 发送背压事件
*/
public void sendByBackPressure(Object event) {
_mBackPressureBus.onNext(event);
}
/**
* 接收普通事件
*/
public <T> Observable<T> toObservable(Class<T> eventType) {
return _mBus.ofType(eventType);
}
/**
* 接受背压事件
*/
public <T> Flowable<T> toFlowable(Class<T> eventType) {
return _mBackPressureBus.ofType(eventType);
}
/**
* 普通事件的处理
*/
public <T> Disposable doSubscribe(Class<T> eventType, Consumer<T> next, Consumer<Throwable> error) {
return toObservable(eventType)
.compose(RxHelper.<T>IO_Main())
.subscribe(next, error);
}
/**
* 背压事件处理
*/
public <T> Flowable doFlowable(Class<T> eventType, Subscriber<T> tSubscriber) {
toFlowable(eventType)
.onBackpressureLatest() //背压策略
.compose(RxHelper.<T>IO_Main_Flowable())
.subscribeWith(tSubscriber);
return toFlowable(eventType);
}
/**
* 是否有订阅者
*/
public boolean hasSubscribers(boolean isBackPressure) {
if (!isBackPressure)
return _mBus.hasObservers();
else
return _mBackPressureBus.hasSubscribers();
}
/**
* 背压解除订阅
*/
public void unSubscription(){
_mBackPressureBus.onComplete();
}
/**
* 添加订阅到集合(一般事件)
*/
public void addSubscriptions(Object o, Disposable disposable) {
if (mSubscription == null) {
mSubscription = new HashMap<>();
}
String key = o.getClass().getName();
if (mSubscription.get(key) != null) {
mSubscription.get(key).add(disposable);
} else {
CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(disposable);
mSubscription.put(key, compositeDisposable);
}
}
/**
* 解除订阅
* 一般事件的解除订阅
*
* @param o
*/
public void clearSubscriptions(Object o) {
if (mSubscription == null) {
return;
}
String key = o.getClass().getName();
if (!mSubscription.containsKey(key)) {
return;
}
if (mSubscription.get(key) != null) {
mSubscription.get(key).dispose();
}
mSubscription.remove(key);
}
}