Kotlin 学习笔记(七)—— Flow 数据流学习实践指北(三)冷流转热流以及代码实例

前一节(Kotlin 学习笔记(六)—— Flow 数据流学习实践指北(二)StateFlow 与 SharedFlow)介绍完了两种热流的构造方法以及它们的特点,那有没有方法可以将冷流转化为热流呢?当然是有的。那为什么需要将冷流转化为热流呢?

假如有这么一个场景:一开始有一个冷流 coldFlow 和它对应的消费者,后来下游又有几个新来的消费者需要使用这个 coldFlow,并且还需要之前已发送过的数据。而冷流的生产者与消费者是一对一的关系,且没有 replay 缓存机制,为新的消费者再创建一个冷流开销较大,这种情况下将冷流转为热流就显得事半功倍了。

1. shareIn 操作符

Flow 中的 shareIn 操作符就可以将冷流转为热流,它的方法声明是:

// code 1
public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

首先看返回值,最终确实会转化为一个热流 SharedFlow 实例。方法参数先来看最简单的 replay 参数,就是设置回播到每个新增消费者的数据个数,默认为 0。所以默认情况下,新增的消费者只能收到从它开始收集的时间点之后,生产者发送的数据。

再来看第一个 scope 参数,用于设置一个 CoroutineScope 作用域,注意其生命周期的长度需要比任何消费者都要长,保证被转化成的热流能在所有消费者收集数据进行消费时,都能处于活跃状态。新被转化的热流其实就是一个共享数据流,可以被所有的消费者共享使用。

第二个参数 started 复杂一些,它是用于设置被转化为共享数据流的启动方式,官方提供有 3 种方式,下面一个个说:

SharingStarted.Eagerly
勤快式启动方式。不等第一个消费者出现就会立即启动,需要注意的是,这种方式只会保留启动时数据流发送的前 replay 个数据,再之前的数据会立即丢弃。即不对数据流缓存区以外的数据负责,所以 replay 缓存区大小设置很重要。

SharingStarted.Lazily
懒汉式启动方式。需要等第一个消费者出现才会启动,第一个消费者可以接收到数据流所有发送的数据;但其他后面的消费者只能接收到最近的 replay 个数据。这种方式启动的数据流会一直保持活跃状态,甚至所有的的消费者都退出观察不再接收了,数据流仍然会缓存最近的 replay 个数据。

SharingStarted.WhileSubscribed()
灵活式启动方式。默认情况下就是有消费者来它就立即启动,没消费者接收了它就立即停止。所以在第一个消费者出现数据流就启动,当最后一个消费者退出它就立即停止,但它仍会永久缓存最近的 replay 个数据。此外,这种启动方式还可以根据需求自定义设置参数:

// code 2
public fun WhileSubscribed(
    stopTimeoutMillis: Long = 0,
    replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted =
    StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)

stopTimeoutMillis:设置最后一个消费者退出后,多长时间后再关闭数据流。默认是 0,即立即关闭。
replayExpirationMillis:设置关闭流之后等待多长时间后,再重置清空缓存区 replay cache 的数据。默认是 Long.MAX_VALUE,即永远保存。

自定义 SharingStarted
其实还可以自定义启动方式,自己实现 SharingStarted 接口即可。如果看了前三种启动方式的源码,不难会发现,其实启动方式都是使用固定的几个 SharingCommand 实现的。SharingCommand 有三种:

// code 3
public enum class SharingCommand {
    /**
     * 开始启动,并开始收集上游数据流.
     * 多次发送这个命令并没有什么用(支持防抖),如果先发送 STOP 再发送 START 则是重启一个上游数据流。
     */
    START,

    /**
     * 停止数据流, 取消上游数据流的收集所在协程。
     */
    STOP,

    /**
     * 停止数据流, 取消上游数据流的收集所在协程。并且将 replayCache 缓冲区的值重置为初始状态。
     * 如果是 shareIn 操作符,则会调用 [MutableSharedFlow.resetReplayCache] 方法;
     * 如果是 stateIn 操作符,则会将缓冲数据重置为最初设置的初始值.
     */
    STOP_AND_RESET_REPLAY_CACHE
}

感兴趣的同学可以看看 SharingStarted.WhileSubscribed() 的具体实现类 StartedWhileSubscribed 里面的源码。如果需要自定义启动方式,照着葫芦画瓢即可。

既然有 shareIn,那自然就少不了 stateIn 了。

2. stateIn 操作符

方法声明:

// code 4
public fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>

首先可以看出返回值是一个热流 StateFlow 实例,那么自然而然就需要一个参数给它设置一个初始值,即第三个参数 initialValue。 前两个参数与 shareIn 一样,这里就不再赘述。

3. shareIn 与 stateIn 使用指北

3.1 SharingStarted.WhileSubscribed() 实际使用

