RxJava操作符第二波啦,上篇RxJava 操作符第一波和本篇都只是简单介绍rxjava操作符的使用,有哪里写的不对的还请告知下。
过滤操作符
debounce
含义: 距离上次发送的一段时间之内,不再发送数据
可以防止上游数据频繁发送,限流
/**
* debounce
* *: 在过了一段时间内还没有发送数据时,发送数据
* *: 可以过滤掉发送数据频率过快的数据
* 注意
* *: 中间频率过快的数据的oncomplete()方法还是会调用
* *:oncomplete()不会被过滤掉
*/
fun debounce() {
Observable.create<Int> {
subscriber ->
object : CountDownTimer(4000, 500) {
override fun onTick(millisUntilFinished: Long) {
//每0.5秒钟发送数据
subscriber.onNext(1)
}
override fun onFinish() {
}
}.start()
}
.debounce(400, TimeUnit.MILLISECONDS)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
distinct
含义:过滤重复数据(这里是所有数据的重复数据过滤)
1. 默认过滤相同的数据
2.自定义过滤条件
distinctUntilChanged
含义:和distinct类似,不同的是只和前一个数据对比是否重复
1. 默认过滤相同的数据
2.自定义过滤条件
/**
* distinct
* *: 抑制过滤,过滤重复数据,只允许还没有发送过的数据通过
* distinct()
* *: 单纯的比较数值是否相同
* distinct(Func1))
* *: 生成的key不能重复
*
*/
fun distinct() {
Observable.just("112", "113", "211", "122", "234", "321", "331", "211", "43")
// .distinct()
.distinct { t ->
//第一字母不能重复
t.toCharArray()[0].toString()
}
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
/**
* distinctUntilChange
* distinctUntilChanged()
* * 比较数值和前一个数值是否相同
* distinctUntilChanged(Func1)
* * 生成key,值比较key是否相同
*/
fun distinctUntilChange() {
Observable.just(1, 2, 1, 3, 1, 1, 3, 4, 4, 3)
// .distinctUntilChanged()
.distinctUntilChanged({
t ->
//第一字母不能重复
t.toString().toCharArray()[0].toString()
})
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
filter
含义:过滤不符合要求的数据
/**
* filter
* *: 过滤不符合要求的
* 1.返回true表示通过,可以发送消息
*/
fun filter() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
.filter {
t ->
t > 5
}
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
first
含义: 1.默认只发送第一个数据
2.自定义只发送符合要求的第一条数据
last
含义:和first相反
1.默认只发送最后一个数据
2.自定义只发送符合要求的最后一条数据
tackFirst
和first类似
tackLast
和last类似
/**
* first
* *: 只发射第一项或者满足条件的第一项数据
* first()
* *: 默认发送的是第一项
* *: 没有满足的抛出异常
* first(func1)
* *: 返回满足条件的第一项
* *: 没有满足的抛出异常
*
* firstOrDefault(defultValue)
* * :没有满足的返回默认值
* firstOrDefault(defultValue,func1)
* * :没有满足的返回默认值
*
* takeFirst(func1)
* *: 和first类似
* *: 没有满足的数值,返回空的observable
* *onnext()不执行
* *onComplete()执行
*/
fun first() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
// .first()
// .first {
// t ->
// t > 4
// }
// .firstOrDefault(-1)
// .firstOrDefault(-1, {
// t ->
// t > 4
// })
.takeFirst {
t ->
t > 4
}
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
single
含义: 在结束之前有且只发送一条数据,不然报错
/**
* single
* *: 在数据结束之前有且只能发送一条数据,否则抛出异常
* *: 与first类似
* *: 如果在源数据完成之前,不是恰巧发送一条数据,抛出异常
*
* single(func1)
* *: 发射满足条件的单个值,如果有多个满足条件,抛出异常
*
* singleOrDefault(defaultValue)
* *: 没有满足条件,返回默认值
* *: 有多个满足条件的值,抛出异常
*
* singleOrDefault(defaultValue,func1)
* *: 没有满足条件,返回默认值
* *: 有多个满足条件的值,抛出异常
*/
fun single() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
// .single()
// .single {
// t ->
// t == 8
// }
.singleOrDefault(-1)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
ignoreElements
含义: 只关注observable发送数据是否结束,不关心数据发送的执行过程
/**
* ignoreElements
* *: 终止源数据所有数据的发送,只允许onComplete()、或者onError()
*
* 使用情况:
* *: 不关心发送数据的处理,只关心数据结束的结果
*/
fun ignoreElements(){
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
.ignoreElements()
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
sample
含义: 定时采样,只发送一条数据
问题: 采样sample(observable)这个还是有问题,知道的大神,请在评论区指导下
/**
* sample
* sample(long,timeUnit)
* *: 定时间的采样,默认发送采样的最后一条数据
* throttleLast()
* *: 采样的最后一条数据
* throttleFirst()
* *: 采样的第一条数据
*/
fun sample(){
var observable=Observable.create<String> {
subscriber->
//每一秒钟发射一次
object :CountDownTimer(3000,1000){
override fun onFinish() {
subscriber.onCompleted()
}
override fun onTick(millisUntilFinished: Long) {
subscriber.onNext(""+millisUntilFinished)
}
}
}
Observable.create<Int> {
subscriber->
for (i in 1..40){
subscriber.onNext(i)
Thread.sleep(200)
}
subscriber.onCompleted()
}
// .sample(1000,TimeUnit.MILLISECONDS)
// .throttleLast(1000,TimeUnit.MILLISECONDS)
.throttleFirst(1000,TimeUnit.MILLISECONDS)
// .sample(observable)
// .sample(Observable.create(object :Observable.OnSubscribe<String>{
// override fun call(t: Subscriber<in String>?) {
// observable.subscribe({
// t?.onNext("-----")
// },{
// error->
// Log.e(RxUtil.TAG,error.toString())
// },{
// Log.e(RxUtil.TAG,"sample-complete")
// })
// }
// }))
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
skip
含义:跳过前几个数据,或者跳过前面一段时间发送的数据
/**
* skip
* skip(count)
* *: 跳跃钱几个参数,保留后面所有值
* skip(long,timeunit)
* *: 跳过前几秒数据
*/
fun skip(){
Observable.create<Int> {
subscriber->
for (i in 1..40){
subscriber.onNext(i)
Thread.sleep(200)
}
subscriber.onCompleted()
}
// .skip(5)
.skip(2,TimeUnit.SECONDS)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
结合操作符
merge
含义: 将observables转化为一个observable发送出去,发送的不按照原有数据顺序,仅仅是合并了
MergeDelayError() 可以将其他observable都执行完,最后再抛出异常
/**
* merge
* *: 合并多个observable的发射产物
* *: 无序的,发送顺序不确定
* concat
* *合并observable
* *有序的
*/
fun merge() {
var observable1 = RxUtil.getTimerObservable()
var observable2 = RxUtil.getTimerObservable()
Observable.merge(observable1, observable2)
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
zip
含义:使用一个函数组合多个observables,observable产生的数据结果发送出去,只发送一个结果
/**
* zip
* *: 将多个observables 得到最终结果,和并成一个observable发送
* *: 返回个数以最少的observable为主
*
*/
fun zip() {
var observable1 = Observable.just(1, 2, 3, 4)
var observable2 = Observable.just("5", "6", "7", "8", "9", "10")
Observable.zip(observable1, observable2, Func2<Int, String, Int> {
t1, t2 ->
t1+t2.toInt()
})
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
算术聚合操作符
concat
含义:将多个observable 连接为一个observable
startWith
含义:在observable前面插入一个observable
/**
* concat
* *: 合并多个observable
* 1.有序的,只有当第一个observable结束才开始监听第二个
*/
fun concat() {
Observable.concat(Observable.just(2), Observable.just(1))
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
count、countLong
含义:计算observable发射的消息总个数
/**
* count
* *: 将原来数据源转化为值发送一个值的observable,发送的数据为数据源发送消息的个数
* 统计observable发送了多少数据
* 1.只会发送统计的个数
*/
fun count() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
// .count()
.countLong()
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
scan
含义: observable中每一项都通过一个函数,得到的结果作为这一次的结果发送出去,并且下一次带到下一次函数中使用
/**
* scan
* *: 数据的每一项都应用一个函数,然后连续发送结果
* 1.每一项数据结果都会传递到下一项数据的函数中
*
*/
fun scan() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
.scan(object : Function2<Int, Int, Int> {
override fun invoke(p1: Int, p2: Int): Int {
return p1 + p2
}
})
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
reduce
含义: observable中每一条项目作用到某个函数中,最后发送结果数据
1. 只发送结果
/**
* reduce
* *: 和scan 不同的是,scan每次应用函数之后,都会发送数据,reduce只会发送最后的结果
* *: 按照顺序对发送的每一个数据应用,并发送最终的值
* 1.最后只返回一个最终结果的数据
*
*/
fun reduce() {
Observable.from(arrayOf(1, 2, 3, 4, 5, 6, 7, 8))
.reduce(object : Function2<Int, Int, Int> {
override fun invoke(p1: Int, p2: Int): Int {
return p1 + p2
}
})
.subscribe(RxUtil.getDefultAction1(), RxUtil.getDefultErrorAction1(), RxUtil.getDefultCompleteAction0())
}
基本的操作符已经测试使用结束啦,可能有些遗漏的暂时使用的没有那么多。下面开始正真了解RxJava的源码啦。
最后附上github项目地址