Kotlin Coroutines(协程) 完全解析系列:

Kotlin Coroutines(协程) 完全解析(一),协程简介

Kotlin Coroutines(协程) 完全解析(二),深入理解协程的挂起、恢复与调度

Kotlin Coroutines(协程) 完全解析(三),封装异步回调、协程间关系及协程的取消

Kotlin Coroutines(协程) 完全解析(四),协程的异常处理

Kotlin Coroutines(协程) 完全解析(五),协程的并发

本文基于 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1


1. 挂起函数的工作原理

协程的内部实现使用了 Kotlin 编译器的一些编译技术,当挂起函数调用时,背后大致细节如下:

挂起函数或挂起 lambda 表达式调用时,都有一个隐式的参数额外传入,这个参数是Continuation类型,封装了协程恢复后的执行的代码逻辑。


suspend fun requestToken(): Token { ... }

实际上在 JVM 中更像下面这样:

Object requestToken(Continuation<Token> cont) { ... }


 * Interface representing a continuation after a suspension point that returns value of type `T`.
public interface Continuation<in T> {
     * Context of the coroutine that corresponds to this continuation.
    public val context: CoroutineContext

     * Resumes the execution of the corresponding coroutine passing successful or failed [result] as the
     * return value of the last suspension point.
    public fun resumeWith(result: Result<T>)


suspend fun requestToken(): Token { ... }   // 挂起函数
suspend fun createPost(token: Token, item: Item): Post { ... }  // 挂起函数
fun processPost(post: Post) { ... }

fun postItem(item: Item) {
    GlobalScope.launch {
        val token = requestToken()
        val post = createPost(token, item)

然而,协程内部实现不是使用普通回调的形式,而是使用状态机来处理不同的挂起点,大致的 CPS(Continuation Passing Style) 代码为:

// 编译后生成的内部类大致如下
final class postItem$1 extends SuspendLambda ... {
    public final Object invokeSuspend(Object result) {
        switch (this.label) {
            case 0:
                this.label = 1;
                token = requestToken(this)
            case 1:
                this.label = 2;
                Token token = result;
                post = createPost(token, this.item, this)
            case 2:
                Post post = result;

上面代码中每一个挂起点和初始挂起点对应的 Continuation 都会转化为一种状态,协程恢复只是跳转到下一种状态中。挂起函数将执行过程分为多个 Continuation 片段,并且利用状态机的方式保证各个片段是顺序执行的。


1.1 挂起函数可能会挂起协程

挂起函数使用 CPS style 的代码来挂起协程,保证挂起点后面的代码只能在挂起函数执行完后才能执行,所以挂起函数保证了协程内的顺序执行顺序。


fun postItem(item: Item) {
    GlobalScope.launch {
        // async { requestToken() } 新建一个协程,可能在另一个线程运行
        // 但是 await() 是挂起函数,当前协程执行逻辑卡在第一个分支,第一种状态,当 async 的协程执行完后恢复当前协程,才会切换到下一个分支
        val token = async { requestToken() }.await()
        // 在第二个分支状态中,又新建一个协程,使用 await 挂起函数将之后代码作为 Continuation 放倒下一个分支状态,直到 async 协程执行完
        val post = aync { createPost(token, item) }.await()
        // 最后一个分支状态,直接在当前协程处理


注意挂起函数不一定会挂起协程,如果相关调用的结果已经可用,库可以决定继续进行而不挂起,例如async { requestToken() }的返回值Deferred的结果已经可用时,await()挂起函数可以直接返回结果,不用再挂起协程。

1.2 挂起函数不会阻塞线程



fun main(args: Array<String>) {
    // 创建一个单线程的协程调度器,下面两个协程都运行在这同一线程上
    val coroutineDispatcher = newSingleThreadContext("ctx")
    // 启动协程 1
    GlobalScope.launch(coroutineDispatcher) {
        println("the first coroutine")
        println("the first coroutine")
    // 启动协程 2
    GlobalScope.launch(coroutineDispatcher) {
        println("the second coroutine")
        println("the second coroutine")
    // 保证 main 线程存活,确保上面两个协程运行完成


the first coroutine
the second coroutine
the second coroutine
the first coroutine

从上面结果可以看出,当协程 1 暂停 200 ms 时,线程并没有阻塞,而是执行协程 2 的代码,然后在 200 ms 时间到后,继续执行协程 1 的逻辑。所以挂起函数并不会阻塞线程,这样可以节省线程资源,协程挂起时,线程可以继续执行其他逻辑。

1.3 挂起函数恢复协程后运行在哪个线程

协程的所属的线程调度在前一篇文章《协程简介》中有提到过,主要是由协程的CoroutineDispatcher控制,CoroutineDispatcher可以指定协程运行在某一特定线程上、运作在线程池中或者不指定所运行的线程。所以协程调度器可以分为Confined dispatcherUnconfined dispatcherDispatchers.DefaultDispatchers.IODispatchers.Main属于Confined dispatcher,都指定了协程所运行的线程或线程池,挂起函数恢复后协程也是运行在指定的线程或线程池上的,而Dispatchers.Unconfined属于Unconfined dispatcher,协程启动并运行在 Caller Thread 上,但是只是在第一个挂起点之前是这样的,挂起恢复后运行在哪个线程完全由所调用的挂起函数决定。

fun main(args: Array<String>) = runBlocking<Unit> {
    launch { // 默认继承 parent coroutine 的 CoroutineDispatcher,指定运行在 main 线程
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    launch(Dispatchers.Unconfined) {
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")


Unconfined      : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main


2. 协程深入解析

上面更多地是通过 demo 的方式说明挂起函数函数的一些特性,但是协程的创建、启动、恢复、线程调度、协程切换是如何实现的呢,还是不清楚,下面结合源码详细地解析协程。

2.1 协程的创建与启动

先从新建一个协程开始分析协程的创建,最常见的协程创建方式为CoroutineScope.launch {},关键源码如下:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    coroutine.start(start, coroutine, block)
    return coroutine

coroutine.start(start, coroutine, block)默认情况下会走到startCoroutineCancellable,最终会调用到createCoroutineUnintercepted

 * Creates unintercepted coroutine without receiver and with result type [T].
 * This function creates a new, fresh instance of suspendable computation every time it is invoked.
 * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
 * The [completion] continuation is invoked when coroutine completes with result or exception.
 public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> { ... }


再看之前协程代码编译生成的内部类final class postItem$1 extends SuspendLambda ...,协程的计算逻辑封装在invokeSuspend方法中,而SuspendLambda的继承关系为 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation,其中BaseContinuationImpl 部分关键源码如下:

internal abstract class BaseContinuationImpl(...) {
    // 实现 Continuation 的 resumeWith,并且是 final 的,不可被重写
    public final override fun resumeWith(result: Result<Any?>) {
        val outcome = invokeSuspend(param)

    // 由编译生成的协程相关类来实现,例如 postItem$1
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?


2.2 协程的线程调度


internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
// createCoroutineUnintercepted(completion) 会创建一个新的协程,返回值类型为 Continuation
// intercepted() 是给 Continuation 加上 ContinuationInterceptor 拦截器,也是线程调度的关键
// resumeCancellable(Unit) 最终将调用 resume(Unit) 启动协程


public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this
// ContinuationImpl 是 SuspendLambda 的父类

internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) {
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    // intercepted() 方法关键是 context[ContinuationInterceptor]?.interceptContinuation(this)
    // context[ContinuationInterceptor] 就是协程的 CoroutineDispatcher

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
     * Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

所以intercepted()最终会使用协程的CoroutineDispatcherinterceptContinuation方法包装原来的 Continuation,拦截所有的协程运行操作。


internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : Continuation<T> by continuation, DispatchedTask<T> {
    inline fun resumeCancellable(value: T) {
        // 判断是否需要线程调度
        if (dispatcher.isDispatchNeeded(context)) {
            // 将协程的运算分发到另一个线程
            dispatcher.dispatch(context, this)
        } else {
            // 如果不需要调度,直接在当前线程执行协程运算

    override fun resumeWith(result: Result<T>) {
        // 判断是否需要线程调度
        if (dispatcher.isDispatchNeeded(context)) {
            // 将协程的运算分发到另一个线程
            dispatcher.dispatch(context, this)
        } else {
            // 如果不需要调度,直接在当前线程执行协程运算

internal interface DispatchedTask<in T> : Runnable {
    public override fun run() {
        // 封装了 continuation.resume 逻辑


2.3 协程的挂起和恢复

Kotlin 编译器会生成继承自SuspendLambda的子类,协程的真正运算逻辑都在invokeSuspend中。但是协程挂起的具体实现是如何呢?先看下面示例代码:

fun main(args: Array<String>) = runBlocking<Unit> { // 新建并启动 blocking 协程,运行在 main 线程上,等待所有子协程运行完成后才会结束
    launch(Dispatchers.Unconfined) { // 新建并启动 launch 协程,没有指定所运行线程,一开始运行在调用者所在的 main 线程上
        println("${Thread.currentThread().name} : launch start")
        async(Dispatchers.Default) { // 新建并启动 async 协程,运行在 Dispatchers.Default 的线程池中
            println("${Thread.currentThread().name} : async start")
            delay(100)  // 挂起 async 协程 100 ms
            println("${Thread.currentThread().name} : async end")
        }.await() // 挂起 launch 协程,直到 async 协程结束
        println("${Thread.currentThread().name} : launch end")

其中 launch 协程编译生成的 SuspendLambda 子类的invokeSuspend方法如下:

public final Object invokeSuspend(@NotNull Object result) {
    Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    switch (this.label) {
        case 0:
            System.out.println(stringBuilder.append(currentThread.getName()).append(" : launch start").toString());
            // 新建并启动 async 协程 
            Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);
            this.label = 1;
            // 调用 await() 挂起函数
            if (async$default.await(this) == coroutine_suspended) {
                return coroutine_suspended;
        case 1:
            if (result instanceof Failure) {
                throw ((Failure) result).exception;
            // 恢复协程后再执行一次 resumeWith(),然后无异常的话执行最后的 println()
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    System.out.println(stringBuilder2.append(currentThread2.getName()).append(" : launch end").toString());
    return Unit.INSTANCE;

上面代码中 launch 协程挂起的关键在于async$default.await(this) == coroutine_suspended,如果此时 async 线程未执行完成,await()返回为IntrinsicsKt.getCOROUTINE_SUSPENDED(),就会 return,launch 协程的invokeSuspend方法执行完成,协程所在线程继续往下运行,此时 launch 线程处于挂起状态。所以协程挂起就是协程挂起点之前逻辑执行完成,协程的运算关键方法resumeWith()执行完成,线程继续执行往下执行其他逻辑。


  • 启动其他协程并不会挂起当前协程,所以launchasync启动线程时,除非新协程运行在当前线程,则当前协程只能在新协程运行完成后继续执行,否则当前协程都会马上继续运行。

  • 协程挂起并不会阻塞线程,因为协程挂起时相当于执行完协程的方法,线程继续执行其他之后的逻辑。

  • 挂起函数并一定都会挂起协程,例如await()挂起函数如果返回值不等于IntrinsicsKt.getCOROUTINE_SUSPENDED(),则协程继续执行挂起点之后逻辑。


private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
        * Custom code here, so that parent coroutine that is using await
        * on its child deferred (async) coroutine would throw the exception that this child had
        * thrown and not a JobCancellationException.
    val cont = AwaitContinuation(uCont.intercepted(), this)
    invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)

private class ResumeAwaitOnCompletion<T>(
    job: JobSupport,
    private val continuation: AbstractContinuation<T>
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        val state = job.state
        check(state !is Incomplete)
        if (state is CompletedExceptionally) {
            // Resume with exception in atomic way to preserve exception
            continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
        } else {
            // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
            continuation.resume(state as T)
    override fun toString() = "ResumeAwaitOnCompletion[$continuation]"

上面源码中ResumeAwaitOnCompletioninvoke方法的逻辑就是调用continuation.resume(state as T)恢复协程。invokeOnCompletion函数里面是如何实现 async 协程完成后自动恢复之前协程的呢,源码实现有些复杂,因为很多边界情况处理就不全部展开,其中最关键的逻辑如下:

// handler 就是 ResumeAwaitOnCompletion 的实例,将 handler 作为节点
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
// 将 node 节点添加到 state.list 中
if (!addLastAtomic(state, list, node)) return@loopOnState // retry

接下来我断点调试 launch 协程恢复的过程,从 async 协程的SuspendLambda的子类的completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) ..-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke,最后 handler 节点里面通过调用resume(result)恢复协程。

所以await()挂起函数恢复协程的原理是,将 launch 协程封装为 ResumeAwaitOnCompletion 作为 handler 节点添加到 aynsc 协程的 state.list,然后在 async 协程完成时会通知 handler 节点调用 launch 协程的 resume(result) 方法将结果传给 launch 协程,并恢复 launch 协程继续执行 await 挂起点之后的逻辑。

而这过程中有两个finalresumeWith 方法,一个是SuspendLambda的父类BaseContinuationImpl的,我们再来详细分析一篇:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        var param = result
        while (true) {
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        // 调用 invokeSuspend 方法执行,执行协程的真正运算逻辑
                        val outcome = invokeSuspend(param)
                        // 协程挂起时 invokeSuspend 才会返回 COROUTINE_SUSPENDED,所以协程挂起时,其实只是协程的 resumeWith 运行逻辑执行完成,再次调用 resumeWith 时,协程挂起点之后的逻辑才能继续执行
                        if (outcome === COROUTINE_SUSPENDED) return
                    } catch (exception: Throwable) {
                releaseIntercepted() // this state machine instance is terminating
                // 这里可以看出 Continuation 其实分为两类,一种是 BaseContinuationImpl,封装了协程的真正运算逻辑
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // 断点时发现 completion 是 DeferredCoroutine 实例,这里实际调用的是其父类 AbstractCoroutine 的 resumeWith 方法

接下来再来看另外一类 Continuation,AbstractCoroutine 的resumeWith实现:

public abstract class AbstractCoroutine<in T>(
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
     * Completes execution of this with coroutine with the specified result.
    public final override fun resumeWith(result: Result<T>) {
        // makeCompletingOnce 大致实现是修改协程状态,如果需要的话还会将结果返回给调用者协程,并恢复调用者协程
        makeCompletingOnce(result.toState(), defaultResumeMode)

所以其中一类 Continuation BaseContinuationImplresumeWith封装了协程的运算逻辑,用以协程的启动和恢复;而另一类 Continuation AbstractCoroutine,主要是负责维护协程的状态和管理,它的resumeWith则是完成协程,恢复调用者协程。

2.4 协程的三层包装




3. 小结