从上面的介绍可知,这种启动方式可以在没有消费者时自动取消上游数据流,从而避免资源的浪费。但在实际使用中,建议使用 SharingStarted.WhileSubscribed(5000),即在最后一个消费者停止后再保持数据流 5 秒钟的活跃状态。避免在某些特定情况下(如配置改变——最常见就是横竖屏切换、暗夜模式切换)重启上游的数据流。

3.2 shareIn、stateIn 适用于属性声明而非方法返回值

shareInstateIn 都会创建一个新的数据流,具体说就是 shareIn 会构建一个 ReadonlySharedFlow 实例;stateIn 则会构建一个 ReadonlyStateFlow 实例。而新创建的数据流会一直保存在内存中,直到传入数据流的作用域被取消或者没有任何引用时才会被 GC 回收。

所以下面代码中,前一部分代码是禁止使用的,正确的使用应该是如后一部分的代码,即在属性中使用。

// code 5
//错误示例:每次调用方法都会构建新的数据流
fun getUser(): Flow<User> =
    userLocalDataSource.getUser()
            .shareIn(externalScope, WhileSubscribed())    

//正确示例:在属性中使用 shareIn 或 stateIn 
 val user: Flow<User> = 
     userLocalDataSource.getUser().shareIn(externalScope, WhileSubscribed())

3.3 MutableSharedFlow 的 subscriptionCount 参数

这个参数表示的是 MutableSharedFlow 中活跃的消费者数目,即订阅者的个数。可用于监听消费者的数目变更,下面就是一个例子:

// code 6
sharedFlow.subscriptionCount
    .map { count -> count > 0 } // count > 0 说明有消费者,返回 true;= 0 说明没有消费者了,返回 false
    .distinctUntilChanged() // only react to true<->false changes
    .onEach { isActive -> // configure an action
        if (isActive) { // do something } else { // do something }
    }
    .launchIn(scope) // launch it

这个例子可以在有消费者收集数据流时,做一些自己的操作;当所有消费者都停止收集时,再处理另外的一些操作,比如资源回收等。

distinctUntilChanged 操作符比较面生,它就是过滤掉前面接收到的重复值,从而使得后面只会接收到发生了变化的新值,和 StateFlow 特性一样。

onEach 操作符也比较常见,可以在流上新增一些处理操作,再发给下游。

3.4 与操作符的搭配使用

如果在实际使用中,需要得知上游数据流的一些状态,比如开始、完成等,则需要在上游数据流转为热流之前添加一些操作符起到监听的作用。

onStart 操作符监听启动,onCompletion 操作符监听完成

// code 7
private fun shareInOnStartDemo() {
    val testFlow = flow {
        println("++++emit before")
        emit(4)
        delay(1000)
        emit(5)
        delay(1000)
        emit(6)
    }.onStart {
        emit(-1)
        println("++++ onStart")
    }.onCompletion {
        emit(-100)
        println("++++ onCompletion")
    }.shareIn(
        lifecycleScope,
        SharingStarted.WhileSubscribed(),
        8
    )
    lifecycleScope.launch {
        testFlow.collect {
            println("++++ collector receive $it")
        }
    }
}
图1

从打印的 log 可以看到,确实可以监听状态。当然也可以在相同的位置添加 catch 操作符用于监听异常的发生,感兴趣的同学可以试试看。

4. StateFlow 代码实战

说了这么多 Flow 的东西,最后以一个实际的例子结束这一章节的学习笔记吧!

下面我将用一个应用实例来讲解 StateFlow 的实际应用。这个例子将会用到 debouncedistinctUnitChangedflatMapLatest 等操作符,用这些操作符去实现一个文本输入中实时查询的例子。

假设有个需求,要实现一个浏览器搜索的功能,根据用户不断输入的字符去查询相关的内容。如果不做任何处理,用户对键入的字符串做的任何修改,都会去请求一次接口,那后端服务器肯定是吃不消的;对于用户而言,在不断输入的过程中返回的结果用户并不会很关心,他只会关心最终输入完成之后请求的数据。那么,如何减少后端的接口请求次数是关键所在。

先来看看核心的代码:

// code 8   ViewModel.kt 文件
val queryStateFlow = MutableStateFlow("")

fun getQueryResult(): Flow<String> {
    return queryStateFlow
        .debounce(300L)
        .distinctUntilChanged()
        .flatMapLatest {
            if (it.isNullOrBlank()) {
                flow { emit("") }
            } else {
                dataFromNetwork(it).catch {
                    emitAll( flow { emit("") } )
                }
            }
        }
        .flowOn(Dispatchers.IO)
}

// 模拟网络请求的耗时操作
private fun dataFromNetwork(query: String): Flow<String> {
    return flow {
        delay(2000)
        emit(query) // 返回请求的结果
    }
}

首先可以直观地感受到,使用 Flow 去处理这一逻辑较为简单,代码量较少,这也是 Flow 的魅力所在。我们按顺序介绍一下所使用到的 Flow 操作符:

debounce 操作符
具体的操作符方法声明:

// code 9
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>

用于过滤掉最新的发射值之前 timeoutMillis 时间内发射的值,返回一个过滤后的 Flow。官方栗子非常清楚:

// code 10
flow {
    emit(1)
    delay(90)
    emit(2)
    delay(90)
    emit(3)
    delay(1010)
    emit(4)
    delay(1010)
    emit(5)
}.debounce(1000)
最终会发射出下面的三个值:
3, 4, 5

发射 1 之后不到 1000ms 又发射了 2,所以 1 就会被过滤掉不会发射了,以此类推。所以最后发射的值是一定可以发射成功的。通过这个操作符,我们就可以有效减少频繁请求接口的问题,这里设置的 timeout 为 300ms,即在用户连续输入过程中每间隔 300ms 才去请求一次数据。

distinctUntilChanged 操作符
具体操作符声明为:

// code 11
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T>

用于过滤掉重复的发射值。虽然 StateFlow 本身就可过滤掉没有变化的发射值,但在这里还是需要的,因为用户可能会删除刚输入的字符,这一操作符可进一步减少不必要的接口请求。

flatMapLatest 操作符
我看的代码版本这个操作符还是实验性api,后续可能被移除。具体操作符声明为:

// code 12
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R>

这个操作符可以在原流的基础上生成一个新流,当原流依次发出 a、b 两值时,新流都会接收,但如果新流 a 值的相关操作还未结束,则会取消 a 值的相关操作,并用 b 值进行操作。简单说就是,丢弃旧值操作,换用新值操作。下面是一个例子:

// code 13
    fun flatMapLatestDemo() {
        val testFlow = flow {
            emit("a")
            delay(100)
            emit("b")
        }.flatMapLatest {
            flow {
                emit("receive $it")
                delay(200)
                emit("send $it")
            }
        }

        lifecycleScope.launch {
            testFlow.collect {
                println("----$it")
            }
        }
    }
图2

通过打印的 log 可以看出,a,b 都被 flatMapLatest 操作符接收到了,只有 b 最终通过。这是因为 a 先到达,等待了 100ms 后新的值 b 也到了,但 a 还在等待中,这时 flatMapLatest 就会取消掉 a 后续的操作。如果把 delay(200) 改成 delay(50),那最终 a,b 都能被打印出来。

所以这个操作符在 code 8 中的作用就是进一步减少接口请求的次数。当输入的新字符串到来时,就会将之前旧字符串还未结束的请求操作取消掉,用新的字符串去请求数据。

ViewModel.kt 的代码终于说完了,其他的代码就比较常规了,直接上码:

// code 14  MainActivity.kt
binding.editText.addTextChangedListener(object : TextWatcher{
    override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) { }

    override fun onTextChanged(input: CharSequence?, start: Int, before: Int, count: Int) {
        viewModel.queryStateFlow.value = input.toString()
    }

    override fun afterTextChanged(s: Editable?) { }
})

lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.getQueryResult()
            .collect {
                binding.tvText.text = it
            }
    }
}

有关 Flow 的相关知识就到此结束了,来个简单总结吧~

总结

1)shareInstateIn 都可将冷流转化为热流,将数据共享给多个消费者,无需为每个消费者创建同一个数据流的新实例。两者通常用于提升性能,在没有消费者时缓存数据;
2)SharingStarted 启动方式有 EagerlyLazilyWhileSubscribed 三种,最常用的还是 WhileSubscribed,有消费者就启动,没有就停止,还能设置停止延时时长和缓存过期时长;
3)注意 shareInstateIn 都会新建一个 Flow,不要用于方法的返回值,建议赋值给属性;
4)shareInstateInonStartonCompletion 等搭配可监听转成的热流的状态;
5)distinctUntilChanged 操作符可过滤重复数据,一般用于 SharedFlow;debounce 可用于在某一时间段内防抖;flatMapLatest 操作符可以用最新值替换旧值发送给下游,旧值直接被取消作废。

更多内容,欢迎关注公众号:修之竹
或者查看 修之竹的 Android 专辑

赞人玫瑰,手留余香!欢迎点赞、转发~ 转发请注明出处~

参考文献

  1. StateFlow 和 SharedFlow 官方文档 https://developer.android.google.cn/kotlin/flow/stateflow-and-sharedflow?hl=zh-cn
  2. Flow 操作符 shareIn 和 stateIn 使用须知;Android开发者;https://mp.weixin.qq.com/s/PbqF-vzDrttYq-cSR6NDmQ
  3. Kotlin协程:冷流转换热流的使用与原理;LeeDuo;https://blog.csdn.net/LeeDuoZuiShuai/article/details/127145092
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,743评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,296评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,285评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,485评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,581评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,821评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,960评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,719评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,186评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,516评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,650评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,329评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,936评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,757评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,991评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,370评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,527评论 2 349

推荐阅读更多精彩内容