3.协程的调度(2)

协程上下文源代码

public interface CoroutineContext {
    //从该上下文返回具有给定[键]的元素或' null '
    public operator fun <E : Element> get(key: Key<E>): E?
    //从左到右添加元素
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    //删除此上下文
    public operator fun plus(context: CoroutineContext): CoroutineContext =
    if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
        context.fold(this) { acc, element ->
            val removed = acc.minusKey(element.key)
            if (removed === EmptyCoroutineContext) element else {
                // make sure interceptor is always last in the context (and thus is fast to get when present)
                val interceptor = removed[ContinuationInterceptor]
                if (interceptor == null) CombinedContext(removed, element) else {
                    val left = removed.minusKey(ContinuationInterceptor)
                    if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                        CombinedContext(CombinedContext(left, element), interceptor)
                }
            }
        }
    //返回包含来自此上下文的元素的上下文
    public fun minusKey(key: Key<*>): CoroutineContext
    //上下文元素的键
    public interface Key<E : Element>
    //上下文的一个元素,本身就是一个单例的协程上下文
    public interface Element : CoroutineContext {
    /**
     * A key of this coroutine context element.
     */
    public val key: Key<*>

    public override operator fun <E : Element> get(key: Key<E>): E? =
        @Suppress("UNCHECKED_CAST")
        if (this.key == key) this as E else null

    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
        operation(initial, this)

    public override fun minusKey(key: Key<*>): CoroutineContext =
        if (this.key == key) EmptyCoroutineContext else this
    }
}

通过源码我们可以看出协程上下文是一个跟list类似的数据结构
CoroutineContext 是元素Element的集合,每一个Element都有一个key,同时Element 又实现了CoroutineContext 的接口,它自身也是一个协程的上下文,因此也可以作为集合出现。
协程上下文关键的几个子类


clipboard.png

1.协程拦截器

refrofit接口定义

suspend fun getMessage3(@Query("city") city: String): WeatherEntity

转化java代码

Object getMessage3(@Query("city") @NotNull String var1, @NotNull Continuation var2);

我们大胆猜测协程的本质就是回调 + “黑魔法
如何查看Continuation在线程调度过程中做了些什么,这时候就要利用拦截器
调度器就是基于拦截器实现的

public interface ContinuationInterceptor : CoroutineContext.Element {
    //上下文拦截器的键
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    //拦截操作,Continuation很重要
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    //释放拦截器
    public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        /* do nothing by default */
    }

    // 重写get方法
    public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? =
        @Suppress("UNCHECKED_CAST")
        if (key === Key) this as E else null

    // 重写minusKey方法
    public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext =
        if (key === Key) EmptyCoroutineContext else this
}
Continuation的源码
public interface Continuation<in T> {
    /**
     * 与此Continuation相对应的协程上下文
     */
    public val context: CoroutineContext

    /**
     * 继续执行相应的协程,将一个成功或失败的[result]作为最后一个挂起点的返回值
     */
    public fun resumeWith(result: Result<T>)
}

我们可以自定义拦截器,看一下具体调用

class MyContinuation<T>(private val continuation: Continuation<T>) :Continuation<T>{
    override val context=continuation.context
    override fun resumeWith(result: Result<T>) {
        log("MyContinuation:$result")
        continuation.resumeWith(result)
    }
}
class MyContinuationInterceptor: ContinuationInterceptor {
    override val key=ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =MyContinuation(continuation)
}
suspend fun main(){
    GlobalScope.launch(MyContinuationInterceptor()) {
        log(1)
        val async = async {
            log(2)
            delay(1000)
            log(3)
            "hello"
        }
        log(4)
        val result = async.await()
        log("5---$result")
    }.join()
    log(6)
}

打印结果

22:25:29:804 [main] MyContinuation:Success(kotlin.Unit)   //①
22:25:29:819 [main] 1
22:25:29:850 [main] MyContinuation:Success(kotlin.Unit)  //②
22:25:29:850 [main] 2
22:25:29:897 [main] 4
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] MyContinuation:Success(kotlin.Unit)//③
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] 3
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] MyContinuation:Success(hello)//④
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] 5---hello
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] 6

打印结果分析
所有协程启动都会有一次resumeWith,所以打印①,Result<T>此时为Success(kotlin.Unit)
由于是join,所以打印1
async 又启动一个协程,所以打印②Result<T>,Result<T>此时为Success(kotlin.Unit)
打印2,delay函数是挂起函数
打印4
delay函数的挂起函数恢复继续,打印③,Result<T>此时为Success(kotlin.Unit)
打印3
async.await()是挂起函数,打印④,Result<T>此时为Success(hello)
打印5---hello
打印6
思考为什么从③处开始线程切换

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

