Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信
简单构建一个生产者消费者例子
fun testChannel() = runBlocking<Unit> {
val channel = Channel<Int>()
val producer = GlobalScope.launch {
var i = 0
while (i <= 3) {
delay(1000)
println("send $i")
channel.send(i++)
}
}
val consumer = GlobalScope.launch {
while (true) {
val j = channel.receive()
println("receiver+$j")
if (j > 3) {
break
}
}
}
joinAll(producer,consumer)
}
一、Channel的容量
Channel实际上是一个队列,队列一定存在缓冲区,那么一旦这个缓冲区满了,并且一直没有人调用receive并取走函数,send就需要挂起。故意让接收端的节奏方慢,发现send总是会挂起,直到receive之后才会继续往下执行,默认容量是0。
//Channel的迭代
fun testIterator() = runBlocking<Unit> {
val channel = Channel<Int>(Channel.Factory.UNLIMITED)
val producer = GlobalScope.launch {
var i = 0
while (i <= 5) {
println("send $i")
channel.send(i++)
}
}
val consumer = GlobalScope.launch {
val iterator = channel.iterator()
while (iterator.hasNext()) {
delay(1000)
val element = iterator.next()
println("receiver $element")
if (element > 5) {
return@launch
}
}
}
joinAll(producer,consumer)
}
二、producer与actor,便捷的构建生产和消费
fun testProduce() = runBlocking<Unit> {
val receiverChannel: ReceiveChannel<Int> = GlobalScope.produce<Int> {
repeat(5) {
send(it)
}
}
val consumer = GlobalScope.launch {
for (i in receiverChannel) {
println("$i")
}
}
consumer.join()
}
fun testActor() = runBlocking<Unit> {
val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
while (true){
val result= receive()
println(result)
}
}
val producer = GlobalScope.launch {
for (i in 1..3) {
sendChannel.send(i)
}
}
producer.join()
}
三、Channel的关闭
- produce和actor返回的Channel都会随着对应的协程执行完毕而关闭,也正是这样,Channel也被称为热数据流。
- 对于一个Channel,如果我们调用了它的close方法,它会立即停止接收新的元素,也就是说这时候它的isClosedForSend会立即返回true。而对于Channel缓冲区的存在,这时候可能还有一些数据没有处理完,因此 要等所有的元素被读取完之后isClosedForReceive才会返回false
- Channel的生命周期最好由主导方来维护,建议由主导的一方实现关闭。
fun testChannelClose() = runBlocking<Unit> {
val channel = Channel<Int>(3)
val producer = GlobalScope.launch {
List(3) {
channel.send(it)
println("send $it")
}
channel.close()
println("send ${channel.isClosedForSend}--${channel.isClosedForReceive}")
}
val consumer = GlobalScope.launch {
for (element in channel){
println("receive $element")
delay(100)
}
println("send ${channel.isClosedForSend}--${channel.isClosedForReceive}")
}
joinAll(producer,consumer)
}
//print result
//send 0
//send 1
//send 2
//receive 0
//send true--false
//receive 1
//receive 2
//send true--true
四、BoradcastCahnnel
发送端和接收端存在一对多的情况,广播可以实现多个接收端且不互斥的行为。
@Test
fun testBroadcastChannel() = runBlocking<Unit> {
val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
// val channel = Channel<Int>()
// val broadcastChannel = channel.broadcast(3)
val producer = GlobalScope.launch {
List(3) {
delay(100)
broadcastChannel.send(it)
}
broadcastChannel.close()
}
List(3) { index ->
GlobalScope.launch {
val receiveChannel = broadcastChannel.openSubscription()
for (i in receiveChannel) {
println("#$index received #$i")
}
}
}.joinAll()
//print result
// #0 received #0
// #1 received #0
// #2 received #0
// #0 received #1
// #2 received #1
// #1 received #1
// #0 received #2
// #1 received #2
// #2 received #2
}
五、channel的多路复用
多路复用select函数,返回代码块中响应最快的一个结果
//test for onWait
fun testSelectOnWait() = runBlocking<Unit> {
val localUser = async {
delay(100)
User("Bob", 25)
}
val remoteUser = async {
delay(200)
User("Jack", 28)
}
GlobalScope.launch {
val responseUser = select<Response<User>> {
localUser.onAwait { Response<User>(it, true) }
remoteUser.onAwait { Response<User>(it, false) }
}
responseUser.value?.let {
println("response $it")
}
}.join()
}
//print result
//response User(name=Bob, age=25)
复用多个Channel
//test onReceive
fun testSelectChannel() = runBlocking<Unit> {
val channels = listOf<Channel<Int>>(Channel<Int> {}, Channel<Int> { })
GlobalScope.launch {
delay(50)
channels[0].send(1)
}
GlobalScope.launch {
delay(100)
channels[1].send(2)
}
val result = select<Int?> {
channels.forEach { channel ->
channel.onReceive {
it
}
}
}
println(result)
delay(1000)
}
//print result
// 1
六、SelectClause与select
如果我想确认挂起函数是否支持select,只需要查看其是否存在对应的SelectCluseN类型可调用即可。
- SelectClause0:对应事件没有返回值,例如join没有返回值,那么onJoin就是selelctClause0类型。使用时,onJoin的参数时一个无参函数。
@Test
fun testSelectClause0() = runBlocking {
val job1 = GlobalScope.launch {
delay(100)
println("Job1 Done")
}
val job2 = GlobalScope.launch {
delay(50)
println("Job2 Done")
}
select<Unit> {
job1.onJoin
job2.onJoin
}
joinAll(job1, job2)
}
//print result
//Job2 Done
//job2 onJoin
//Job1 Done
- SelectClasue1:对应事件有返回值,onAwait和onReceive都是此类情况
fun testSelectClause1() = runBlocking {
val job1 = GlobalScope.async {
delay(100)
"Job1 Done"
}
val job2 = GlobalScope.async {
delay(50)
"Job2 Done"
}
val result = select<String> {
job1.onAwait {
"onWait $it"
}
job2.onAwait { "onWait $it"
}
}
println(result)
delay(1000)
}
//print result
//onWait Job2 Done
- SelectClause2:对n应事件有返回值,此外还要一个额外的参数,例如,Channel.onSend有两个参数,第一个是Channel数据类型的值,表示即将发送的值,第二个是发送成功时的回调参数。
fun testSelectClause2() = runBlocking {
val channels = listOf<Channel<Int>>(Channel<Int>(), Channel<Int>())
println(channels)
launch(Dispatchers.IO) {
select<Unit?> {
launch {
delay(100)
channels[1].onSend(10) { sendChannel ->
println("sent on $sendChannel")
}
}
launch {
delay(200)
channels[0].onSend(20) { sendChannel ->
println("sent on $sendChannel")
}
}
}
}
GlobalScope.launch {
println("${channels[0].receive()}")
}
GlobalScope.launch {
println("${channels[1].receive()}")
}
delay(100)
}
七、Flow的多路复用
八、协程的并发安全
fun testUnSafe() = runBlocking<Unit> {
var count = 0
List(1000) {
GlobalScope.launch {
count++
}
}.joinAll()
println(count)
}
//print result
//963
//正常结果应该是1000,但是因为没有实现并发安全,造成了结果的错误
协程框架提供了一些并发安全的工具:
- Channel : 并发安全的消息通道。
- Mutex: 轻量级锁,它的lock和unlock从语义上与线程锁类似,之所以轻量是它获取不到锁时不会阻塞线程,而是挂起等待锁的释放。
- Semaphore:轻量级信号量,信号量可以有多个,协程在获取到信号量后即可执行并发操作。当Semaphore的参数为1时等同于Mutex。
fun testSafeMutex() = runBlocking<Unit> {
var count = 0
val mutex = Mutex()
List(1000) {
GlobalScope.launch {
mutex.withLock {
count++
}
}
}.joinAll()
println(count)
}
//semaphore
fun testSafeSeMaphore() = runBlocking<Unit> {
var count = 0
val semaphore = Semaphore(1)
List(1000) {
GlobalScope.launch {
semaphore.withPermit {
count++
}
}
}.joinAll()
println(count)
}