kotlin<第十篇>:Flow-异步流

Flow: 是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。
流的连续性:流的每次单独收集都是按顺序执行的,除非使用特殊操作符。
从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符。

flow构建器创建一个函数
返回多个值,而且是异步的,不是一次性返回

(1)构建流的三种方式

// flow构建器创建一个函数
// 返回多个值,而且是异步的,不是一次性返回
suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        delay(1000)
        emit(i) // 发射,产生一个元素
    }
}
runBlocking {
    // Flow构建方式1
    simpleFlow().collect { value -> println(value) } // 收集元素

    // Flow构建方式2
    (1..5).asFlow().filter {
        it % 2 == 0
    }.map {
        println("Map $it")
    }.onEach {
        delay(1000)
    }.collect {
        println("Collect $it")
    }

    // Flow构建方式3
    flowOf("one", "two", "three").onEach { delay(1000) }.collect { values ->
        println(values)
    }
}

(2)流的上下文

    // Flow上下文验证
    (1..5).asFlow().filter {
        println("当前线程-filter:" + Thread.currentThread().name)
        it % 2 == 0
    }.map {
        println("当前线程-map:" + Thread.currentThread().name)
    }.onEach {
        delay(1000)
    }.collect {
        println("当前线程-collect:" + Thread.currentThread().name)
        println("Collect $it")
    }

从打印结果上看,上游和下游都是在主线程。
但是,一般情况下,Flow构建之后的代码块中是耗时操作,所以不能放在主线程,解决方案是:在Flow构建器后面添加 flowOn(Dispatchers.Default),改造后的代码如下:

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        delay(1000)
        emit(i) // 发射,产生一个元素
    }
}.flowOn(Dispatchers.Default)

fun main() {
    runBlocking {
        // Flow构建方式1
        simpleFlow().collect { value -> println(value) } // 收集元素

        // Flow构建方式2
        (1..5).asFlow().filter {
            println("当前线程-filter:" + Thread.currentThread().name)
            it % 2 == 0
        }.map {
            println("当前线程-map:" + Thread.currentThread().name)
        }.onEach {
            delay(1000)
        }.flowOn(Dispatchers.Default).collect {
            println("当前线程-collect:" + Thread.currentThread().name)
            println("Collect $it")
        }

        // Flow构建方式3
        flowOf("one", "two", "three").flowOn(Dispatchers.Default).onEach { delay(1000) }.collect { values ->
            println(values)
        }

    }
}

(3)启动流

启动流:launchIn传入协程作用域形参,使用launchIn替换collect我们可以在指定协程中启动流的收集

    (1..5).asFlow().onEach {
        delay(1000)
    }.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO)).join()


    (1..5).asFlow().onEach {
        delay(1000)
    }.flowOn(Dispatchers.Default).launchIn(this).join()

(4)流的取消

使用 withTimeoutOrNull 方式取消:

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        delay(1000)
        emit(i) // 发射,产生一个元素
    }
}.flowOn(Dispatchers.Default)

fun main() {
    runBlocking {

        withTimeoutOrNull(2000) {
            // Flow构建方式1
            simpleFlow().collect { value -> println(value) } // 收集元素
        }

        withTimeoutOrNull(2000) {
            (1..5).asFlow().onEach {
                delay(1000)
            }.flowOn(Dispatchers.Default).collect {
                println("Collect $it")
            }
        }

        withTimeoutOrNull(2000) {
            flowOf("one", "two", "three").flowOn(Dispatchers.Default).onEach { delay(1000) }.collect { values ->
                println(values)
            }
        }

        withTimeoutOrNull(2000) {
            (1..5).asFlow().onEach {
                delay(1000)
            }.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO)).join()
        }

        println("Done...")

    }
}

另外,启动流还可以调用 cancelAndJoin 取消。

    val job = (1..5).asFlow().onEach {
        delay(1000)
    }.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO))
    delay(1000)
    job.cancelAndJoin()

(5)流的取消检测

为方便起见,流构建器对每个发射值执行附加的ensureActive 检测以进行取消,这意味着从 flow{...} 发出的繁忙循环是可以取消的。
出于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程处于繁忙循环的情况下,必须明确检测是否取消。
通过cancellable操作符来执行此操作。

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..5) {
        delay(1000)
        emit(i) // emit自带检测是否取消的能力
    }
}.flowOn(Dispatchers.Default)

