Kotlin之协程(四)——Channel

Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信

简单构建一个生产者消费者例子

fun testChannel() = runBlocking<Unit> {
        val channel = Channel<Int>()

        val producer = GlobalScope.launch {
            var i = 0
            while (i <= 3) {
                delay(1000)
                println("send $i")
                channel.send(i++)
            }
        }
        val consumer = GlobalScope.launch {
            while (true) {
                val j = channel.receive()
                println("receiver+$j")
                if (j > 3) {
                    break
                }

            }
        }

        joinAll(producer,consumer)
    }

一、Channel的容量

Channel实际上是一个队列,队列一定存在缓冲区,那么一旦这个缓冲区满了,并且一直没有人调用receive并取走函数,send就需要挂起。故意让接收端的节奏方慢,发现send总是会挂起,直到receive之后才会继续往下执行,默认容量是0。

  //Channel的迭代
  fun testIterator() = runBlocking<Unit> {
        val channel = Channel<Int>(Channel.Factory.UNLIMITED)
        val producer = GlobalScope.launch {
            var i = 0
            while (i <= 5) {
                println("send $i")
                channel.send(i++)
            }
        }
        val consumer = GlobalScope.launch {
           val iterator =  channel.iterator()
            while (iterator.hasNext()) {
                delay(1000)
                val element = iterator.next()
                println("receiver $element")
                if (element > 5) {
                    return@launch
                }
            }
        }
        joinAll(producer,consumer)
    }

二、producer与actor,便捷的构建生产和消费

fun testProduce() = runBlocking<Unit> {
        val receiverChannel: ReceiveChannel<Int> = GlobalScope.produce<Int> {
            repeat(5) {
                send(it)
            }
        }
        val consumer = GlobalScope.launch {
            for (i in receiverChannel) {
                println("$i")
            }
        }
        consumer.join()
    }
fun testActor() = runBlocking<Unit> {
        val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
            while (true){
                val result=   receive()
                println(result)
            }
        }
      val producer =   GlobalScope.launch {
            for (i in 1..3) {
              sendChannel.send(i)
            }
        }
        producer.join()
    }

三、Channel的关闭

  • produce和actor返回的Channel都会随着对应的协程执行完毕而关闭,也正是这样,Channel也被称为热数据流。
  • 对于一个Channel,如果我们调用了它的close方法,它会立即停止接收新的元素,也就是说这时候它的isClosedForSend会立即返回true。而对于Channel缓冲区的存在,这时候可能还有一些数据没有处理完,因此 要等所有的元素被读取完之后isClosedForReceive才会返回false
  • Channel的生命周期最好由主导方来维护,建议由主导的一方实现关闭。
fun testChannelClose() = runBlocking<Unit> {
        val channel = Channel<Int>(3)
        val producer = GlobalScope.launch {
            List(3) {
                channel.send(it)
                println("send $it")
            }
            channel.close()
            println("send ${channel.isClosedForSend}--${channel.isClosedForReceive}")
        }
        val consumer = GlobalScope.launch {
            for (element in channel){
                println("receive $element")
                delay(100)
            }
            println("send ${channel.isClosedForSend}--${channel.isClosedForReceive}")
        }
        joinAll(producer,consumer)
    }
//print result
//send 0
//send 1
//send 2
//receive 0
//send true--false
//receive 1
//receive 2
//send true--true

四、BoradcastCahnnel

发送端和接收端存在一对多的情况,广播可以实现多个接收端且不互斥的行为。

@Test
    fun testBroadcastChannel() = runBlocking<Unit> {
        val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
//        val channel = Channel<Int>()
//        val broadcastChannel = channel.broadcast(3)
        val producer = GlobalScope.launch {
            List(3) {
                delay(100)
                broadcastChannel.send(it)
            }
            broadcastChannel.close()
        }

        List(3) { index ->
            GlobalScope.launch {
                val receiveChannel = broadcastChannel.openSubscription()
                for (i in receiveChannel) {
                    println("#$index received #$i")
                }
            }
        }.joinAll()
        
        //print result
//        #0 received #0
//        #1 received #0
//        #2 received #0
//        #0 received #1
//        #2 received #1
//        #1 received #1
//        #0 received #2
//        #1 received #2
//        #2 received #2
    }

五、channel的多路复用

多路复用select函数,返回代码块中响应最快的一个结果
//test for onWait
 fun testSelectOnWait() = runBlocking<Unit> {
        val localUser = async {
            delay(100)
            User("Bob", 25)
        }
        val remoteUser = async {
            delay(200)
            User("Jack", 28)
        }
        GlobalScope.launch {
          val responseUser =   select<Response<User>> {
                localUser.onAwait { Response<User>(it, true) }
                remoteUser.onAwait { Response<User>(it, false) }
            }
            responseUser.value?.let {
                println("response $it")
            }
        }.join()
    }