2.协程调度器

public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
      //如果需要执行diapatch方法,返回true
      public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
      //由给定的上下文执行Runnable代码块在另外的线程中,此方法不会立即执行
      public abstract fun dispatch(context: CoroutineContext, block: Runnable)
      @InternalCoroutinesApi
      public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
      public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =DispatchedContinuation(this, continuation)
      @InternalCoroutinesApi
      public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
      (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
     }
     public operator fun plus(other: CoroutineDispatcher) = other
     override fun toString(): String = "$classSimpleName@$hexAddress"
}

它本身是协程上下文的子类,同时实现了拦截器的接口, dispatch 方法会在拦截器的方法 interceptContinuation 中调用,进而实现协程的调度
kotlin在Android中提供以下四种CoroutineDispatcher

public actual object Dispatchers {
    @JvmStatic
    //协程默认的调度器,是线程池,默认等于cpu内核的数量,最少两个
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
    @JvmStatic
    //Android当中的主线程即ui线程
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
    @JvmStatic
    //无指定派发线程,会根据运行时的上线文环境决定
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
    @JvmStatic
    //用于执行阻塞线程的IO线程池,默认限制为64或者cpu内核数量(取最大)
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}

也可以自定义CoroutineDispatcher

suspend fun main(){
    val dispatcher =
        Executors.newSingleThreadExecutor { r -> Thread(r, "MyThread") }.asCoroutineDispatcher()
    GlobalScope.launch(dispatcher) {
        log(1)
    }.join()
    log(2)
    //由于这个线程池是我们自己创建的,因此我们需要在合适的时候关闭它
    dispatcher.close()
}

协程如果运行在多线程中一样会有两个问题
1.线程切换的开销问题
2.多线程安全问题
举例多线程开销问题

suspend fun main(){
    val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
    GlobalScope.launch (dispatcher){
        log(1)
        val async = async {
            log(2)
            delay(1000)
            log(3)
            "hello"
        }
        log(4)
        val result = async.await()
        log("5$result")
    }.join()
    log(6)
}

打印

21:52:54:234 [pool-1-thread-1] 1
21:52:54:287 [pool-1-thread-1] 4
21:52:54:313 [pool-1-thread-2] 2
21:52:55:339 [pool-1-thread-3] 3
21:52:55:352 [pool-1-thread-4] 5hello
21:52:55:352 [pool-1-thread-4] 6

线程切了四次,挂起函数的继续操作都会切换线程
所以我们在实际开发中要根据具体情况选用合适的CoroutineDispatcher
举例多线程安全问题

suspend fun main(){
    val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
    var i=0
    dispatcher.use {
        List(1000000){
            GlobalScope.launch(dispatcher) {
                i++
            }
        }.forEach {
            it.join()
        }
    }
    log(i)
    dispatcher.close()
}

打印
22:14:56:583 [main] 999881
解决方案1 单线程的CoroutineDispatcher操作数据(其他逻辑可以放在多线程,数据操作放在单线程比如此处的i++)
解决方案2 使用kotlin中线程安全的数据结构

suspend fun main(){
    val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
    var i= AtomicInteger()
    dispatcher.use {
        List(1000000){
            GlobalScope.launch(dispatcher) {
                i.incrementAndGet()
            }
        }.forEach {
            it.join()
        }
    }
    log(i)
    dispatcher.close()
}

与此相关的数据结构


clipboard.png

解决方案3 利用互斥锁

suspend fun main(){
    val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
    var i=0
    val mutex = Mutex()
    dispatcher.use {
        List(1000000){
            GlobalScope.launch(dispatcher) {
                // 用锁保护每次自增
                mutex.withLock {
                    i++
                }
            }
        }.forEach {
            it.join()
        }
    }
    log(i)
    dispatcher.close()
}

解决方案4 协程方式处理线程安全的actor,actor 在高负载下比锁更有效

// 计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求
// 这个函数启动一个新的计数器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor 状态
    for (msg in channel) { // 即将到来消息的迭代器
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
suspend fun main(){
    val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
    var scope= CoroutineScope(EmptyCoroutineContext)
    scope.launch {
        val counter = counterActor() // 创建该 actor
        dispatcher.use {
            List(1000000){
                GlobalScope.launch(dispatcher) {
                    counter.send(IncCounter)
                }
            }.forEach {
                it.join()
            }
        }
        // 发送一条消息以用来从一个 actor 中获取计数值
        val response = CompletableDeferred<Int>()
        counter.send(GetCounter(response))
        log("Counter = ${response.await()}")
        counter.close() // 关闭该actor
        dispatcher.close()
    }.join()
}

第四种稍微偏难,了解即可,本质还是用到kotlin协程里面的SendChannel

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352