fun main() {
    runBlocking {

        // emit 自带检测是否取消的能力
        simpleFlow().collect { value ->
            if (value == 3) cancel()
        }

        // 如果没有emit,需要使用 cancellable
        (1..5).asFlow().cancellable().onEach {
            delay(1000)
        }.flowOn(Dispatchers.Default).collect { value ->
            if (value == 3) cancel()
        }

    }
}

(6)背压

背压:水流受到与流动方向一致的压力。
生产者、消费者模式,只要生产效率 > 消费效率,那么就会产生背压。

处理背压的方式有:

  • buffer(),并发运行流中发射元素的代码
  • conflate(),合并发射项,不对每个值进行处理
  • collectLatest(),取消并重新发射最后一个值
  • 当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但是buffer函数显示地请求缓冲而不改变执行上下文
suspend fun simpleFlow() = flow<Int> {
    for (i in 1..50) {
        println("发送数据:$i")
        delay(100)
        emit(i)
    }
}

fun main() {
    runBlocking {
        val time = measureTimeMillis {
            simpleFlow()
                .collect { value ->
                delay(300)
                println("接收数据:$value")
            }
        }
        println("耗时:$time")
    }
}

以上代码,发送数据和接收数据都是在同一个线程中并行执行,如果存在耗时程序,将特别影响效率。

为了增加执行效率,可以使用 buffer 设置缓存大小,从而起到加快执行速率的效果。

    val time = measureTimeMillis {
        // 背压
        simpleFlow()
            .buffer(10)
            .collect { value ->
            delay(300)
            println("接收数据:$value")
        }
    }

但是,从生产者/消费者的设计思想的角度上考虑,发送数据最好放在子线程。

    val time = measureTimeMillis {
        // 背压
        simpleFlow()
            .flowOn(Dispatchers.Default)
            .collect { value ->
            delay(300)
            println("接收数据:$value")
        }
    }

使用 flowOn 可以指定 Flow 的协程作用域,这样可以将 并行 转成 并发,从而加快执行效率。

runBlocking {
    val time = measureTimeMillis {
        // 背压
        simpleFlow()
            .conflate()
            .collect { value ->
            delay(300)
            println("接收数据==:$value")
        }
    }
    println("耗时:$time")
}

以上代码使用 conflate,中间一些元素不会处理,从而加快执行效率。

    val time = measureTimeMillis {
        // 背压
        simpleFlow()
            .collectLatest { value ->
            delay(300)
            println("接收数据==:$value")
        }

以上代码将 collect 改成 collectLatest 之后,只会处理最后一个值,从而加速执行速度。

(7)转换操作符

使用map转换:

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        println(i)
        emit(i)
    }
}

fun main() {
    runBlocking {
        simpleFlow()
            .map { value ->
                "response $value"
            }
            .collect { value ->
                println(value)
            }
    }
}

使用transform转换,可以转换成任意次、任意值的Flow:

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        println(i)
        emit(i)
    }
}

fun main() {
    runBlocking {
        simpleFlow()
            .transform { request ->
                emit("request $request")
                emit("request $request")
            }
            .collect { value ->
                println(value)
            }
    }
}

(8)限长操作符

take 是限长操作符,可以限制处理的数量:

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        println(i)
        emit(i)
    }
}

fun main() {
    runBlocking {
        simpleFlow()
            .take(2)
            .collect { value ->
                println(value)
            }
    }
}

(9)末端操作符

末端操作符是在流上用于 启动流收集的挂起函数。collect是最基础的末端操作符,但是还有另外一些更加方便使用的末端操作符:

  • 转化为各种集合,例如:toList与toSet。
  • 获取第一个(first)值与确保流发射单个(single)值的操作符。
  • 使用reduce与fold将流规约到单个值。
fun main() {
    runBlocking {
        val sum = simpleFlow()
            .reduce { a, b ->
                a + b
            }
        println(sum)
    }
}

reduce 操作符可以将元素累加。
reduce的返回值类型必须和集合的元素类型相符。

suspend fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
        emit(i)
    }
}