//print result
//response User(name=Bob, age=25)
复用多个Channel
//test onReceive
fun testSelectChannel() = runBlocking<Unit> {
        val channels = listOf<Channel<Int>>(Channel<Int> {}, Channel<Int> { })
        GlobalScope.launch {
            delay(50)
            channels[0].send(1)
        }
        GlobalScope.launch {
            delay(100)
            channels[1].send(2)
        }
        val result = select<Int?> {
            channels.forEach { channel ->
                channel.onReceive {
                    it

                }
            }
        }
        println(result)
        delay(1000)
    }
//print result
// 1

六、SelectClause与select

如果我想确认挂起函数是否支持select,只需要查看其是否存在对应的SelectCluseN类型可调用即可。

  • SelectClause0:对应事件没有返回值,例如join没有返回值,那么onJoin就是selelctClause0类型。使用时,onJoin的参数时一个无参函数。
 @Test
    fun testSelectClause0() = runBlocking {
        val job1 = GlobalScope.launch {
            delay(100)
            println("Job1 Done")
        }
        val job2 = GlobalScope.launch {
            delay(50)
            println("Job2 Done")
        }
        select<Unit> {
            job1.onJoin
            job2.onJoin
        }
        joinAll(job1, job2)
    }
//print result
//Job2 Done
//job2 onJoin
//Job1 Done

  • SelectClasue1:对应事件有返回值,onAwait和onReceive都是此类情况
fun testSelectClause1() = runBlocking {
        val job1 = GlobalScope.async {
            delay(100)
            "Job1 Done"
        }
        val job2 = GlobalScope.async {
            delay(50)
            "Job2 Done"
        }
        val result = select<String> {
            job1.onAwait {
                "onWait $it"
            }
            job2.onAwait { "onWait $it"
            }
        }
        println(result)
        delay(1000)
    }
//print result
//onWait Job2 Done
  • SelectClause2:对n应事件有返回值,此外还要一个额外的参数,例如,Channel.onSend有两个参数,第一个是Channel数据类型的值,表示即将发送的值,第二个是发送成功时的回调参数。
fun testSelectClause2() = runBlocking {
        val channels = listOf<Channel<Int>>(Channel<Int>(), Channel<Int>())
        println(channels)
        launch(Dispatchers.IO) {
            select<Unit?> {
                launch {
                    delay(100)
                    channels[1].onSend(10) { sendChannel ->
                        println("sent on $sendChannel")
                    }
                }

                launch {
                    delay(200)
                    channels[0].onSend(20) { sendChannel ->
                        println("sent on $sendChannel")
                    }
                }
            }
        }
        GlobalScope.launch {
            println("${channels[0].receive()}")
        }
        GlobalScope.launch {
            println("${channels[1].receive()}")
        }
        delay(100)
    }

七、Flow的多路复用

八、协程的并发安全

fun testUnSafe() = runBlocking<Unit> {
        var count = 0
        List(1000) {
            GlobalScope.launch {
                count++
            }
        }.joinAll()
        println(count)
    }
//print result
//963
//正常结果应该是1000,但是因为没有实现并发安全,造成了结果的错误

协程框架提供了一些并发安全的工具:

  • Channel : 并发安全的消息通道。
  • Mutex: 轻量级锁,它的lock和unlock从语义上与线程锁类似,之所以轻量是它获取不到锁时不会阻塞线程,而是挂起等待锁的释放。
  • Semaphore:轻量级信号量,信号量可以有多个,协程在获取到信号量后即可执行并发操作。当Semaphore的参数为1时等同于Mutex。
fun testSafeMutex() = runBlocking<Unit> {
        var count = 0
        val mutex = Mutex()
        List(1000) {
            GlobalScope.launch {
               mutex.withLock {
                   count++
               }
            }
        }.joinAll()
        println(count)
    }


 //semaphore
fun testSafeSeMaphore() = runBlocking<Unit> {
        var count = 0
        val semaphore = Semaphore(1)
        List(1000) {
            GlobalScope.launch {
                semaphore.withPermit {
                   count++
               }
            }
        }.joinAll()
        println(count)
    }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 207,113评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,644评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,340评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,449评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,445评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,166评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,442评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,105评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,601评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,066评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,161评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,792评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,351评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,352评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,584评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,618评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,916评论 2 344

推荐阅读更多精彩内容