添加如下依赖即可开始使用:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
map的使用,注意map是一对一,flatmap是一对多
String filePath = "/storage/emulated/0/1.jpg";///storage/emulated/0这个路径是手机内存的路径,不是存储卡的路径
Observable<Bitmap> sender = Observable.just(filePath).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.io()).map(new Function<String, Bitmap>() {//这里的subscribeOn(Schedulers.newThread())是新创建一个线程来发射事件,.observeOn(Schedulers.io())是将接下来的map操作放在io线程上进行
@Override
public Bitmap apply(@NonNull String s) throws Exception {
Log.i("xian", Thread.currentThread().getName());
return BitmapFactory.decodeFile(s);
}
});
Observer<Bitmap> receiver = new Observer<Bitmap>() {
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(Bitmap bitmap) {
imageView.setImageBitmap(bitmap);
Log.i("xian", Thread.currentThread().getName());
}
};
sender.observeOn(AndroidSchedulers.mainThread()).subscribe(receiver);//需要先添加Rxandroid库,才能使用AndroidSchedulers.mainThread(),先切换回主线程,然后执行receiver这个Observer中的代码
Observable.just("dd").subscribe();
Observable.just(1, 2, 3, 4).subscribe(new Consumer<Integer>() {//Consumer是比Observer精简的
@Override
public void accept(Integer integer) throws Exception {
Log.i("xinxi",integer+"");
}
});
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {//自己创建Observable,没有用just,from之类的
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("R");
e.onNext("x");
e.onNext("j");
e.onNext("a");
e.onNext("v");
e.onNext("a");
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
Log.i("xinxi", s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(observer);
repeat的用法
Observable.just("1","2","3").repeat(3).subscribe(new Consumer<String>() {//repeat是重复的次数
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
repeatWhen的用法
Observable.just("1","2","3").repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return Observable.timer(2, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<String>() {//repeat是重复的次数
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});/*输出结果为:
06-28 10:15:45.657 8879-8879/? I/hh: 1
06-28 10:15:45.657 8879-8879/? I/hh: 2
06-28 10:15:45.657 8879-8879/? I/hh: 3
2秒之后再次发射
06-28 10:15:47.656 8879-8909/jiankang.liang.com.ceshi I/hh: 1
06-28 10:15:47.657 8879-8909/jiankang.liang.com.ceshi I/hh: 2
06-28 10:15:47.657 8879-8909/jiankang.liang.com.ceshi I/hh: 3*/
range的用法
Observable.range(4,6).subscribe(new Consumer<Integer>() {//range方法的使用,输出结果为:4,5,6,7,8,9,意思就是执行6次onNext方法
@Override
public void accept(Integer integer) throws Exception {
Log.i("hh",integer+"");
}
});
interval的用法
Observable.interval(5, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {//interval是轮询,输出结果为0,1,2,3,4,5....每隔5秒就增加1,一直加下去,意思就是每隔5秒就执行一次onNext方法
@Override
public void accept(Long aLong) throws Exception {
Log.i("hh", aLong + "");
}
});
timer的用法
Observable.timer(3,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {//timer的作用就是3秒后发射一个0,然后就没了,意思就是等待3秒之后,再执行onNext方法
@Override
public void accept(Long aLong) throws Exception {
Log.i("hh",aLong+"");
}
});
filter的用法
Observable.just("1","11","12","14","21").filter(new Predicate<String>() {//filter是过滤器,过滤掉不满足条件的信息
@Override
public boolean test(@NonNull String s) throws Exception {
return s.startsWith("1");//只有返回true,才会执行onNext方法
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
take和takeLast的用法
Observable.just("1","2","3","4","5","6").take(3).subscribe(new Consumer<String>() {//take返回前3个数据
@Override
public void accept(String s) throws Exception {
Log.i("hh","前三个:"+s);
}
});
Observable.just("1","2","3","4","5","6").takeLast(3).subscribe(new Consumer<String>() {//takeLast返回后3个数据
@Override
public void accept(String s) throws Exception {
Log.i("hh","后三个:"+s);
}
});
distinct和distinctUntilChanged的用法
Observable.just("1","2","3","4","5","4").distinct().subscribe(new Consumer<String>() {//distinct去掉发出数据的重复项,输出结果为:1,2,3,4,5
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
Observable.just("1","2","3","4","4","5","4").distinctUntilChanged().subscribe(new Consumer<String>() {//distinctUntilChanged去掉发出数据的连续的重复项,输出结果为:1,2,3,4,5,4
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
first和last的用法
Observable.just("1","2","3","4","4","5","4").first("5").subscribe(new Consumer<String>() {//first只会发射出第一个数据,至于那个参数,不知道有啥用,输出结果为:1
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
Observable.just("1","2","3","4","4","5","4").last("a").subscribe(new Consumer<String>() {//first只会发射出最后一个数据,至于那个参数,不知道有啥用,输出结果为:4
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
skip和skipLast的用法
Observable.just("1","2","3","4","4","5","4").skip(4).skipLast(2).subscribe(new Consumer<String>() {//skip()和skipLast()函数与take()和takeLast()相对应。它们用整数N作参数,从本质上来说,它们不让Observable发射前N个或者后N个值。输出结果是4,跳过了前4个数据和最后2个数据
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
elementAt的用法
Observable.just("hello", "my", "world").elementAt(1).subscribe(new Consumer<String>() {//elementAt只发射出对应位置的数据,输出结果为:my
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
flatmap的使用
fromIterable的用法
List<String> list = new ArrayList<>();
for (int i = 0; i < 15; i++) {
list.add("i = " + i);
}
Observable.fromIterable(list).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
//以下写法是不正规的
/*Observable.fromArray(list).subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {//因为这里的list是Iterable的子类,所以应该用fromIterable,这样accept方法中的参数还是List<String>,不能将其转换为String
}
});*/
buffer的用法
List<String> list = new ArrayList<>();
for (int i = 0; i < 15; i++) {
list.add("i = " + i);
}
Observable.fromIterable(list).buffer(4).subscribe(new Consumer<List<String>>() {//它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个输出结果为:
/*11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 0
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 1
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 2
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 3
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: next group
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 4
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 5
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 6
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 7
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: next group
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 8
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 9
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 10
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 11
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: next group
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 12
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 13
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: i = 14
11-09 21:43:35.310 15233-15233/com.example.liang.lianxi I/hh: next group*/
@Override
public void accept(List<String> strings) throws Exception {
for (String s : strings)
{
Log.i("hh", s);
}
Log.i("hh", "\n next group");
}
});
window的用法
List<String> list = new ArrayList<>();
for (int i = 0; i < 15; i++) {
list.add("i = " + i);
}
Observable.fromIterable(list).window(4).subscribe(new Consumer<Observable<String>>() {//window()函数和buffer()很像,但是它发射的是Observable而不是列表
@Override
public void accept(Observable<String> stringObservable) throws Exception {
stringObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
Log.i("hh", "\n next group") ;
}
});
cast的用法
List<Person> list = new ArrayList<>();
for (int i = 0; i < 30; i++) {
list.add(new Male(true, "i" + i));
}
Observable.fromIterable(list).cast(Male.class).subscribe(new Consumer<Male>() {//cast将源Observable中的每一项数据都转换为新的类型,把它变成了不同的Class
@Override
public void accept(Male male) throws Exception {
Log.i("hh",male.name+","+male.hasJJ);
}
});
}
class Person {
String name;
public Person(String name) {
this.name = name;
}
}
class Male extends Person {
public boolean hasJJ;
public Male(boolean hasJJ, String name) {
super(name);
this.hasJJ = hasJJ;
}
}
groupBy的用法
Observable.just(2,3,5,6)
.groupBy(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) throws Exception {
return integer%2==0?"偶数":"奇数";
}
}).subscribe(new Consumer<GroupedObservable<Object, Integer>>() {
@Override
public void accept(final GroupedObservable<Object, Integer> objectIntegerGroupedObservable) throws Exception {
objectIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", objectIntegerGroupedObservable.getKey() + "," + integer);
/* 输出结果为:
06-28 00:21:32.655 12869-12869/jiankang.liang.com.ceshi D/MainActivity: 偶数,2
06-28 00:21:32.655 12869-12869/jiankang.liang.com.ceshi D/MainActivity: 奇数,3
06-28 00:21:32.655 12869-12869/jiankang.liang.com.ceshi D/MainActivity: 奇数,5
06-28 00:21:32.655 12869-12869/jiankang.liang.com.ceshi D/MainActivity: 偶数,6*/
}
});
}
});
groupBy操作符是对源Observable产生的结果进行分组,形成一个类型为GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值(类似于HashMap的key)。
值得注意的是,由于结果集中的GroupedObservable是把分组结果缓存起来,如果对每一个GroupedObservable不进行处理(既不订阅执行也不对其进行别的操作符运算),就有可能出现内存泄露。因此,如果你对某个GroupedObservable不进行处理,最好是对其使用操作符take(0)处理。
scan的使用
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(i+"");
}
Observable.fromIterable(list).scan(new BiFunction<String, String, String>() {//scan操作符对一个序列的数据应用一个函数,并将这个函数的结果发射出去作为下个数据应用这个函数时候的第一个参数使用,有点类似于递归操作
@Override
public String apply(String s, String s2) throws Exception {
return Integer.parseInt(s)+Integer.parseInt(s2)+"";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
*//* 输出结果为:
11-10 19:52:16.974 775-775/com.example.liang.lianxi I/hh: 0
11-10 19:52:16.974 775-775/com.example.liang.lianxi I/hh: 1
11-10 19:52:16.974 775-775/com.example.liang.lianxi I/hh: 3
11-10 19:52:16.974 775-775/com.example.liang.lianxi I/hh: 6
11-10 19:52:16.974 775-775/com.example.liang.lianxi I/hh: 10
11-10 19:52:16.974 775-775/com.example.liang.lianxi I/hh: 15
11-10 19:52:16.984 775-775/com.example.liang.lianxi I/hh: 21
11-10 19:52:16.984 775-775/com.example.liang.lianxi I/hh: 28
11-10 19:52:16.984 775-775/com.example.liang.lianxi I/hh: 36
11-10 19:52:16.984 775-775/com.example.liang.lianxi I/hh: 45
merge的使用
List<String> list1 = new ArrayList<>();
for (int i = 0; i < 8; i++) {
list1.add("hello i:" + i);
}
List<String> list = new ArrayList<>();
for (int j = 0; j < 8; j++) {
list.add("world j:" + j);
}
List<String> list2 = new ArrayList<>();
for (int m = 0; m < 8; m++) {
list2.add("fuck m:" + m);
}
ObservableSource<String> observableSource=Observable.fromIterable(list);
ObservableSource<String> observableSource1=Observable.fromIterable(list1);
ObservableSource<String> observableSource2=Observable.fromIterable(list2);
Observable.merge(observableSource,observableSource1,observableSource2).subscribe(new Consumer<String>() {//merge()方法将帮助你把两个甚至更多的Observables合并到发射的数据项里,一起发射
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
*//* 输出结果为:
11-10 20:11:44.194 28253-28253/com.example.liang.lianxi I/hh: world j:0
11-10 20:11:44.194 28253-28253/com.example.liang.lianxi I/hh: world j:1
11-10 20:11:44.194 28253-28253/com.example.liang.lianxi I/hh: world j:2
11-10 20:11:44.194 28253-28253/com.example.liang.lianxi I/hh: world j:3
11-10 20:11:44.194 28253-28253/com.example.liang.lianxi I/hh: world j:4
11-10 20:11:44.194 28253-28253/com.example.liang.lianxi I/hh: world j:5
11-10 20:11:44.194 28253-28253/com.example.liang.lianxi I/hh: world j:6
11-10 20:11:44.194 28253-28253/com.example.liang.lianxi I/hh: world j:7
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: hello i:0
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: hello i:1
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: hello i:2
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: hello i:3
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: hello i:4
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: hello i:5
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: hello i:6
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: hello i:7
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: fuck m:0
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: fuck m:1
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: fuck m:2
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: fuck m:3
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: fuck m:4
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: fuck m:5
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: fuck m:6
11-10 20:11:44.204 28253-28253/com.example.liang.lianxi I/hh: fuck m:7
每个Observable抛出的错误都将会打断合并。如果你需要避免这种情况,RxJava提供了mergeDelayError(),它能从一个Observable中继续发射数据即便是其中有一个抛出了错误。当所有的Observables都完成时,mergeDelayError()将会发射onError()
zip的用法(可以看看博客://www.greatytc.com/p/bb58571cdb64,讲的很好)
List<String> hellos = new ArrayList<>();
for (int i = 0; i < 10; i++) {
hellos.add("hello i:" + i);
}
List<Integer> worlds = new ArrayList<>();
for (int j = 0; j < 10; j++) {
worlds.add(j);
}
ObservableSource<String> observableSource=Observable.fromIterable(hellos);
ObservableSource<Integer> observableSource1=Observable.fromIterable(worlds);
Observable.zip(observableSource, observableSource1, new BiFunction<String, Integer, Object>() {//zip() 合并两个或者多个Observables发射出的数据项,根据指定的函数Func* 变换它们,并发射一个新值
@Override
public Object apply(String s, Integer integer) throws Exception {
return s+":"+integer;
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.i("hh", o.toString());
}
});
/*输出结果为:
06-27 12:44:24.068 6712-6712/? I/hh: hello i:0:0
06-27 12:44:24.068 6712-6712/? I/hh: hello i:1:1
06-27 12:44:24.068 6712-6712/? I/hh: hello i:2:2
06-27 12:44:24.068 6712-6712/? I/hh: hello i:3:3
06-27 12:44:24.068 6712-6712/? I/hh: hello i:4:4
06-27 12:44:24.068 6712-6712/? I/hh: hello i:5:5
06-27 12:44:24.068 6712-6712/? I/hh: hello i:6:6
06-27 12:44:24.068 6712-6712/? I/hh: hello i:7:7
06-27 12:44:24.068 6712-6712/? I/hh: hello i:8:8
06-27 12:44:24.068 6712-6712/? I/hh: hello i:9:9*/
combineLatest的用法
List<String> hellos = new ArrayList<>();
for (int i = 0; i < 10; i++) {
hellos.add("hello i:" + i);
}
List<Integer> worlds = new ArrayList<>();
for (int j = 0; j < 10; j++) {
worlds.add(j);
}
ObservableSource<String> observableSource=Observable.fromIterable(hellos);
ObservableSource<Integer> observableSource1=Observable.fromIterable(worlds);
Observable.combineLatest(observableSource, observableSource1, new BiFunction<String, Integer, Object>() {
@Override
public Object apply(String s, Integer integer) throws Exception {
return s+","+integer;
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.i("hh",o.toString());
}
});
/*
输出结果为:
06-27 11:12:13.655 7368-7368/? I/hh: hello i:9,0
06-27 11:12:13.655 7368-7368/? I/hh: hello i:9,1
06-27 11:12:13.655 7368-7368/? I/hh: hello i:9,2
06-27 11:12:13.655 7368-7368/? I/hh: hello i:9,3
06-27 11:12:13.655 7368-7368/? I/hh: hello i:9,4
06-27 11:12:13.655 7368-7368/? I/hh: hello i:9,5
06-27 11:12:13.655 7368-7368/? I/hh: hello i:9,6
06-27 11:12:13.655 7368-7368/? I/hh: hello i:9,7
06-27 11:12:13.655 7368-7368/? I/hh: hello i:9,8
06-27 11:12:13.656 7368-7368/? I/hh: hello i:9,9
*/
combineLatest(结合最新的)的用法再来一例
Observable.combineLatest(Observable.just("1", "2", "3", "6"), Observable.just("11", "21", "31", "51"), new BiFunction<String, String, Object>() {//如果是三个数据源的话,那么前两个数据源只取最后一个,最后一个数据源取所有数据
@Override
public Object apply(String s, String s2) throws Exception {
return s+s2;
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d("MainActivity", o.toString());
}
});
/* 输出结果为:
06-27 10:34:50.615 4192-4192/jiankang.liang.com.ceshi D/MainActivity: 611
06-27 10:34:50.615 4192-4192/jiankang.liang.com.ceshi D/MainActivity: 621
06-27 10:34:50.615 4192-4192/jiankang.liang.com.ceshi D/MainActivity: 631
06-27 10:34:50.615 4192-4192/jiankang.liang.com.ceshi D/MainActivity: 651*/
delay的用法
ArrayList<String> list=new ArrayList<>();
for(int i=0;i<10;i++){
list.add("我是"+i);
}
Observable.fromIterable(list).delay(2,TimeUnit.SECONDS)//delay方法是延迟一定时间后发射数据
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("SecondActivityaaa", s);
}
});
/* 输出结果为:
03-23 16:51:26.499 835-835/com.example.liang.lianxi D/SecondActivityaaa: 我是0
03-23 16:51:26.499 835-835/com.example.liang.lianxi D/SecondActivityaaa: 我是1
03-23 16:51:26.499 835-835/com.example.liang.lianxi D/SecondActivityaaa: 我是2
03-23 16:51:26.499 835-835/com.example.liang.lianxi D/SecondActivityaaa: 我是3
03-23 16:51:26.499 835-835/com.example.liang.lianxi D/SecondActivityaaa: 我是4
03-23 16:51:26.499 835-835/com.example.liang.lianxi D/SecondActivityaaa: 我是5
03-23 16:51:26.499 835-835/com.example.liang.lianxi D/SecondActivityaaa: 我是6
03-23 16:51:26.499 835-835/com.example.liang.lianxi D/SecondActivityaaa: 我是7
03-23 16:51:26.499 835-835/com.example.liang.lianxi D/SecondActivityaaa: 我是8
03-23 16:51:26.499 835-835/com.example.liang.lianxi D/SecondActivityaaa: 我是9*/
sample的用法
ArrayList<String> list=new ArrayList<>();
for(int i=0;i<1000000;i++){
list.add("我是"+i);
}
Observable.fromIterable(list).sample(200, TimeUnit.MICROSECONDS)//sample方法是每过一定时间从发射的数据中获取一个数据发送到下游
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("SecondActivityaaa", s);
}
});
/* 输出结果为:
03-23 16:41:43.019 22656-22656/com.example.liang.lianxi D/SecondActivityaaa: 我是926478
03-23 16:41:43.019 22656-22656/com.example.liang.lianxi D/SecondActivityaaa: 我是926986
03-23 16:41:43.019 22656-22656/com.example.liang.lianxi D/SecondActivityaaa: 我是927613
03-23 16:41:43.019 22656-22656/com.example.liang.lianxi D/SecondActivityaaa: 我是928096
03-23 16:41:43.019 22656-22656/com.example.liang.lianxi D/SecondActivityaaa: 我是928588
03-23 16:41:43.019 22656-22656/com.example.liang.lianxi D/SecondActivityaaa: 我是929134
03-23 16:41:43.019 22656-22656/com.example.liang.lianxi D/SecondActivityaaa: 我是929634
03-23 16:41:43.019 22656-22656/com.example.liang.lianxi D/SecondActivityaaa: 我是929940
...*/
concatMap的用法,它与flatMap的区别就是它是有顺序发射数据,而flatMap是无顺序的发射数据
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for(int i=0;i<10;i++){
emitter.onNext(i);
}
}
}).concatMap(new Function<Integer, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d("SecondActivity", o.toString());
}
});
/* 使用flatMap的输出结果为:
03-23 17:11:26.739 30445-30475/com.example.liang.lianxi D/SecondActivity: I am value 0
03-23 17:11:26.739 30445-30475/com.example.liang.lianxi D/SecondActivity: I am value 0
03-23 17:11:26.739 30445-30475/com.example.liang.lianxi D/SecondActivity: I am value 0
03-23 17:11:26.739 30445-30477/com.example.liang.lianxi D/SecondActivity: I am value 2
03-23 17:11:26.739 30445-30477/com.example.liang.lianxi D/SecondActivity: I am value 2
03-23 17:11:26.739 30445-30477/com.example.liang.lianxi D/SecondActivity: I am value 2
03-23 17:11:26.739 30445-30476/com.example.liang.lianxi D/SecondActivity: I am value 1
03-23 17:11:26.739 30445-30476/com.example.liang.lianxi D/SecondActivity: I am value 1
03-23 17:11:26.739 30445-30476/com.example.liang.lianxi D/SecondActivity: I am value 1
03-23 17:11:26.739 30445-30478/com.example.liang.lianxi D/SecondActivity: I am value 3
03-23 17:11:26.739 30445-30478/com.example.liang.lianxi D/SecondActivity: I am value 3
03-23 17:11:26.739 30445-30478/com.example.liang.lianxi D/SecondActivity: I am value 3
03-23 17:11:26.739 30445-30479/com.example.liang.lianxi D/SecondActivity: I am value 4
03-23 17:11:26.739 30445-30479/com.example.liang.lianxi D/SecondActivity: I am value 4
03-23 17:11:26.739 30445-30479/com.example.liang.lianxi D/SecondActivity: I am value 4
03-23 17:11:26.739 30445-30480/com.example.liang.lianxi D/SecondActivity: I am value 5
03-23 17:11:26.739 30445-30480/com.example.liang.lianxi D/SecondActivity: I am value 5
03-23 17:11:26.739 30445-30480/com.example.liang.lianxi D/SecondActivity: I am value 5
03-23 17:11:26.739 30445-30481/com.example.liang.lianxi D/SecondActivity: I am value 6
03-23 17:11:26.739 30445-30481/com.example.liang.lianxi D/SecondActivity: I am value 6
03-23 17:11:26.739 30445-30481/com.example.liang.lianxi D/SecondActivity: I am value 6
03-23 17:11:26.739 30445-30482/com.example.liang.lianxi D/SecondActivity: I am value 7
03-23 17:11:26.739 30445-30476/com.example.liang.lianxi D/SecondActivity: I am value 9
03-23 17:11:26.739 30445-30476/com.example.liang.lianxi D/SecondActivity: I am value 9
03-23 17:11:26.739 30445-30476/com.example.liang.lianxi D/SecondActivity: I am value 9
03-23 17:11:26.739 30445-30475/com.example.liang.lianxi D/SecondActivity: I am value 8
03-23 17:11:26.739 30445-30475/com.example.liang.lianxi D/SecondActivity: I am value 8
03-23 17:11:26.739 30445-30475/com.example.liang.lianxi D/SecondActivity: I am value 8
03-23 17:11:26.739 30445-30482/com.example.liang.lianxi D/SecondActivity: I am value 7
03-23 17:11:26.739 30445-30482/com.example.liang.lianxi D/SecondActivity: I am value 7*/
/*使用concatMap的输出结果为:
03-23 17:13:41.049 1841-1894/com.example.liang.lianxi D/SecondActivity: I am value 0
03-23 17:13:41.049 1841-1894/com.example.liang.lianxi D/SecondActivity: I am value 0
03-23 17:13:41.049 1841-1894/com.example.liang.lianxi D/SecondActivity: I am value 0
03-23 17:13:41.059 1841-1897/com.example.liang.lianxi D/SecondActivity: I am value 1
03-23 17:13:41.059 1841-1897/com.example.liang.lianxi D/SecondActivity: I am value 1
03-23 17:13:41.059 1841-1897/com.example.liang.lianxi D/SecondActivity: I am value 1
03-23 17:13:41.069 1841-1910/com.example.liang.lianxi D/SecondActivity: I am value 2
03-23 17:13:41.079 1841-1910/com.example.liang.lianxi D/SecondActivity: I am value 2
03-23 17:13:41.079 1841-1910/com.example.liang.lianxi D/SecondActivity: I am value 2
03-23 17:13:41.089 1841-1915/com.example.liang.lianxi D/SecondActivity: I am value 3
03-23 17:13:41.089 1841-1915/com.example.liang.lianxi D/SecondActivity: I am value 3
03-23 17:13:41.089 1841-1915/com.example.liang.lianxi D/SecondActivity: I am value 3
03-23 17:13:41.099 1841-1916/com.example.liang.lianxi D/SecondActivity: I am value 4
03-23 17:13:41.099 1841-1916/com.example.liang.lianxi D/SecondActivity: I am value 4
03-23 17:13:41.099 1841-1916/com.example.liang.lianxi D/SecondActivity: I am value 4
03-23 17:13:41.109 1841-1918/com.example.liang.lianxi D/SecondActivity: I am value 5
03-23 17:13:41.109 1841-1918/com.example.liang.lianxi D/SecondActivity: I am value 5
03-23 17:13:41.109 1841-1918/com.example.liang.lianxi D/SecondActivity: I am value 5
03-23 17:13:41.119 1841-1919/com.example.liang.lianxi D/SecondActivity: I am value 6
03-23 17:13:41.119 1841-1919/com.example.liang.lianxi D/SecondActivity: I am value 6
03-23 17:13:41.119 1841-1919/com.example.liang.lianxi D/SecondActivity: I am value 6
03-23 17:13:41.129 1841-1920/com.example.liang.lianxi D/SecondActivity: I am value 7
03-23 17:13:41.129 1841-1920/com.example.liang.lianxi D/SecondActivity: I am value 7
03-23 17:13:41.129 1841-1920/com.example.liang.lianxi D/SecondActivity: I am value 7
03-23 17:13:41.139 1841-1894/com.example.liang.lianxi D/SecondActivity: I am value 8
03-23 17:13:41.149 1841-1894/com.example.liang.lianxi D/SecondActivity: I am value 8
03-23 17:13:41.149 1841-1894/com.example.liang.lianxi D/SecondActivity: I am value 8
03-23 17:13:41.159 1841-1897/com.example.liang.lianxi D/SecondActivity: I am value 9
03-23 17:13:41.159 1841-1897/com.example.liang.lianxi D/SecondActivity: I am value 9
03-23 17:13:41.159 1841-1897/com.example.liang.lianxi D/SecondActivity: I am value 9*/
sequenceEqual的用法(比较两个数据源发射出的数据是否一样)
List<String> hellos = new ArrayList<>();
for (int i = 0; i < 10; i++) {
hellos.add("hello i:" + i);
}
List<String> worlds = new ArrayList<>();
for (int j = 0; j < 10; j++) {
worlds.add(j+"");
}
ObservableSource<String> observableSource=Observable.fromIterable(hellos);
ObservableSource<String> observableSource1=Observable.fromIterable(worlds);
Observable.sequenceEqual(observableSource, observableSource1, new BiPredicate<String, String>() {
@Override
public boolean test(String string, String string2) throws Exception {
Log.d("MainActivity", "string"+string);
Log.d("MainActivity", "string2"+string2);
return false;
}
}).subscribe();
/*输出结果为:
06-27 13:02:00.687 24793-24793/jiankang.liang.com.ceshi D/MainActivity: stringhello i:0
06-27 13:02:00.687 24793-24793/jiankang.liang.com.ceshi D/MainActivity: string20*/
startWith的用法,在数据源开头连接上一项数据
Observable.just("1","2","3").startWith("我是开头").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("MainActivity", s);
}
});
/* 输出结果为:
06-27 14:15:18.278 14755-14755/jiankang.liang.com.ceshi D/MainActivity: 我是开头
06-27 14:15:18.278 14755-14755/jiankang.liang.com.ceshi D/MainActivity: 1
06-27 14:15:18.278 14755-14755/jiankang.liang.com.ceshi D/MainActivity: 2
06-27 14:15:18.278 14755-14755/jiankang.liang.com.ceshi D/MainActivity: 3*/
ofType的用法,也是用来过滤数据的
Observable.just("1","11",12,"14",21).ofType(Integer.class).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i("hh",integer.toString());
}
});
/*输出结果为:
06-27 14:23:58.905 22412-22412/jiankang.liang.com.ceshi I/hh: 12
06-27 14:23:58.905 22412-22412/jiankang.liang.com.ceshi I/hh: 21*/
用rxjava实现的倒计时,代码如下:
private void initCountDown() {
Observable.interval(1, TimeUnit.SECONDS)//interval是间隔的意思
.take(3)//计时次数
.map(new Function<Long, Long>() {
@Override
public Long apply(Long aLong) throws Exception {
return mTime - aLong;// 3-0 3-1 3-2
}
})
.compose(new ObservableTransformer<Long, Long>() {//compose是调解的意思
@Override
public ObservableSource<Long> apply(Observable<Long> upstream) {
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());//发布事件io线程,接收事件主线程
}
})
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
// Logger.e("value = " + value);
String s = String.valueOf(value);
if (tvCountDown != null)
tvCountDown.setText(StringUtils.isEmpty(s) ? "" : s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
if (!mIsCancle) {
startActivity(new Intent(FlashActivity.this, MainActivity.class));
finish();
}
}
});
}
throttleFirst的用法(定期发射Observable中的第一项数据)
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> ee) throws Exception {
ee.onNext(1);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
ee.onNext(2);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
ee.onNext(3);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
ee.onNext(4);
ee.onNext(5);
}
}).throttleFirst(1100, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d("MainActivity", o.toString());
}
}); //结果为1,4
throttleLast的用法,sample的作用和throttleLast一样(定期发射Observable中的最后一项数据)
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> ee) throws Exception {
ee.onNext(1);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
ee.onNext(2);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
ee.onNext(3);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
ee.onNext(4);
ee.onNext(5);
}
}).throttleLast(1100, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d("MainActivity", o.toString());
}
}); //结果为3,5
throttleWithTimeout/debounce的用法(发射数据时,如果两次数据的发射间隔小于指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才进行发射)
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> ee) throws Exception {
ee.onNext(1);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
ee.onNext(2);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
ee.onNext(3);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
ee.onNext(4);
ee.onNext(5);
}
}).throttleWithTimeout(600, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d("MainActivity", o.toString());
}
}); //结果为3,5
timeout的用法(如果原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable)
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> ee) throws Exception {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
ee.onNext(222);
}
}).timeout(1100, TimeUnit.MILLISECONDS,Observable.just(333))
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d("MainActivity", o.toString());
}
}); //结果为:06-27 18:06:50.411 15057-15080/jiankang.liang.com.ceshi D/MainActivity: 333
all的用法(判断所有的数据项是否满足某个条件)
Observable.just("1", "12", "13", "16").all(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
if (s.contains("1"))
return true;
else
return false;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d("MainActivity", aBoolean.toString());
}
});//输出结果为:06-27 18:17:29.283 26104-26104/jiankang.liang.com.ceshi D/MainActivity: true
contains的用法(判断在发射的所有数据项中是否包含指定的数据)
Observable.just("1", "12", "13", "16").contains("12")
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d("MainActivity", aBoolean.toString());
}
});//输出结果为:06-27 18:17:29.283 26104-26104/jiankang.liang.com.ceshi D/MainActivity: true
switchIfEmpty的用法(如果原始Observable正常终止后仍然没有发射任何数据,就使用备用的Observable。)
Observable.empty()
.switchIfEmpty(Observable.just(2, 3, 4)).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d("MainActivity", o.toString());
}
});//输出结果为:
/* 06-27 19:48:58.535 31784-31784/jiankang.liang.com.ceshi D/MainActivity: 2
06-27 19:48:58.535 31784-31784/jiankang.liang.com.ceshi D/MainActivity: 3
06-27 19:48:58.535 31784-31784/jiankang.liang.com.ceshi D/MainActivity: 4*/
defaultIfEmpty的用法(如果原始Observable正常终止后仍然没有发射任何数据,就发射一个默认值,内部调用的switchIfEmpty。)
Observable.empty()
.defaultIfEmpty("呵呵").subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d("MainActivity", o.toString());
}
});//输出结果为:06-27 19:55:28.745 6883-6883/? D/MainActivity: 呵呵
takeUntil的用法(当发射的数据满足某个条件后(包含该数据),终止Observable发送数据。)
Observable.just(1,2,13,4,56,7,88,99)
.takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer>10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", integer.toString());
}
});/*输出结果为:06-27 20:02:16.798 9211-9211/? D/MainActivity: 1
06-27 20:02:16.798 9211-9211/? D/MainActivity: 2
06-27 20:02:16.798 9211-9211/? D/MainActivity: 13*/
skipWhile的用法( 丢弃Observable发射的数据,直到一个指定的条件不成立,然后剩下的所有数据都不丢弃了(包括条件数据也不丢弃))
Observable.just(22,33,44,55)
.skipWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer<40;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", integer.toString());
}
});/*输出结果为:06-27 20:02:16.798 9211-9211/? D/MainActivity: 44
06-27 20:02:16.798 9211-9211/? D/MainActivity: 45*/
reduce的用法(对序列中的数据进行总的处理并发射最终的结果,比如求和之类的)
Observable.just(22,33,44,55)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", integer.toString());
}
});//输出结果为:06-27 20:49:21.398 29600-29600/? D/MainActivity: 154
collect的用法(收集数据到一个可变的数据结构)
Observable.just(22,33,44,55)
.collect(new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> arrayList, Integer integer) throws Exception {
arrayList.add(integer);
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> integers) throws Exception {
for (int i = 0; i < integers.size(); i++) {
Log.d("MainActivity", "integers.get(i):" + integers.get(i));
}
}
});/*输出结果为:06-27 21:04:22.732 9116-9116/jiankang.liang.com.ceshi D/MainActivity: integers.get(i):22
06-27 21:04:22.732 9116-9116/jiankang.liang.com.ceshi D/MainActivity: integers.get(i):33
06-27 21:04:22.732 9116-9116/jiankang.liang.com.ceshi D/MainActivity: integers.get(i):44
06-27 21:04:22.732 9116-9116/jiankang.liang.com.ceshi D/MainActivity: integers.get(i):55*/
count的用法(统计发射数据的数量)
Observable.just(22,33,44,55)
.count().subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d("MainActivity", "aLong:" + aLong);
}
});/*06-27 21:08:57.448 14105-14105/jiankang.liang.com.ceshi D/MainActivity: aLong:4*/
toList的用法(收集原始Observable发射的所有数据到一个列表,然后返回这个列表)
Observable.just(22,33,44,55)
.toList().subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
for (int i = 0; i < integers.size(); i++) {
Log.d("MainActivity", integers.get(i).toString());
}
}
});/*06-27 21:12:24.369 18057-18057/jiankang.liang.com.ceshi D/MainActivity: 22
06-27 21:12:24.369 18057-18057/jiankang.liang.com.ceshi D/MainActivity: 33
06-27 21:12:24.369 18057-18057/jiankang.liang.com.ceshi D/MainActivity: 44
06-27 21:12:24.369 18057-18057/jiankang.liang.com.ceshi D/MainActivity: 55*/
toSortedList的用法(收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表。)
Observable.just(122,323,44,55)
.toSortedList(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1-o2; //>0 升序 ,<0 降序
}
}).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
for (int i = 0; i < integers.size(); i++) {
Log.d("MainActivity", integers.get(i).toString());
}
}
});/*06-27 21:17:20.137 23474-23474/jiankang.liang.com.ceshi D/MainActivity: 44
06-27 21:17:20.137 23474-23474/jiankang.liang.com.ceshi D/MainActivity: 55
06-27 21:17:20.137 23474-23474/jiankang.liang.com.ceshi D/MainActivity: 122
06-27 21:17:20.137 23474-23474/jiankang.liang.com.ceshi D/MainActivity: 323*/
toMap的用法(将序列数据转换为一个Map。我们可以根据数据项生成key和生成value。)
Observable.just(122,323,44,55)
.toMap(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) throws Exception {
return "key"+integer;
}
}, new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) throws Exception {
return "value"+integer;
}
}).subscribe(new Consumer<Map<Object, Object>>() {
@Override
public void accept(Map<Object, Object> objectObjectMap) throws Exception {
Log.d("MainActivity", objectObjectMap.toString());
}
});/*06-27 23:34:18.127 28619-28619/jiankang.liang.com.ceshi D/MainActivity: {key55=value55, key122=value122, key323=value323, key44=value44}*/
onErrorReturn的用法(当原始Observable在遇到错误时发射一个特定的数据)
Observable.just("1",2,"4").cast(Integer.class)
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return 250;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", "integer:" + integer);
}
});//输出结果为:06-28 09:31:15.713 1172-1172/jiankang.liang.com.ceshi D/MainActivity: integer:250
onErrorResumeNext的用法(当原始Observable在遇到错误时,使用备用Observable)
Observable.just("1",2,"4").cast(Integer.class)
.onErrorResumeNext(Observable.just(222))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", "integer:" + integer);
}
});//输出结果为:06-28 09:36:22.099 8941-8941/? D/MainActivity: integer:222
onExceptionResumeNext的用法(当原始Observable在遇到异常时,使用备用的Observable。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误,onExceptionResumeNext只能处理异常)
Observable.just("1",2,"4").cast(Integer.class)
.onExceptionResumeNext(Observable.just(222))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", "integer:" + integer);
}
});//输出结果为:06-28 09:36:22.099 8941-8941/? D/MainActivity: integer:222
retry的用法(当原始Observable在遇到错误时进行重试)
Observable.just(1, 2, "4").cast(Integer.class)
.retry(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", "integer:" + integer);
}
});/*输出结果为:06-28 09:48:11.273 19007-19007/? D/MainActivity: integer:1
06-28 09:48:11.274 19007-19007/? D/MainActivity: integer:2
06-28 09:48:11.274 19007-19007/? D/MainActivity: integer:1
06-28 09:48:11.274 19007-19007/? D/MainActivity: integer:2
06-28 09:48:11.274 19007-19007/? D/MainActivity: integer:1
06-28 09:48:11.274 19007-19007/? D/MainActivity: integer:2
06-28 09:48:11.274 19007-19007/? D/MainActivity: integer:1
06-28 09:48:11.274 19007-19007/? D/MainActivity: integer:2
然后报个异常退出应用*/
retryWhen的用法(当原始Observable在遇到错误,将错误传递给另一个Observable来决定是否要重新订阅这个Observable)
Observable.just(1, 2, "4").cast(Integer.class)
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return Observable.timer(3, TimeUnit.SECONDS);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", "integer:" + integer);
}
});/*输出结果为:06-28 10:01:38.530 27017-27017/? D/MainActivity: integer:1
06-28 10:01:38.530 27017-27017/? D/MainActivity: integer:2
当3秒后重试
06-28 10:01:41.529 27017-27040/jiankang.liang.com.ceshi D/MainActivity: integer:1
06-28 10:01:41.529 27017-27040/jiankang.liang.com.ceshi D/MainActivity: integer:2*/
publish的用法(该方法返回一个ConnectableObservable对象,该对象只有在它的connect()被调用时才开始发射数据)
final ConnectableObservable<String> connectableObservable=Observable.just("1","2","3").publish();
connectableObservable.subscribe(new Consumer<String>() {//repeat是重复的次数
@Override
public void accept(String s) throws Exception {
Log.i("hh",s);
}
});
button.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
connectableObservable.connect();
}
});
materialize的用法(将Observable转换成一个通知列表。)
Observable.just("1","2","3").materialize()
.subscribe(new Consumer<Notification<String>>() {
@Override
public void accept(Notification<String> stringNotification) throws Exception {
Log.d("MainActivity", stringNotification.toString());
}
});/*输出结果为:
06-28 10:53:24.852 10895-10895/jiankang.liang.com.ceshi D/MainActivity: OnNextNotification[1]
06-28 10:53:24.852 10895-10895/jiankang.liang.com.ceshi D/MainActivity: OnNextNotification[2]
06-28 10:53:24.852 10895-10895/jiankang.liang.com.ceshi D/MainActivity: OnNextNotification[3]
06-28 10:53:24.852 10895-10895/jiankang.liang.com.ceshi D/MainActivity: OnCompleteNotification*/
timestamp的用法(给Observable发射的每个数据项添加一个时间戳,就是记录什么时间发射的数据)
Observable.just("1","2","3").timestamp()
.subscribe(new Consumer<Timed<String>>() {
@Override
public void accept(Timed<String> stringTimed) throws Exception {
Log.d("MainActivity", stringTimed.toString());
}
});/*输出结果为:
06-28 11:00:10.778 17172-17172/? D/MainActivity: Timed[time=1530154810778, unit=MILLISECONDS, value=1]
06-28 11:00:10.778 17172-17172/? D/MainActivity: Timed[time=1530154810778, unit=MILLISECONDS, value=2]
06-28 11:00:10.778 17172-17172/? D/MainActivity: Timed[time=1530154810778, unit=MILLISECONDS, value=3]*/
timeInterval的用法(给Observable发射的两个数据项间添加一个时间差)
Observable.just("1","2","3").timeInterval()
.subscribe(new Consumer<Timed<String>>() {
@Override
public void accept(Timed<String> stringTimed) throws Exception {
Log.d("MainActivity", stringTimed.toString());
}
});/*输出结果为:
06-28 11:05:36.290 20645-20645/jiankang.liang.com.ceshi D/MainActivity: Timed[time=0, unit=MILLISECONDS, value=1]
06-28 11:05:36.290 20645-20645/jiankang.liang.com.ceshi D/MainActivity: Timed[time=0, unit=MILLISECONDS, value=2]
06-28 11:05:36.290 20645-20645/jiankang.liang.com.ceshi D/MainActivity: Timed[time=0, unit=MILLISECONDS, value=3]*/
doOnEach的用法(注册一个动作,对Observable发射的每个数据项使用)
Observable.just("1","2","3").doOnEach(new Consumer<Notification<String>>() {
@Override
public void accept(Notification<String> stringNotification) throws Exception {
Log.d("MainActivity", stringNotification.toString());
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("MainActivity", s);
}
});/*输出结果为:
06-28 11:14:17.030 29459-29459/? D/MainActivity: OnNextNotification[1]
06-28 11:14:17.030 29459-29459/? D/MainActivity: 1
06-28 11:14:17.030 29459-29459/? D/MainActivity: OnNextNotification[2]
06-28 11:14:17.030 29459-29459/? D/MainActivity: 2
06-28 11:14:17.030 29459-29459/? D/MainActivity: OnNextNotification[3]
06-28 11:14:17.030 29459-29459/? D/MainActivity: 3
06-28 11:14:17.030 29459-29459/? D/MainActivity: OnCompleteNotification*/
doOnComplete的用法(注册一个动作,对正常完成的Observable使用)
Observable.just("1","2","3").doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d("MainActivity", "执行结束");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("MainActivity", s);
}
});/*输出结果为:
06-28 11:16:59.117 32292-32292/? D/MainActivity: 1
06-28 11:16:59.118 32292-32292/? D/MainActivity: 2
06-28 11:16:59.118 32292-32292/? D/MainActivity: 3
06-28 11:16:59.118 32292-32292/? D/MainActivity: 执行结束*/
doOnError的用法(注册一个动作,对发生错误的Observable使用)
Observable.just(1,"2","3").cast(Integer.class).doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d("MainActivity", "出错了");
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", "integer:" + integer);
}
});/*输出结果为:
06-28 11:20:20.167 4182-4182/jiankang.liang.com.ceshi D/MainActivity: integer:1
06-28 11:20:20.167 4182-4182/? D/MainActivity: 出错了
然后报出异常,退出应用*/
doOnTerminate的用法(注册一个动作,对完成的Observable使用,无论是否发生错误)
Observable.just(1,"2","3").cast(Integer.class).doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d("MainActivity", "结束了");
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", "integer:" + integer);
}
});/*输出结果为:
06-28 12:55:28.421 23181-23181/? D/MainActivity: integer:1
06-28 12:55:28.422 23181-23181/? D/MainActivity: 结束了
然后报出异常,退出应用*/
doOnSubscribe的用法(注册一个动作,在观察者订阅时使用)
Observable.just(1,2,3).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d("MainActivity", "订阅了");
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", "integer:" + integer);
}
});/*输出结果为:
06-28 12:58:39.366 25400-25400/jiankang.liang.com.ceshi D/MainActivity: 订阅了
06-28 12:58:39.366 25400-25400/jiankang.liang.com.ceshi D/MainActivity: integer:1
06-28 12:58:39.366 25400-25400/jiankang.liang.com.ceshi D/MainActivity: integer:2
06-28 12:58:39.366 25400-25400/jiankang.liang.com.ceshi D/MainActivity: integer:3*/
doAfterTerminate的用法(注册一个动作,在Observable完成时使用)
Observable.just(1,2,3).doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d("MainActivity", "结束之后做的事");
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", "integer:" + integer);
}
});/*输出结果为:
06-28 13:04:23.276 30426-30426/jiankang.liang.com.ceshi D/MainActivity: integer:1
06-28 13:04:23.276 30426-30426/jiankang.liang.com.ceshi D/MainActivity: integer:2
06-28 13:04:23.276 30426-30426/jiankang.liang.com.ceshi D/MainActivity: integer:3
06-28 13:04:23.276 30426-30426/jiankang.liang.com.ceshi D/MainActivity: 结束之后做的事*/
delaySubscription的用法(延时处理订阅请求)
Observable.just(1,2,3).delaySubscription(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("MainActivity", "integer:" + integer);
}
});/*输出结果为:(2秒之后才发射数据)
06-28 13:12:47.835 7976-8006/jiankang.liang.com.ceshi D/MainActivity: integer:1
06-28 13:12:47.835 7976-8006/jiankang.liang.com.ceshi D/MainActivity: integer:2
06-28 13:12:47.835 7976-8006/jiankang.liang.com.ceshi D/MainActivity: integer:3*/
参考文章如下:
【Android】RxJava的使用(四)线程控制 —— Scheduler
Rxjava实战
你真的会用RxJava么?RxJava线程变换之observeOn与subscribeOn
https://blog.csdn.net/maplejaw_/article/details/52396175
———————————————————————
Flowable(具有背压功能)
Flowable是为了解决背压(backpressure)问题,而在Observable的基础上优化后的产物,与Observable不是同一组观察者模式下的成员,Flowable是Publisher与Subscriber这一组观察者模式中Publisher的典型实现,Observable是ObservableSource/Observer这一组观察者模式中ObservableSource的典型实现;所以在使用Flowable的时候,可观察对象不再是Observable,而是Flowable;观察者不再是Observer,而是Subscriber。Flowable与Subscriber之间依然通过subscribe()进行关联。有些朋友可能会想,既然Flowable是在Observable的基础上优化后的产物,Observable能解决的问题Flowable都能进行解决,何不抛弃Observable而只用Flowable呢。其实,这是万万不可的,他们各有自己的优势和不足。由于基于Flowable发射的数据流,以及对数据加工处理的各操作符都添加了背压支持,附加了额外的逻辑,其运行效率要比Observable低得多。因为只有上下游运行在各自的线程中,且上游发射数据速度大于下游接收处理数据的速度时,才会产生背压问题。所以,如果能够确定上下游在同一个线程中工作,或者上下游工作在不同的线程中,而下游处理数据的速度高于上游发射数据的速度,则不会产生背压问题,就没有必要使用Flowable,以免影响性能。
通过Flowable发射处理数据流的基础代码如下:
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {//上游有一个水缸,它的的大小为128
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d("xinxi", "emit 1");
emitter.onNext(1);
Log.d("xinxi", "emit 2");
emitter.onNext(2);
Log.d("xinxi", "emit 3");
emitter.onNext(3);
Log.d("xinxi", "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.newThread()); //增加了一个参数
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d("xinxi", "onSubscribe");
s.request(2); //注意这句代码,这句代码很重要,这标志着下游可以处理多少条上游传过来的数据,如果不显示调用request则默认下游的需求量为零
}
@Override
public void onNext(Integer integer) {
Log.d("xinxi", "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w("xinxi", "onError: ", t);
}
@Override
public void onComplete() {
Log.d("xinxi", "onComplete");
}
};
upstream.observeOn(AndroidSchedulers.mainThread()).subscribe(downstream);
BackpressureStrategy背压策略
在Flowable的基础创建方法create中多了一个BackpressureStrategy类型的参数,BackpressureStrategy是个枚举,Flowable的异步缓存池不同于Observable,Observable的异步缓存池没有大小限制,可以无限制向里添加数据,直至OOM,而Flowable的异步缓存池有个固定容量,其大小为128。具体的策略说明如下:
1:ERROR
在此策略下,如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常。
2:DROP
在此策略下,如果Flowable的异步缓存池满了,会丢掉将要放入缓存池中的数据。
3:LATEST
与Drop策略一样,如果缓存池满了,会丢掉将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中。
4:BUFFER
此策略下,Flowable的异步缓存池同Observable的一样,没有固定大小,可以无限制向里添加数据,不会抛出MissingBackpressureException异常,但会导致OOM。
5:MISSING
此策略表示,通过Create方法创建的Flowable没有指定背压策略,不会对通过OnNext发射的数据做缓存或丢弃处理,需要下游通过背压操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略。
5.1:onBackpressureXXX背压操作符
Flowable除了通过create创建的时候指定背压策略,也可以在通过其它创建操作符just,fromArray等创建后通过背压操作符指定背压策略。
onBackpressureBuffer()对应BackpressureStrategy.BUFFER
onBackpressureDrop()对应BackpressureStrategy.DROP
onBackpressureLatest()对应BackpressureStrategy.LATEST
多次调用request的结果就是多次请求数量的总和,并且超出下游需求之外的数据,仍然放到了异步缓存池中。
示例代码如下
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {//FlowableEmitter中的requested()方法获取到的是异步缓存池中可放入数据的数量
e.onNext(11);
e.onNext(22);
e.onNext(33);
e.onComplete();
}
}, BackpressureStrategy.BUFFER).onBackpressureBuffer().subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription s) {
s.request(2);
s.request(1);
}
@Override
public void onNext(Object o) {
Log.d("MainActivity", "收到了" + o);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
Log.d("MainActivity", "完成了");
}
});/*输出结果为:
06-28 15:34:02.200 7292-7292/jiankang.liang.com.ceshi D/MainActivity: 收到了11
06-28 15:34:02.201 7292-7292/jiankang.liang.com.ceshi D/MainActivity: 收到了22
06-28 15:34:02.201 7292-7292/jiankang.liang.com.ceshi D/MainActivity: 收到了33
06-28 15:34:02.201 7292-7292/jiankang.liang.com.ceshi D/MainActivity: 完成了*/
参考文章如下:
Flowable背压支持——几乎可以说是对Flowable最全面而详细的讲解
———————————————————————
Single,Completable,Maybe——简化版的Observable
场景需求:在需要发射的数据并不是数据流的形式,而只是一条单一的数据,或者一条完成通知,或者一条错误通知。在这种情况下,我们再使用Observable或者Flowable就显得有点大材小用,这时就需要使用Single,Completable,Maybe。
1:Single(只发射一条单一的数据(onSuccess),或者一条异常通知(onError),不能发射完成通知,其中数据与异常通知只能发射一个)
2:Completable(只发射一条完成通知(onComplete),或者一条异常通知(onError),不能发射数据,其中完成通知与异常通知只能发射一个)
3:Maybe(可发射一条单一的数据(onSuccess),以及发射一条完成通知(onComplete),或者一条异常通知(onError),其中数据,完成通知和异常通知只能发射一个)
示例代码如下:
*************************Single***************************
Single.create(new SingleOnSubscribe<Object>() {
@Override
public void subscribe(SingleEmitter<Object> e) throws Exception {
e.onSuccess("onSuccess");
}
}).subscribe(new SingleObserver<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("MainActivity", "订阅完成");
}
@Override
public void onSuccess(Object object) {
Log.d("MainActivity", "接收成功" + object);
}
@Override
public void onError(Throwable e) {
Log.d("MainActivity", e.getMessage());
}
});/*输出结果为:
06-28 16:17:18.044 10874-10874/? D/MainActivity: 订阅完成
06-28 16:17:18.044 10874-10874/? D/MainActivity: 接收成功onSuccess*/
*************************Completable***************************
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
e.onError(new Throwable("出错了"));
}
}).subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
Log.d("MainActivity", "订阅完成");
}
@Override
public void onComplete() {
}
@Override
public void onError(Throwable e) {
Log.d("MainActivity", e.getMessage());
}
});/*输出结果为:
06-28 16:24:30.519 18176-18176/? D/MainActivity: 订阅完成
06-28 16:24:30.519 18176-18176/? D/MainActivity: 出错了*/
*************************Maybe***************************
Maybe.create(new MaybeOnSubscribe<Object>() {
@Override
public void subscribe(MaybeEmitter<Object> e) throws Exception {
e.onComplete();
}
}).subscribe(new MaybeObserver<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("MainActivity", "订阅完成");
}
@Override
public void onSuccess(Object value) {
Log.d("MainActivity", "value:" + value);
}
@Override
public void onError(Throwable e) {
Log.d("MainActivity", e.getMessage());
}
@Override
public void onComplete() {
Log.d("MainActivity", "完成了aaaa");
}
});/*输出结果为:
06-28 16:49:34.330 5730-5730/jiankang.liang.com.ceshi D/MainActivity: 订阅完成
06-28 16:49:34.331 5730-5730/jiankang.liang.com.ceshi D/MainActivity: 完成了aaaa*/
参考文章如下:
Rxjava2入门教程六:Single、Completable、Maybe——简化版的Observable