探究 iOS 协程 - coobjc 源码分析(二)

目录

探究 iOS 协程 - 协程介绍与使用(一)
探究 iOS 协程 - coobjc 源码分析(二)

上一篇讲完了协程的概念与使用方式,这一篇我们来分析一下阿里开源协程框架 coobjc 源码。首先我们先写一个最简单的示例程序:

- (void)testCORoutineAsyncFunc {
    co_launch(^{
        NSLog(@"co start");
        // await 后面需要跟 COChan 或者 COPromise
        NSNumber *num = await([self promiseWithNumber:@(1)]);
        NSLog(@"co finish");
    });
    NSLog(@"main");
}

// COPromise 模拟了一个异步任务
- (COPromise *)promiseWithNumber:(NSNumber *)number {
    COPromise *promise = [COPromise promise:^(COPromiseFulfill  _Nonnull fullfill, COPromiseReject  _Nonnull reject) {
        dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(3 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
            fullfill(number);
//            reject(error);  // 如果有错误,回调到上层
        });
    } onQueue:dispatch_get_global_queue(0, 0)];
    return promise;
}

以上的代码会输出:

main
co start
co finish

co_launch 这里就在主线程开启了一个协程,现在大家应该特别好奇 await 为什么可以等待异步任务完成?别着急,我们慢慢往下看。

创建协程

首先我们来看一下 co_launch 做了什么事:

/**
 Create a coroutine, then resume it asynchronous on current queue.

 @param block the code execute in the coroutine
 @return the coroutine instance
 */
NS_INLINE COCoroutine * _Nonnull  co_launch(void(^ _Nonnull block)(void)) {
    // 创建协程
    COCoroutine *co = [COCoroutine coroutineWithBlock:block onQueue:nil];
    // 开启协程
    return [co resume];
}

co_launch 主要做了两件事:

  1. 创建协程,把协程需要执行的 block 作为参数传进去。co_launch默认会在当前线程创建协程。
  2. 启动协程。

我们具体来看看如何创建协程:

- (instancetype)initWithBlock:(void (^)(void))block onQueue:(dispatch_queue_t)queue stackSize:(NSUInteger)stackSize {
    self = [super init];
    if (self) {
        // 协程需要执行的 block 赋予属性 execBlock
        _execBlock = [block copy];
        _dispatch = queue ? [CODispatch dispatchWithQueue:queue] : [CODispatch currentDispatch];
        // 真正创建协程的方法。真正的协程是 coroutine_t 结构体类型,COCoroutine 只是在 OC 层面的一层封装
        coroutine_t  *co = coroutine_create((void (*)(void *))co_exec);
        // 指定栈空间
        if (stackSize > 0 && stackSize < 1024*1024) {   // Max 1M
            co->stack_size = (uint32_t)((stackSize % 16384 > 0) ? ((stackSize/16384 + 1) * 16384) : stackSize);        // Align with 16kb
        }
        _co = co;
        // 让 coroutine_t 引用 COCoroutine,并设置销毁函数
        coroutine_setuserdata(co, (__bridge_retained void *)self, co_obj_dispose);
    }
    return self;
}

上面贴出了创建协程的关键方法,相关的步骤已经给出了注释,我们具体来看coroutine_create:

coroutine_t *coroutine_create(coroutine_func func) {
    coroutine_t *co = calloc(1, sizeof(coroutine_t));
    co->entry = func;
    co->stack_size = STACK_SIZE;
    co->status = COROUTINE_READY;
    
    // check debugger is attached, fix queue debugging.
    co_rebind_backtrace();
    return co;
}

co_rebind_backtrace 这里先忽略。这个方法很简单,就是创建一个 coroutine_t 结构体,把之前调用者传入的 co_exec 赋值给 entry 属性。这里的 co_exec 是一个函数,下面我们来看看这个函数的具体实现:

static void co_exec(coroutine_t  *co) {
    /* 通过 co_get_obj 拿到 COCoroutine 对象
     (之前在创建协程的时候通过 coroutine_setuserdata 把 COCoroutine 对象设置到了 coroutine_t 结构体中)。
       这里需要拿到 COCoroutine 的原因是因为协程真正执行的 block 是保存在 COCoroutine 对象中的
     */
    COCoroutine *coObj = co_get_obj(co);
    if (coObj) {
        // 执行之前保存的 execBlock
        [coObj execute];
        
        coObj.isFinished = YES;
        if (coObj.finishedBlock) {
            coObj.finishedBlock();
            coObj.finishedBlock = nil;
        }
        if (coObj.joinBlock) {
            coObj.joinBlock();
            coObj.joinBlock = nil;
        }
        //维护父子协程关系
        [coObj.parent removeChild:coObj];
    }
}

co_exec 主要做的事就是执行保存在 coroutine 上的 block。目前我们的协程就算创建完毕了。

启动协程

通过上面的分析可以看到,co_exec 是真正执行协程 block 的地方,那么 co_exec 是在什么时候开始执行的呢?回到最开始 co_launch 的地方。co_launch 之后,会立刻调用 [co resume],这里 resume 就是真正启动协程的地方,下面我们来看看 resume 具体实现:

- (COCoroutine *)resume {
    // 拿到当前真正运行的协程
    COCoroutine *currentCo = [COCoroutine currentCoroutine];
    // 判断是否是当前运行协程的子协程
    BOOL isSubroutine = [currentCo.dispatch isEqualToDipatch:self.dispatch] ? YES : NO;
    
    [self.dispatch dispatch_async_block:^{
        if (self.isResume) {
            return;
        }
        // 如果是子协程,设置一下父子关系
        if (isSubroutine) {
            self.parent = currentCo;
            [currentCo addChild:self];
        }
        self.isResume = YES;
        // 启动协程
        coroutine_resume(self.co);
    }];
    return self;
}

要注意,协程是异步追加到队列中的。如果没有特别指定队列,默认会追加到当前线程队列中
具体启动协程在 coroutine_resume,我们接着往里看:

void coroutine_resume(coroutine_t *co) {
    if (!co->is_scheduler) {
        // 拿到当前线程的协程调度器
        coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();
        co->scheduler = scheduler;
        // 把协程丢到 scheduler 维护的协程集合里(这里的集合是用双向链表实现)
        scheduler_queue_push(scheduler, co);
        // 如果当前线程有真正运行的协程,把该协程 yield 掉
        if (scheduler->running_coroutine) {
            // resume a sub coroutine.
            scheduler_queue_push(scheduler, scheduler->running_coroutine);
            coroutine_yield(scheduler->running_coroutine);
        } else {
            // scheduler is idle
            coroutine_resume_im(co->scheduler->main_coroutine);
        }
    }
}

在这里需要特别说明一下调度器这个概念。其实在上一篇文章有提到,实现协程的 resume 和 yield 需要一个调度器来控制。调度器每个线程独有一个,用来调度该线程下的所有协程。同一时间段每个线程下只有一个协程在 running 状态
下面的图很好的诠释了线程、调度器和协程的关系:

image.png

这里的调度器就类似于操作系统在线程调度时候发挥的作用。为什么说协程是一种用户态的线程,看到这里想必对这个概念也有了更深刻的理解。
下面我们通过代码来具体看看调度器是如何创建的。大家还记得上面在 coroutine_resume 方法内部调用了 coroutine_scheduler_self_create_if_not_exists吗,我们来看看这个方法具体实现:

coroutine_scheduler_t *coroutine_scheduler_self_create_if_not_exists(void) {
    
    if (!coroutine_scheduler_key) {
        pthread_key_create(&coroutine_scheduler_key, coroutine_scheduler_free);
    }
    
    void *schedule = pthread_getspecific(coroutine_scheduler_key);
    if (!schedule) {
        schedule = coroutine_scheduler_new();
        pthread_setspecific(coroutine_scheduler_key, schedule);
    }
    return schedule;
}

可以看到调度器是被存在了 TSD 里,每个线程有且仅有一个,这也就更好的诠释了上面那张图片。
说完了调度器,下面我们再回到协程启动上来。我们当前线程只创建了一个协程,所以不存在 running_coroutine,那么协程启动最终会调用到 coroutine_resume_im 来,这个函数有点长,我只截取了启动相关的部分:

void coroutine_resume_im(coroutine_t *co) {
    switch (co->status) {
        case COROUTINE_READY:
        {
            // 分配虚拟内存到 stack_memory
            co->stack_memory = coroutine_memory_malloc(co->stack_size);
            // 根据虚拟内存地址计算栈顶指针地址
            co->stack_top = co->stack_memory + co->stack_size - 3 * sizeof(void *);
            // get the pre context
            // 在堆上开辟一块内存,随后调用 coroutine_getcontext 把当前函数调用栈存入 pre_context。
            co->pre_context = malloc(sizeof(coroutine_ucontext_t));
            BOOL skip = false;
            // coroutine_getcontext 保存了当前函数调用栈,但最主要得是保存 lr 寄存器的地址(下一条指令地址)。
            coroutine_getcontext(co->pre_context);
            if (skip) {
                // when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.
                return;
            }
#pragma unused(skip)
            skip = true;
            
            free(co->context);
            co->context = calloc(1, sizeof(coroutine_ucontext_t));
            // 通过 coroutine_makecontext 生成一个协程上下文,跟 coroutine_getcontext 类似,只不过这里是直接用结构体模拟的。
            coroutine_makecontext(co->context, (IMP)coroutine_main, co, (void *)co->stack_top);
            // setcontext
            // 真正开启协程的函数,这里一执行,就会调用到 coroutine_main 这个函数里。
            coroutine_begin(co->context);
            
            break;
        }
        .........
}

coroutine_resume_im 主要做了三件事:

  1. 把当前的函数栈保存在 co->pre_context 中(其实就是保存 lr)。
  2. 生成一个新的 context 保存在 co->context 中。
  3. 开始执行 co->context 中保存的函数(coroutine_main)。

coroutine_getcontextcoroutine_makecontextcoroutine_begin 等被称为协程族函数,具体实现细节会在后一篇文章讨论,这里只需要知道它们的作用就可以。
现在我们知道,协程本身会保存 pre_context 和新建一个 context,这里也引申出来一个问题:为什么要保存 pre_context?原因是当我们的协程执行完之后,还需要回到我们想回去的地方。我在哪里设置了 pre_context,那当我协程执行完之后就可以通过 coroutine_setcontext 回到我当初设置 pre_context 的地方。
到这里大家也不难想象协程是怎么实现异步的同步化表达。在传统的 block 异步编程中,其实是把异步操作执行完需要回调的函数地址保存在 block 对象内部,然后通过 block 对象调用这个函数:

image.png

那么对于协程来说,它通过 coroutine 对象内部保存了当前函数调用栈,当异步执行完之后,取出保存的函数调用栈开始执行原来的函数。
image.png

刚才说到在调用 coroutine_begin 之后会真正开始执行 coroutine_main,我们一起来看看这个函数的实现:

static void coroutine_main(coroutine_t *co) {
    co->status = COROUTINE_RUNNING;
    // 执行协程中保存的 block
    co->entry(co);
    co->status = COROUTINE_DEAD;
    // 执行完毕,回到保存函数栈的地方
    coroutine_setcontext(co->pre_context);
}

重点看一下 co->entry(co) ,还记得一开始我们在创建协程的时候赋值给 co->entry 的函数吗?不清楚的可以回到文章一开始的地方看一下。那么在 coroutine_main 函数调用的时候就真正执行了保存在 co->entry 里的 co_exec 函数,这个函数里会调用保存在 COCoroutine 对象上的 execBlock,也就是我们文章一开始例子中 co_launch 的 block 参数。

中断协程

现在,我们的协程已经顺利启动起来了。然后碰到了 await 函数,当前协程会暂停等待 await 之后的异步操作来唤醒,那么我们一起来看看这个函数做了什么:

/**
 await
 
 @param _promiseOrChan the COPromise object, you can also pass a COChan object.
 But we suggest use Promise first.
 @return return the value, nullable. after, you can use co_getError() method to get the error.
 */
NS_INLINE id _Nullable await(id _Nonnull _promiseOrChan) {
    id val = co_await(_promiseOrChan);
    return val;
}

await 函数很简单,就是调用了 co_await,并把返回值返回了出去。我们真正需要看的是 co_await 这个核心函数:

id co_await(id awaitable) {
    coroutine_t  *t = coroutine_self();
    if (t == nil) {
        @throw [NSException exceptionWithName:COInvalidException reason:@"Cannot call co_await out of a coroutine" userInfo:nil];
    }
    if (t->is_cancelled) {
        return nil;
    }
    
    if ([awaitable isKindOfClass:[COChan class]]) {
        COCoroutine *co = co_get_obj(t);
        co.lastError = nil;
        // 内部会调用 yield 中断当前协程
        id val = [(COChan *)awaitable receive];
        return val;
    } else if ([awaitable isKindOfClass:[COPromise class]]) {
        // 创建 cochan
        COChan *chan = [COChan chanWithBuffCount:1];
        COCoroutine *co = co_get_obj(t);
        
        co.lastError = nil;
        
        COPromise *promise = awaitable;
        [[promise
          then:^id _Nullable(id  _Nullable value) {
              // 当有回调过来,调用 resume 恢复协程中断
              [chan send_nonblock:value];
              return value;
          }]
         catch:^(NSError * _Nonnull error) {
             co.lastError = error;
             [chan send_nonblock:nil];
         }];
        // 内部会调用 yield 中断当前协程
        id val = [chan receiveWithOnCancel:^(COChan * _Nonnull chan) {
            [promise cancel];
        }];
        return val;
        
    } else {
        @throw [NSException exceptionWithName:COInvalidException
                                       reason:[NSString stringWithFormat:@"Cannot await object: %@.", awaitable]
                                     userInfo:nil];
    }
}

COChan内部实现

上一篇文章中我们有提到 COChan 这个概念和它的一些用法,如果不清楚的话可以再回过去看一下,这里就不再赘述。在 co_await 源码里可以看到,不管传进来的 awaitable 对象是 COChan 还是 COPromise ,最终都会调用 COChanreceive 方法中断当前协程,我们先一起来看看 COChan 是如何创建的:

- (instancetype)initWithBuffCount:(int32_t)buffCount {
    self = [super init];
    if (self) {
        _chan = chancreate(sizeof(int8_t), buffCount, co_chan_custom_resume);
        _buffList = [[NSMutableArray alloc] init];
        COOBJC_LOCK_INIT(_buffLock);
    }
    return self;
}

COChan 内部会创建一个 co_channel 结构体和一个 _buffList 数组。这里我们也可以看到,COChan 其实也是内部属性 co_channel 结构体的一层封装,真正核心逻辑还是 co_channel 在处理,下面我们一起来看看 chancreate 方法:

co_channel *chancreate(int elemsize, int bufsize, void (*custom_resume)(coroutine_t *co)) {
    // bufsize == 外面传进来的 buffCount
    co_channel *c;
    if (bufsize < 0) {
        // 没有 bufferCount 不需要额外存储空间
        c = calloc(1, sizeof(co_channel));
    } else {
        c = calloc(1, (sizeof(co_channel) + bufsize*elemsize));
    }
    
    // init buffer
    if (bufsize < 0) {
        queueinit(&c->buffer, elemsize, 16, 16, NULL);
    } else {
        // bufferCount >= 0 -> expandsize == 0
        queueinit(&c->buffer, elemsize, bufsize, 0, (void *)(c+1));
    }
    
    // init lock
    c->lock = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
    
    c->custom_resume = custom_resume;

    return c;
}

bufsize 是我们外面传进来的 buffCount,在 co_await 函数中,buffCount 的值是 1。当 bufsize > 0 的时候,会为 co_channel 结构体分配多余的内存空间。bufsize 这里代表缓冲区最大容量。
co_channel 分配完内存空间之后,会初始化 co_channel 中的 buffer 属性,该属性是一个 chan_queue 类型结构体:

static void queueinit(chan_queue *q, int elemsize, int bufsize, int expandsize, void *buf) {
    // bufsize >= 0, expandsize == 0; bufsize < 0, expandsize == 16
    q->elemsize = elemsize;
    q->size = bufsize;
    q->expandsize = expandsize;
    if (expandsize) {
        if (bufsize > 0) {
            // 为容器分配内存空间
            q->arr = malloc(bufsize * elemsize);
        }
    } else {
        if (buf) {
            // 这里的 buf 是 co_channel 里的 asend 结构体。 
            q->arr = buf;
        }
    }
}

创建 co_channel 主要就是初始化了内部的 buffer 属性,也就是缓冲区。其余的都比较简单。要注意在这里当外部传进来的 BuffCount >= 0 时,expandsize == 0,c->buffer->arr == c->asend。具体为什么要这样设计,我会在后面给出答案。
讲完了 COChan 的初始化,紧接着就会调用 [COChan receive],我们一起来看看 receive 内部做了什么。receive 最终都会调到 receiveWithOnCancel:

- (id)receiveWithOnCancel:(COChanOnCancelBlock)cancelBlock {
    
    ...
    
    IMP cancel_exec = NULL;
    if (cancelBlock) {
        cancel_exec = imp_implementationWithBlock(^{
            cancelBlock(self);
        });
    }
    
    uint8_t val = 0;
    int ret = chanrecv_custom_exec(_chan, &val, cancel_exec);
    if (cancel_exec) {
        imp_removeBlock(cancel_exec);
    }
    co.currentChan = nil;
    
    if (ret == CHANNEL_ALT_SUCCESS) {
        // success
        do {
            COOBJC_SCOPELOCK(_buffLock);
            NSMutableArray *buffList = self.buffList;
            if (buffList.count > 0) {
                id obj = buffList.firstObject;
                [buffList removeObjectAtIndex:0];
                if (obj == kCOChanNilObj) {
                    obj = nil;
                }
                return obj;
            } else {
                return nil;
            }

        } while(0);
        
    } else {
        // ret not 1, means nothing received or cancelled.
        return nil;
    }
}

省略了与主流程无关的代码,重点来关注 chanrecv_custom_exec

int chanrecv_custom_exec(co_channel *c, void *v, IMP cancelExec) {
    return _chanop2(c, CHANNEL_RECEIVE, v, 1, NULL, cancelExec);
}

最终调用了 _chanop2 ,主要关注 CHANNEL_RECEIVE 这个枚举:

typedef enum {
    CHANNEL_SEND = 1,
    CHANNEL_RECEIVE,
} channel_op;

CHANNEL_SEND 代表往 chan 里面发送消息,也就是调用 send 或者 send_nonblock
CHANNEL_RECEIVE 代表调用了 chan 的 receive 或者 receive_nonblock
接下来看一下 _chanop2

static int _chanop2(co_channel *c, int op, void *p, int canblock, IMP custom_exec, IMP cancel_exec) {
    chan_alt *a = malloc(sizeof(chan_alt));
    
    a->channel = c;
    a->op = op;
    a->value = p;
    // 应该是重复赋值了一次
    a->op = op;
    // 是否需要 yield 当前协程(如果是调用 nonblock 后缀的方法,canblock == 0)
    a->can_block = canblock;
    a->prev = NULL;
    a->next = NULL;
    a->is_cancelled = false;
    // send 的时候会传入 custom_exec
    a->custom_exec = custom_exec;
    a->cancel_exec = cancel_exec;
    
    int ret = chanalt(a);
    free(a);
    return ret;
}

这里主要就是创建 chan_alt 结构体,真正的核心逻辑在 chan_alt

int chanalt(chan_alt *a) {
    
    int canblock = a->can_block;
    co_channel *c;
    coroutine_t *t = coroutine_self();
    // task = coroutine_t
    a->task = t;
    c = a->channel;
    // 对 co_channel 加锁
    chanlock(c);
    // 判断是否需要执行 alt
    if(altcanexec(a)) {
        return altexec(a);
    }
    
    if(!canblock) {
        chanunlock(c);
        return a->op == CHANNEL_SEND ? CHANNEL_ALT_ERROR_BUFFER_FULL : CHANNEL_ALT_ERROR_NO_VALUE;
    }
    
    // add to queue
    altqueue(a);
    // set coroutine's chan_alt
    t->chan_alt = a;
    
    chanunlock(c);
    
    // blocking.
    coroutine_yield(t);
    // resume
    t->chan_alt = nil;
    // alt is cancelled
    if (a->is_cancelled) {
        return CHANNEL_ALT_ERROR_CANCELLED;
    }
    
    return CHANNEL_ALT_SUCCESS;
}

chan_alt 内部会首先判断该 chan_alt 是否能够执行,其次会判断是否是 block 类型的函数,在这里会出现这么几种执行路径:

  • 如果不能执行(缓冲区满了),并且调用的是 receive_nonblocksend_nonblock,那么会直接 return
  • 如果不能执行(缓冲区满了),并且调用的是 receivesend,那么会被 coroutine_yield 把当前协程中断。
  • 如果可以执行,那么会调用 altexec 并返回结果。

我们先来看一下 altcanexec 函数:

static int altcanexec(chan_alt *a) {
   alt_queue *altqueue;
   co_channel *c;
   
   c = a->channel;
   // buffer.size 是初始化 COChan 时传进去的 BuffCount,代表缓冲区的容量
   // buffer.count 是 buffer 里实际任务的数量
   if(c->buffer.size == 0){
       /**
           1.未设置 buffer.size 或者 buffer.size == 0 说明需要立即执行 chan 里的任务
           2.otherop 对 a->op 取反操作,然后会拿到与 op 相反操作的队列
           比如当前的 op 为 CHANNEL_RECEIVE,那么这里的 altqueue 就是拿到一个
           SEND的操作队列。如果 SEND 队列里面有任务,证明当前的 RECEIVE 操作是可以执行的;
           反之如果当前 op 为 CHANNEL_SEND,如果 RECEIVE 队列中有任务,那么 CHANNEL_SEND
           也是可以执行的。
        */
       altqueue = chanarray(c, otherop(a->op));
       return altqueue && altqueue->count;
   } else if (c->buffer.expandsize) {
       // c->buffer.expandsize > 0,代表 buffer.size < 0 的情况。
       // 如果设置了 buffer.expandsize,意味着 SEND 可以永远成功 (await 不会走这里)
       // expandable buffer
       switch(a->op){
           default:
               return 0;
           case CHANNEL_SEND:
               // send always success.
               return 1;
           case CHANNEL_RECEIVE:
               return c->buffer.count > 0;
       }
   } else{
       // buffer.size > 0 的情况
       //这里的 c.buffer == c.asend
       switch(a->op){
           default:
               return 0;
           case CHANNEL_SEND:
               // SEND时,buffer 里任务的数量 < 缓冲区最大容量,可以执行 SEND
               return c->buffer.count < c->buffer.size;
           case CHANNEL_RECEIVE:
               // RECEIVE时,buffer 里有任务就可以执行
               return c->buffer.count > 0;
       }
   }
}

这里忽略 c->buffer.expandsize 中的逻辑,重点来看 c->buffer.size == 0else 两个分支。关于 buffer.sizebuffer.count 不太理解的可以看上面 co_channel 创建过程的分析,理解了它们俩的概念,再来看这段逻辑应该不难:

  • buffer.size == 0(无缓冲区),RECEIVE 会直接取 c->asend, SEND 会直接取 c->arecv。如果队列里面有任务,那么可以成功。
  • buffer.size > 0(有缓冲区),如果缓冲区内未达最大容量,SEND 可以成功;如果缓冲区内有任务,RECEIVE 可以成功。

如下图:

image.png

buffer.size > 0 这个分支里也可以找到为什么要把 c->buffer 设置为 c->asend 的答案:对于存在缓冲区的情况,SENDRECEIVE都只需要判断 SEND 任务队里中的任务数量,而不需要关心 RECEIVE 任务队列中的任务数量
看完了上面的分析,大家对于中断的流程应该比较清楚了:await 内部调用 receive 的时候,c->asend 里面是不存在任务的,所以 altcanexec 返回 false,当前协程会被 coroutine_yield 中断

恢复协程

上面说到 receive 会中断当前的协程,那么当异步任务完成之后,会调用 [COChan send_nonblock:val] 把获取的到数据 val 传给 COChan,在这个过程中就触发了协程恢复。当调用 send 的时候,a->arecv 内部有任务,altcanexec 返回 true ,会立即执行 altexec 函数:

static int altexec(chan_alt *a) {

    alt_queue *altqueue;
    chan_alt *other = NULL;
    co_channel *c;
    
    c = a->channel;
    // 拿到 a->op 取反操作队列
    altqueue = chanarray(c, otherop(a->op));
    // 取出双向链表尾部的任务
    if(altqueuepop(altqueue, &other)){

        int copyRet = altcopy(a, other);
        assert(copyRet == 1);
        // 拿到 other 上的协程(如果是 SEND 这里就是 RECEIVE 的协程)
        coroutine_t *co = other->task;
        // co_chan_custom_resume
        void (*custom_resume)(coroutine_t *co) = c->custom_resume;
        chanunlock(c);
        
        // call back sender
        chan_alt *sender = a->op == CHANNEL_SEND ? a : other;
        // 如果是 SEND 直接执行 a->custom_exec, 如果是 RECEIVE 执行 other->custom_exec
        if (sender->custom_exec) {
            // [self.buffList addObject:val ?: kCOChanNilObj];
            sender->custom_exec();
        }
        // 把协程加到当前调度器中,如果该调度器上没有协程在运行,会立刻 resume 这个协程
        if (custom_resume) {
            custom_resume(co);
        } else {
            coroutine_add(co);
        }
        return CHANNEL_ALT_SUCCESS;
    } else {
        // altqueue 里没有任务
        int copyRet = altcopy(a, nil);
        chanunlock(c);
        
        if (copyRet && a->op == CHANNEL_SEND) {
            if (a->custom_exec) {
                a->custom_exec();
            }
        }
        return copyRet ? CHANNEL_ALT_SUCCESS : CHANNEL_ALT_ERROR_COPYFAIL;
    }
}

这个函数代码比较多,总结起来就是:

  1. 根据 c->op 取出反操作队列尾部的任务。
  2. 拿到该任务保存的协程对象。
  3. 如果是 SEND 操作,执行绑定在 chan_alt 上的 custom_exec,这个函数主要是这句代码 [self.buffList addObject:val ?: kCOChanNilObj],就是把 send 后面的参数添加到 COChanbuffList 属性里。
  4. resume 第二步保存的协程对象。

到这里我们就可以知道,当满足 altcanexec 的条件之后:

  1. 如果调用 send_nonblock 函数,那么会取出 RECEIVE 队列中的任务,把 send 过来的 val 放到 buffList 中,然后通过 custom_resume 恢复 RECEIVE 任务中的协程,恢复之后会从 buffList 里面取出刚才 send 传过来的 val,然后 return 出去。
  2. 如果调用 receive_nonblock 函数,会取出 SEND 队列中的任务,把 send 过来的 val 放到 buffList 中,恢复 RECEIVE 任务中的协程。执行完 SEND 协程的代码后继续执行 return CHANNEL_ALT_SUCCESS,返回到上层后 receive_nonblock 会返回 send 存在 buffList 中的值。

到此,整个 await 的流程已经比较清晰了,如下图:


一次 await 时序.png

最后

笔者的这篇文章主要从一个简单的协程例子开始,按着代码执行步骤一步一步带大家分析整个协程执行的流程,大家可以边看文章边跟着源码过一遍加深记忆。整个协程实现异步的同步化表达的过程核心在COChan,也就是一个阻塞的消息队列。当然还有其它的一些类(比如COActor)没有在这里展开讲,其实原理都差不多,它们的核心都是基于协程的几个族函数。
在下一篇文章我会继续带大家分析这几个族函数在 ARM64 下的实现。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,123评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,031评论 2 384
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,723评论 0 345
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,357评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,412评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,760评论 1 289
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,904评论 3 405
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,672评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,118评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,456评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,599评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,264评论 4 328
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,857评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,731评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,956评论 1 264
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,286评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,465评论 2 348