-
RxJava已经更新到2.x版本,和1.x版本差别有点大,建议直接使用2.x版本
compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile 'io.reactivex.rxjava2:rxjava:2.0.5'
-
简单使用
-
创建被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("aaa"); e.onNext("ccc"); e.onNext(Thread.currentThread().getName()); e.onComplete(); } });
-
创建观察者
Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG, "onSubscribe: " + d.toString()); } @Override public void onNext(String s) { Log.e(TAG, "onNext: " + s); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: " + e); } @Override public void onComplete() { Log.e(TAG, "onComplete: " + Thread.currentThread().getName()); } };
-
订阅事件
observable.subscribe(observer);
-
简单的链式编程
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("haha"); e.onNext("hehe"); e.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG, "onSubscribe: " + d.toString()); } @Override public void onNext(String s) { Log.e(TAG, "onNext: " + s); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: " + e); } @Override public void onComplete() { Log.e(TAG, "onComplete: "); } });
-
-
更换线程
observable.subscribeOn(Schedulers.newThread()) //指定被观察者的执行线程 .observeOn(AndroidSchedulers.mainThread()) //指定观察者的执行线程 .subscribe(observer); //订阅事件
- 注意:
被观察者多次指定线程,只有第一次指定的有效;
观察者多次指定线程,可以分开有效;
-
observable.subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "After observeOn(mainThread), current thread is: " + Thread.currentThread().getName()); } }) .observeOn(Schedulers.io()) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "After observeOn(io), current thread is : " + Thread.currentThread().getName()); } }) .subscribe(consumer);
- 注意:
-
MAP操作符
- map操作符的作用就是对上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化。
- amp操作符可以将上游发来的事件转换为任意的类型, 可以是一个Object, 也可以是一个集合。
【未完待续。。。】