RxBus

导包

compile 'io.reactivex.rxjava2:rxjava:2.1.6'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'com.jakewharton.rxrelay2:rxrelay:2.0.0'

调用

RxBus.getInstance().send(user); // user 为 UserModel 实例


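注册(原文此处为截图 register.png),示例:
Disposable disposable = RxBus.getInstance().register(UserModel.class, user -> { /* 处理事件 */ });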

RxBus源码

import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;

import java.util.concurrent.ConcurrentHashMap;

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;


/**
 * Singleton event bus backed by a serialized {@link PublishRelay}.
 *
 * <p>Regular events are pushed with {@link #send(Object)} and received by subscribers created
 * through the {@code register(...)} overloads, which filter by event class. "Sticky" events are
 * not pushed through the relay; they are stored per concrete event class in {@link #mStickMap}
 * and handed to a consumer when one of the {@code registerSticky...} methods is called.
 */
public class RxBus {

    /** Relay carrying every regular event; serialized in the constructor. */
    private final Relay<Object> bus;

    /**
     * Lazily created singleton. Must be {@code volatile}: without it the double-checked
     * locking in {@link #getInstance()} may publish a partially constructed instance.
     */
    private static volatile RxBus instance;

    /** Most recent sticky event for each concrete event class. */
    final ConcurrentHashMap<Class<?>, Object> mStickMap = new ConcurrentHashMap<>();

    /** Private constructor: instances are obtained only through {@link #getInstance()}. */
    private RxBus() {
        bus = PublishRelay.create().toSerialized();
    }

    /**
     * Returns the shared bus, creating it on first use.
     *
     * @return the single {@code RxBus} instance
     */
    public static RxBus getInstance() {
        RxBus current = instance;
        if (current == null) {
            synchronized (RxBus.class) {
                current = instance;
                if (current == null) {
                    current = new RxBus();
                    instance = current;
                }
            }
        }
        return current;
    }

    /**
     * Publishes an event to all current subscribers of the relay.
     *
     * @param event the event object to deliver
     */
    public void send(Object event) {
        bus.accept(event);
    }

    /**
     * Returns a view of the bus restricted to events of the given type.
     *
     * @param eventType class used to filter (and cast) events
     * @param <T>       event type
     * @return an observable emitting only events that are instances of {@code eventType}
     */
    public <T> Observable<T> toObservable(Class<T> eventType) {
        return bus.ofType(eventType);
    }

    /**
     * Stores a sticky event, replacing any previous sticky event of the same concrete class.
     * The event is not emitted on the relay.
     *
     * @param event the event to remember; keyed by {@code event.getClass()}
     */
    public void sendSticky(Object event) {
        mStickMap.put(event.getClass(), event);
    }

    /**
     * Consumes the stored sticky event of the given type exactly once: the event is removed
     * from the store and delivered to {@code consumer} on {@code scheduler}. Does nothing when
     * no sticky event of that type is stored.
     *
     * @param eventType class the sticky event was stored under
     * @param scheduler scheduler on which {@code consumer} is invoked
     * @param consumer  receiver of the sticky event
     * @param <T>       event type
     */
    public <T> void registerStickyJustHere(final Class<T> eventType, Scheduler scheduler, Consumer<T> consumer) {
        // A single atomic remove guarantees that only one caller can claim the event and that
        // a newer event posted concurrently is never discarded by a separate clear step.
        T sticky = eventType.cast(mStickMap.remove(eventType));
        if (sticky != null) {
            Observable.just(sticky).observeOn(scheduler).subscribe(consumer);
        }
    }

    /**
     * Same as {@link #registerStickyJustHere(Class, Scheduler, Consumer)} using
     * {@link AndroidSchedulers#mainThread()}.
     *
     * @param eventType class the sticky event was stored under
     * @param consumer  receiver of the sticky event
     * @param <T>       event type
     */
    public <T> void registerStickyJustHere(Class<T> eventType, Consumer<T> consumer) {
        registerStickyJustHere(eventType, AndroidSchedulers.mainThread(), consumer);
    }

    /**
     * Delivers the stored sticky event of the given type to {@code consumer} on
     * {@code scheduler}, leaving it in the store for later consumers. Does nothing when no
     * sticky event of that type is stored.
     *
     * @param eventType class the sticky event was stored under
     * @param scheduler scheduler on which {@code consumer} is invoked
     * @param consumer  receiver of the sticky event
     * @param <T>       event type
     */
    public <T> void registerSticky(Class<T> eventType, Scheduler scheduler, final Consumer<T> consumer) {
        T sticky = eventType.cast(mStickMap.get(eventType));
        if (sticky != null) {
            // Honour the requested scheduler, consistent with registerStickyJustHere.
            Observable.just(sticky).observeOn(scheduler).subscribe(consumer);
        }
    }

    /**
     * Same as {@link #registerSticky(Class, Scheduler, Consumer)} using
     * {@link AndroidSchedulers#mainThread()}.
     *
     * @param eventType class the sticky event was stored under
     * @param consumer  receiver of the sticky event
     * @param <T>       event type
     */
    public <T> void registerSticky(Class<T> eventType, Consumer<T> consumer) {
        registerSticky(eventType, AndroidSchedulers.mainThread(), consumer);
    }

    /**
     * Removes the stored sticky event of the given type, if any.
     *
     * @param eventType class the sticky event was stored under
     * @param <T>       event type
     */
    public <T> void clearSticky(Class<T> eventType) {
        mStickMap.remove(eventType);
    }

    /**
     * @return whether the underlying relay currently has subscribers
     */
    public boolean hasObservers() {
        return bus.hasObservers();
    }

    /**
     * Subscribes to events of {@code eventType}, delivering them on {@code scheduler}.
     *
     * @return the subscription handle; pass it to {@link #unregister(Disposable...)} to stop
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext);
    }

    /**
     * Subscribes to events of {@code eventType} on {@code scheduler} with error, completion
     * and subscription callbacks.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext,
                                   Consumer<? super Throwable> onError,
                                   Action onComplete, Consumer<? super Disposable> onSubscribe) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete, onSubscribe);
    }

    /**
     * Subscribes to events of {@code eventType} on {@code scheduler} with error and
     * completion callbacks.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext,
                                   Consumer<? super Throwable> onError,
                                   Action onComplete) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete);
    }

    /**
     * Subscribes to events of {@code eventType} on {@code scheduler} with an error callback.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext,
                                   Consumer<? super Throwable> onError) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError);
    }

    /**
     * Subscribes to events of {@code eventType}, delivering them on the Android main thread.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext) {
        return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext);
    }

    /**
     * Main-thread variant with error, completion and subscription callbacks.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer<? super Throwable> onError,
                                   Action onComplete, Consumer<? super Disposable> onSubscribe) {
        return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError, onComplete, onSubscribe);
    }

    /**
     * Main-thread variant with error and completion callbacks.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer<? super Throwable> onError,
                                   Action onComplete) {
        return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError, onComplete);
    }

    /**
     * Main-thread variant with an error callback.
     *
     * @return the subscription handle
     */
    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer<? super Throwable> onError) {
        return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError);
    }

    /**
     * Disposes every given subscription that is non-null and not already disposed.
     *
     * @param disposables subscription handles to cancel; null entries (and a null array) are ignored
     */
    public void unregister(Disposable... disposables) {
        if (disposables == null) {
            return;
        }
        for (Disposable disposable : disposables) {
            if (disposable != null && !disposable.isDisposed()) {
                disposable.dispose();
            }
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 173,637评论 25 708
  • 新的应用中用到了RxBus,这里简单叙述一下. 1.添加依赖 // RxJava 2implementation'...
    沐沐小风阅读 487评论 1 0
  • 了解RxJava也蛮久了,原来一直不了解其中的原理,尤其是配合Retrofit组合之后线程切换和类型转换老是晕,刚...
    RoboyCore阅读 1,025评论 0 1
  • Rxjava目前已经很火了,如果你尚未了解,可以查看rxjava详情。RxBus并不是一个库,而是一种模式,用过E...
    圈圈猫阅读 1,533评论 0 1
  • 物有本末,事有终始。 “但愿世间人无病,何愁架上药生尘” 吃药只是手段,不生病才是根本。 赚钱只是方法,幸福的活着...
    望心镜阅读 613评论 0 3