fun main() {
    runBlocking {
        val newStr = simpleFlow()
            .fold(StringBuilder()) { str: StringBuilder, a: Int ->
                str.append(a).append(" ")
            }
        println(newStr)
    }
}

而fold的返回值类型则不受约束。

(10)组合操作符

zip 操作符将两个流合并。

runBlocking {
    val nums1 = (1..3).asFlow()
    val nums2 = flowOf("one", "two", "three")
    nums1.zip(nums2) {a, b ->
        "$a $b"
    }.collect {value->
        println(value)
    }
}

(11)展平操作符

流表示异步接收的值序列,所以很容易遇到这种情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平模式,为此,存在一系列的流展平操作符:

  • flatMapConcat:连接模式
  • flatMapMerge:合并模式
  • flatMapLatest: 最新展平模式
suspend fun requestFlow(i: Int) = flow<String> {
    emit("request $i first")
    delay(500)
    emit("request $i second")
}

fun main() {
    runBlocking {
        val startTime = System.currentTimeMillis()
        (1..3).asFlow()
            .onEach { delay(100) }
            .flatMapConcat {
                requestFlow(it) // Flow的元素是Flow
            }
            .collect { value->
            println("$value -- ${System.currentTimeMillis() - startTime}")
        }
    }
}

代码中 flatMapConcat 可以换成 flatMapMerge 或者 flatMapLatest

三者的执行结果是:

flatMapConcat :(requestFlow全部执行完)

request 1 first -- 198
request 1 second -- 701
request 2 first -- 815
request 2 second -- 1319
request 3 first -- 1428
request 3 second -- 1932

flatMapMerge:(不需要等待requestFlow全部执行完)

request 1 first -- 281
request 2 first -- 361
request 3 first -- 470
request 1 second -- 798
request 2 second -- 876
request 3 second -- 985

flatMapLatest:

request 1 first -- 250
request 2 first -- 376
request 3 first -- 485
request 3 second -- 1001

(12)流的异常处理

suspend fun requestFlow() = flow<Int> {
    for (i in 1..3) {
        emit(i)
        throw RuntimeException("exception")
    }
}.catch {e: Throwable ->
    println("上游异常捕获:" + e.message)
}

fun main() {
    runBlocking {
        try {
            requestFlow()
                .collect { value->
                    check(value < 2) // 检查异常
                    println(value)
                }
        } catch (e: Throwable) {
            println("下游异常捕获:" + e.message)
        }
    }
}

check:检查异常,一旦检查到异常,程序crash。
下游通过 try...catch 捕获异常,上游Flow自带 catch 函数。

(13)流的完成

收集完成时,使用 finally,表示收集完成。

suspend fun requestFlow() = flow<Int> {
    for (i in 1..3) {
        emit(i)
    }
}

fun main() {
    runBlocking {
        try {
            requestFlow().collect { value-> println(value) }
        } finally {
            println("...完成...")
        }
    }
}

使用 onCompletion 也可以表示完成:

suspend fun requestFlow() = flow<Int> {
    for (i in 1..3) {
        emit(i)
        throw RuntimeException("exception")
    }
}.catch {exception->
    println("catch -> exception:" + exception.message)
}

fun main() {
    runBlocking {

        requestFlow()
            .onCompletion {exception ->
                if (exception != null) { // 异常导致完成
                    println("finish -> exception:" + exception.message)
                } else { // 正常结束
                    println("正常结束")
                }
            }
            .collect { value-> println(value) }

    }
}

onCompletion 可以拿到异常信息,但是不能捕获异常。

(13)Flow实现多路复用

多数情况下,我们可以通过构造合适的Flow来实现多路复用的效果。

data class User(val name: String)
data class Response<T>(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async {
    delay(1000)
    User(name)
}

suspend fun CoroutineScope.getUserFromRemote(name: String) = async {
    delay(100)
    User(name)
}

fun main() {
    runBlocking {
        val name = "guest"
        // 两个函数
        listOf(::getUserForLocal, ::getUserFromRemote)
            .map { function->
                function.call(name)
            }
            .map { deferred ->
                flow { emit(deferred.await()) }
            }.merge().collect { user -> println(user) }

    }
}

以上代码用到了反射,需要引入依赖:

implementation 'org.jetbrains.kotlin:kotlin-reflect:1.0.6'

[完...]

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容