目录
【Koltin Flow(一)】五种创建flow的方式
【Koltin Flow(二)】Flow操作符之末端操作符
【Koltin Flow(三)】Flow操作符之中间操作符(一)
【Koltin Flow(三)】Flow操作符之中间操作符(二)
【Koltin Flow(三)】Flow操作符之中间操作符(三)
【Koltin Flow(四)】Flow背压
【Koltin Flow(五)】SharedFlow及StateFlow
SharedFlow
简介
相对于Flow而言,SharedFlow为热流,也就是说不管有无接收者,都会发送值。
一、基本使用
代码如下:
val sharedFlow = MutableSharedFlow<Int>()
launch {
sharedFlow.collect {
Log.d(TAG.TAG,"SharedFlow $it")
}
}
delay(10)
sharedFlow.emit(1)
sharedFlow.emit(100)
sharedFlow.emit(100)
日志如下:
2022-08-03 10:16:53.366 4955-4981/edu.test.demo D/Test-TAG: SharedFlow 1
2022-08-03 10:16:53.366 4955-4981/edu.test.demo D/Test-TAG: SharedFlow 100
2022-08-03 10:16:53.367 4955-4981/edu.test.demo D/Test-TAG: SharedFlow 100
分析
- 日志可以看出 基本使用的时候和flow本身没多大区别,发送-接收。
- 可以看出上面有个delay(10),如果没有则可能会接收不到值,因为SharedFlow为热流,不管有无接收者,emit都会直接发送值。
二、设置订阅重发
我们看SharedFlow构造的第一个参数replay,此参数用来设置订阅重发的个数。
代码如下:
val sharedFlow = MutableSharedFlow<Int>(
replay = 2
)
sharedFlow.emit(1)
sharedFlow.emit(100)
sharedFlow.emit(100)
//注释1 在此处加上重发缓存打印
//Log.d(TAG.TAG,"SharedFlow ${sharedFlow.replayCache}")
delay(100)
launch {
sharedFlow.collect {
Log.d(TAG.TAG,"SharedFlow $it")
}
}
//注释2 重置缓存
//delay(100)
//sharedFlow.resetReplayCache()
//Log.d(TAG.TAG,"SharedFlow ${sharedFlow.replayCache}")
//launch {
// sharedFlow.collect {
// Log.d(TAG.TAG,"SharedFlow 2 $it")
//}
//}
日志如下:
//注释1 处的打印
//2022-08-03 10:23:13.621 5106-5131/edu.test.demo D/Test-TAG: SharedFlow [100, 100]
2022-08-03 10:21:14.906 5043-5070/edu.test.demo D/Test-TAG: SharedFlow 100
2022-08-03 10:21:14.906 5043-5070/edu.test.demo D/Test-TAG: SharedFlow 100
//注释2下的打印
//2022-08-03 10:26:54.917 5233-5260/edu.test.demo D/Test-TAG: SharedFlow []
分析:
- 可以看到,虽然是发送在前,接收在后,但还是收到了两个值。因为replay重发的值设置为2,可以这样看起来不直观,这样,我们修改下代码,放开注释1出的打印,则日志会多出来【注释1 处的打印】部分的内容,可以很清晰的看出缓存了后两个值。
- 当然此缓存可以重置,也就是清空之前的缓存,放开注释2处的代码,则可以看出resetReplayCache,之后则清空了缓存,后面的collect接收不到缓存的值。
三、其他两个参数,可参考背压部分的内容
四、shareIn操作符
shareIn操作符是将冷流flow转换为热流SharedFlow,主要参数有三个
1、第一个为作用域。
2、策略,分为三种Eagerly(立即发送)、Lazily(有第一个订阅者之后发送)、
WhileSubscribed()(在第一个订阅者出现之后开始、在最后一个订阅者、消失后结束),可配置二外的参数:
stopTimeoutMillis 为最后一个订阅者小时候保留的时长,单位ms,默认为0。
replayExpirationMillis 为最后一个订阅者消失后,缓存保留的时长,单位ms,默认为Long.MAX_VALUE。
3、缓存重发的个数。
第一种策略
代码如下:
val sharedFlow = (1..5).asFlow().shareIn(this, SharingStarted.Eagerly,0)
delay(100)
sharedFlow.collect {
Log.d(TAG.TAG,"shareIn $it")
}
日志没有
分析:
- 我们发现没有日志,原因在哪里呢,因为转换成热流之后策略为Eagerly,立即开始发送,但是100ms之后才有collect,同时replay的个数为0,所以接收不到值,如果我们将replay个数改为2,则可以接收到4和5,和上面的订阅重发是一样的。
第二种策略
代码如下:
val sharedFlow = (1..5).asFlow().shareIn(this, SharingStarted.Lazily, 2)
delay(100)
launch {
sharedFlow.collect {
Log.d(TAG.TAG, "shareIn $it")
}
}
delay(100)
launch {
sharedFlow.collect {
Log.d(TAG.TAG, "shareIn 2 $it")
}
}
日志如下:
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 1
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 2
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 3
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 4
2022-08-03 11:09:10.104 6691-6717/edu.test.demo D/Test-TAG: shareIn 5
2022-08-03 11:09:10.205 6691-6720/edu.test.demo D/Test-TAG: shareIn 2 4
2022-08-03 11:09:10.205 6691-6720/edu.test.demo D/Test-TAG: shareIn 2 5
分析:
- 可以看出,第一个collect能接收到全部值,那是因为Lazily是在第一个订阅者出现后才发送值的,但是第二个collect却只接收到了缓存的两个值,那是因为Lazily只管第一个collect,不管后续的collect。
第三种策略
1. 采用默认参数
代码如下:
var time = 0L
time = System.currentTimeMillis()
val sharedFlow = (1..100).asFlow().onStart {
Log.d(TAG.TAG,"onStart ${System.currentTimeMillis() - time}")
}.onCompletion {
Log.d(TAG.TAG,"onCompletion ${System.currentTimeMillis() - time}")
}.onEach {
delay(1000)
}.shareIn(this, SharingStarted.WhileSubscribed(), 0)
delay(1000)
launch {
Log.d(TAG.TAG, "1 shareIn ${sharedFlow.first()}")
Log.d(TAG.TAG,"1 接收到第一个值 ${System.currentTimeMillis() - time}")
}
delay(3000)
launch {
Log.d(TAG.TAG, "2 shareIn ${sharedFlow.first()}")
Log.d(TAG.TAG,"2 接收到第一个值 ${System.currentTimeMillis() - time}")
}
日志如下:
2022-08-03 14:00:32.842 8905-8930/edu.test.demo D/Test-TAG: onStart 1029
2022-08-03 14:00:33.844 8905-8930/edu.test.demo D/Test-TAG: 1 shareIn 1
2022-08-03 14:00:33.844 8905-8930/edu.test.demo D/Test-TAG: 1 接收到第一个值 2031
2022-08-03 14:00:33.845 8905-8931/edu.test.demo D/Test-TAG: onCompletion 2032
2022-08-03 14:00:35.839 8905-8931/edu.test.demo D/Test-TAG: onStart 4026
2022-08-03 14:00:36.841 8905-8931/edu.test.demo D/Test-TAG: 2 shareIn 1
2022-08-03 14:00:36.841 8905-8931/edu.test.demo D/Test-TAG: 2 接收到第一个值 5028
2022-08-03 14:00:36.841 8905-8930/edu.test.demo D/Test-TAG: onCompletion 5028
分析:
- 可以看出 在first之后第一个接收者消失,所以执行了onCompletion,也就是flow结束了,而且接收到第一个值和onCompletion的时间基本是一致的。
- 在4000ms之后第二个接收者出现,重新执行了onStart ,并且在first之后也执行了onCompletion结束了。
2.进行相关的参数配置
代码如下:
var time = 0L
time = System.currentTimeMillis()
val sharedFlow = (1..100).asFlow().onStart {
Log.d(TAG.TAG,"onStart ${System.currentTimeMillis() - time}")
}.onCompletion {
Log.d(TAG.TAG,"onCompletion ${System.currentTimeMillis() - time}")
}.onEach {
delay(1000)
}.shareIn(this, SharingStarted.WhileSubscribed(
stopTimeoutMillis = 500,
replayExpirationMillis = 2000
), replay = 5)
delay(1*1000)
launch {
Log.d(TAG.TAG, "1 shareIn ${sharedFlow.take(5).toList()}")
Log.d(TAG.TAG,"1 接收到值 ${System.currentTimeMillis() - time}")
}
delay(10*1000)
launch {
Log.d(TAG.TAG, "2 shareIn ${sharedFlow.take(10).toList()}")
Log.d(TAG.TAG,"2 接收到值 ${System.currentTimeMillis() - time}")
}
日志如下( stopTimeoutMillis = 500,replayExpirationMillis = 2000,replay = 5):
2022-08-03 14:21:15.245 9591-9616/edu.test.demo D/Test-TAG: onStart 1030
2022-08-03 14:21:20.252 9591-9616/edu.test.demo D/Test-TAG: 1 shareIn [1, 2, 3, 4, 5]
2022-08-03 14:21:20.252 9591-9616/edu.test.demo D/Test-TAG: 1 接收到值 6037
2022-08-03 14:21:20.753 9591-9616/edu.test.demo D/Test-TAG: onCompletion 6538
2022-08-03 14:21:25.243 9591-9622/edu.test.demo D/Test-TAG: onStart 11028
2022-08-03 14:21:35.253 9591-9622/edu.test.demo D/Test-TAG: 2 shareIn [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
2022-08-03 14:21:35.253 9591-9622/edu.test.demo D/Test-TAG: 2 接收到值 21038
2022-08-03 14:21:35.755 9591-9617/edu.test.demo D/Test-TAG: onCompletion 21540
分析:
- 可以看出,在设置了 stopTimeoutMillis = 500之后,接收到值得时间和onCompletion的时间基本差了500ms,其实就是就是延时结束。
- 在设置了replayExpirationMillis = 2000之后,第二次开始和接收到值的时间基本就是发送10个值得时间,具体值为1-10,那是因为缓存的值已经失效了,此时这个replay = 5没有多大意义,因为到下次的时候值已经失效了。
日志如下( stopTimeoutMillis = 500,replayExpirationMillis = Long.MAX_VALUE,replay = 5):
2022-08-03 14:47:30.144 9697-9724/edu.test.demo D/Test-TAG: onStart 1030
2022-08-03 14:47:35.154 9697-9723/edu.test.demo D/Test-TAG: 1 shareIn [1, 2, 3, 4, 5]
2022-08-03 14:47:35.154 9697-9723/edu.test.demo D/Test-TAG: 1 接收到值 6040
2022-08-03 14:47:35.655 9697-9723/edu.test.demo D/Test-TAG: onCompletion 6541
2022-08-03 14:47:40.141 9697-9726/edu.test.demo D/Test-TAG: onStart 11027
2022-08-03 14:47:45.149 9697-9727/edu.test.demo D/Test-TAG: 2 shareIn [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
2022-08-03 14:47:45.149 9697-9727/edu.test.demo D/Test-TAG: 2 接收到值 16035
2022-08-03 14:47:45.652 9697-9726/edu.test.demo D/Test-TAG: onCompletion 16538
分析:
- 首先看区别,第一个接收者基本没变化。
- 第二个接收者有两点变化,接收到的值变了,不再是1-10,而是两个1-5,而且第二个onStart和接收到值的时间也变了,不再是10个值的时间,而是五个值的时间,原因主要如下:第一replayExpirationMillis值设置为 Long.MAX_VALUE(这也是其默认值)之后,第二个接收者出现时,缓存并未失效,所以出现了前面的12345(因为replay=5,缓存前五个),但是take(10)不够了,所以又出现了onStart,再接收五个,也就是后面的12345,时间也就是接收五个的时间。
- 如果将第二个接收者的take(10)变成take(5),缓存就直接够了,就不会出现第二个onStart。
- 当然这也是因为前面是take(5),如果改成take(3),后面也就会出现onStart,数据变成12312,那是因为虽然缓存池为5,但是只缓存了三个值,还需要两个。
StateFlow
简介
和SharedFlow一样,StateFlow也是热流,但是区别在于状态的保存,保存了最新的值,也就是新的接收者会收到最新的值,
和设置了replay = 1的SharedFlow比较类似。
简单使用
代码如下:
val stateFlow = MutableStateFlow(0)
launch {
stateFlow.collect{
Log.d(TAG.TAG,"stateFlow 1 collect $it")
}
}
delay(1000)
stateFlow.value = 10
delay(1000)
stateFlow.value = 10
delay(1000)
stateFlow.value = 11
launch {
stateFlow.collect{
Log.d(TAG.TAG,"stateFlow 2 collect $it")
}
}
日志如下:
2022-08-03 15:43:14.618 11669-11705/edu.test.demo D/Test-TAG: stateFlow 1 collect 0
2022-08-03 15:43:15.623 11669-11704/edu.test.demo D/Test-TAG: stateFlow 1 collect 10
2022-08-03 15:43:17.626 11669-11705/edu.test.demo D/Test-TAG: stateFlow 1 collect 11
2022-08-03 15:43:17.626 11669-11705/edu.test.demo D/Test-TAG: stateFlow 2 collect 11
分析:
- 可以看到接收者1刚开始收到了初始值1,那是因为StateFlow在每个接收者出现时都会接收到最新的值。
- 接收者1后面又接收到了10,那是动态更新StateFlow的value值触发的,但是注意10发送了两次,但是只接收到了一次,说明StateFlow是天然防抖的,连续发送两次同样的值,只会接收一次,后面又接收到了11和第一个10效果一致,是value值触发的。
- 接收者2接收到11的道理和接收者1接收到1的道理是一样的。
stateIn操作符
stateIn操作符是将冷流flow转换为热流StateFlow,主要参数有三个
1、第一个为作用域。
2、策略,分为三种Eagerly(立即发送)、Lazily(有第一个订阅者之后发送)、
WhileSubscribed()(在第一个订阅者出现之后开始、在最后一个订阅者、消失后结束),可配置二外的参数:
stopTimeoutMillis 为最后一个订阅者小时候保留的时长,单位ms,默认为0。
replayExpirationMillis 为最后一个订阅者消失后,缓存保留的时长,单位ms,默认为Long.MAX_VALUE。
3、StateFlow的初始值。
策略基本和shareIn是一致的,只是replay固定为1,另外会有一个初始值。
分析一种,其他的和shareIn类比即可,策略为WhileSubscribed()。
代码如下:
var time = 0L
time = System.currentTimeMillis()
val stateFlow = (1..10).asFlow().onStart {
Log.d(TAG.TAG,"onStart ${System.currentTimeMillis() - time}")
}.onCompletion {
Log.d(TAG.TAG,"onCompletion ${System.currentTimeMillis() - time}")
}.onEach {
delay(1000)
}.stateIn(this, SharingStarted.WhileSubscribed(),0)
launch {
Log.d(TAG.TAG, "stateFlow 1 collect ${stateFlow.take(5).toList()}")
Log.d(TAG.TAG,"接收到值 ${System.currentTimeMillis() - time}")
}
delay(10*1000)
launch {
Log.d(TAG.TAG, "stateFlow 2 collect ${stateFlow.take(5).toList()}")
Log.d(TAG.TAG,"接收到值 ${System.currentTimeMillis() - time}")
}
日志如下:
2022-08-03 15:33:35.204 11353-11379/edu.test.demo D/Test-TAG: onStart 70
2022-08-03 15:33:39.219 11353-11380/edu.test.demo D/Test-TAG: stateFlow 1 collect [0, 1, 2, 3, 4]
2022-08-03 15:33:39.219 11353-11380/edu.test.demo D/Test-TAG: 接收到值 4101
2022-08-03 15:33:39.221 11353-11378/edu.test.demo D/Test-TAG: onCompletion 4102
2022-08-03 15:33:45.145 11353-11378/edu.test.demo D/Test-TAG: onStart 10027
2022-08-03 15:33:49.151 11353-11378/edu.test.demo D/Test-TAG: stateFlow 2 collect [4, 1, 2, 3, 4]
2022-08-03 15:33:49.151 11353-11378/edu.test.demo D/Test-TAG: 接收到值 14033
2022-08-03 15:33:49.151 11353-11380/edu.test.demo D/Test-TAG: onCompletion 14033
分析:
- 可以看到接收者1接收到的值为01234,因为有一个初始值为0,占了一个位置,开始到接收到值的时间也基本是个值的时间,接收到之后也打印了onCompletion,和接收到值的时间基本一致。
- 第二个接收者,接收到的值为41234,因为StateFlow缓存了一个最新值4,再接收四个新值,时间和接收者1类似。
总结
- 本篇主要介绍了SharedFlow和StateFlow的基本使用、以及参数设置相关内容。
- 本篇也终点介绍了shareIn操作符的使用,以及各种策略参数的设置,stateIn类比shareIn理解。