python之gevent(2)

在之前,我已经在两篇文章中分别介绍了gevent的使用以及gevent的底层greenlet的使用,可以阅读文章回顾一下:python之gevent(1)python之greenlet。本文将结合gevent的源码对其调度过程进行解析。

需要了解的背景知识

在了解gevent的调度过程前,最好是对greenlet有一定了解,可查看文章python之greenlet,而最主要的知识点我认为在于:
1、每一个greenlet.greenlet实例都有一个parent(可指定,默认为创生新的greenlet.greenlet所在环境),当greenlet.greenlet实例执行完逻辑正常结束、或者抛出异常结束时,执行逻辑切回到其parent
2、可以继承greenlet.greenlet,子类需要实现run方法,当调用greenlet.switch方法时会调用到这个run方法

在gevent中,有两个类继承了greenlet.greenlet,分别是gevent.hub.Hub和gevent.greenlet.Greenlet。后文中,如果是greenlet.greenlet这种写法,那么指的是原生的类库greentlet,如果是greenlet(或者Greenlet)那么指gevent封装后的greenlet。

gevent调度流程

每个gevent线程都有一个hub,上面提到hub是greenlet.greenlet的实例。hub实例在需要的时候创生,那么其parent是main greenlet。之后任何的Greenlet(greenlet.greenlet的子类)实例的parent都设置成hub。hub调用libev提供的事件循环来处理Greenlet代表的任务,当Greenlet实例结束(正常或者异常)之后,执行逻辑又切换到hub。

gevent调度实例1

最基础的代码是这样的:


image.png

虽然这两行代码很简单,但是gevent的核心都包含在其中,下面来看具体的源码:
首先是在上图中调用的sleep函数(gevent.hub.sleep):

def sleep(seconds=0, ref=True):
    hub = get_hub()
    loop = hub.loop
    if seconds <= 0:
        waiter = Waiter()
        loop.run_callback(waiter.switch)
        waiter.get()
    else:
        hub.wait(loop.timer(seconds, ref=ref))

进入函数之后,首先是获取hub(get_hub()),然后在hub上wait这个定时器事件(上面代码最后一行)。get_hub源码如下(gevent.hub.get_hub):

def get_hub(*args, **kwargs):
    """
    Return the hub for the current thread.

    """
    hub = _threadlocal.hub
    if hub is None:
        hubtype = get_hub_class()
        hub = _threadlocal.hub = hubtype(*args, **kwargs)
    return hub

可以看到,hub是线程内唯一的,之前也提到过greenlet是线程独立的,每个线程有各自的greenlet栈。hubtype默认就是gevent.hub.Hub,在hub的初始化函数(init)中,会创建loop属性,默认也就是libev的python封装。

回到sleep函数定义,hub.wait(loop.timer(seconds, ref=ref))。hub.wait函数非常关键,对于任何阻塞性操作,比如timer、io都会调用这个函数,其作用一句话概括:从当前协程切换到hub,直到watcher对应的事件就绪再从hub切换回来。wait函数源码如下(gevent.hub.Hub.wait):

def wait(self, watcher):
        """
        Wait until the *watcher* (which should not be started) is ready.

        """
        waiter = Waiter()
        unique = object()
        watcher.start(waiter.switch, unique)
        try:
            result = waiter.get()
            if result is not unique:
                raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
        finally:
            watcher.stop()

形参watcher就是loop.timer实例,其cython描述在corecext.pyx,我们简单理解成是一个定时器事件就行了。上面的代码中,创建了一个Waiter(gevent.hub.Waiter)对象,这个对象起什么作用在这个类的doc中写得非常清楚:

Waiter.__doc__  
A low level communication utility for greenlets.
Waiter is a wrapper around greenlet's switch() and throw() calls that makes them somewhat safer:
* switching will occur only if the waiting greenlet is executing :meth:get method currently;
* any error raised in the greenlet is handled inside :meth:switch and :meth:throw
* if :meth:switch/:meth:throw is called before the receiver calls :meth:get, then :class:Waiter
will store the value/exception. The following :meth:get will return the value/raise the exception

总的来说,是对greenlet.greenlet类switch 和 throw函数的分装,用来存储返回值greenlet的返回值或者捕获在greenlet中抛出的异常。我们知道,在原生的greenlet中,如果一个greenlet抛出了异常,那么该异常将会展开至其parent greenlet。

回到Hub.wait函数,watcher.start(waiter.switch, unique) 注册了一个回调,在一定时间(1s)之后调用回调函数waiter.switch。注意,waiter.switch此时并没有执行。然后后面调用waiter.get。看看这个get函数(gevent.hub.Waiter.get):

def get(self):
        """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            else:
                getcurrent().throw(*self._exception)
        else:
            if self.greenlet is not None:
                raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
            self.greenlet = getcurrent() # 存储当前协程,之后从hub switch回来的时候使用
            try:
                return self.hub.switch() # switch到hub
            finally:
                self.greenlet = None

核心的逻辑在这段代码最后几行,也就是第11到15行,11行中,getcurrent获取当前的greenlet(在这个测试代码中,是main greenlet,即最原始的greenlet),将其复制给waiter.greenlet。然后13行switch到hub,在greenlet回顾章节的第二条提到,greenlet.greenlet的子类需要重写run方法,当调用子类的switch时会调用到该run方法。Hub的run方法实现如下:

def run(self):
       """
       Entry-point to running the loop. This method is called automatically
       when the hub greenlet is scheduled; do not call it directly.

       :raises LoopExit: If the loop finishes running. This means
          that there are no other scheduled greenlets, and no active
          watchers or servers. In some situations, this indicates a
          programming error.
       """
       assert self is getcurrent(), 'Do not call Hub.run() directly'
       while True:
           loop = self.loop
           loop.error_handler = self
           try:
               loop.run()
           finally:
               loop.error_handler = None  # break the refcount cycle
           self.parent.throw(LoopExit('This operation would block forever', self))

loop自然是libev的事件循环。doc中提到,这个loop理论上会一直循环,如果结束,那么表明没有任何监听的事件(包括IO 定时等)。之前在Hub.wait函数中注册了定时器,那么在这个run中,如果时间到了,那么会调用定时器的callback,也就是之前的waiter.switch, 我们再来看看这个函数(gevent.hub.Waiter.switch):

def switch(self, value=None):
        """Switch to the greenlet if one's available. Otherwise store the value."""
        greenlet = self.greenlet
        if greenlet is None:
            self.value = value
            self._exception = None
        else:
            assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
            switch = greenlet.switch
            try:
                switch(value)
            except:
                self.hub.handle_error(switch, *sys.exc_info())

这段代码的主要内容在第8到13行,第8行保证调用到该函数的时候一定在hub这个协程中,这是很自然的,因为这个函数一定是在Hub.run中被调用。第11行switch到waiter.greenlet这个协程,在讲解waiter.get的时候就提到了waiter.greenlet是main greenlet。注意,这里得switch会回到main greenlet被切出的地方(也就是main greenlet挂起的地方),那就是在waiter.get的第10行,整个逻辑也就恢复到main greenlet继续执行。

总结:sleep的作用很简单,触发一个阻塞的操作,导致调用hub.wait,从当前greenlet.greenlet切换至Hub,超时之后再从hub切换到之前的greenlet继续执行。通过这个例子可以知道,gevent将任何阻塞性的操作封装成一个Watcher,然后从调用阻塞操作的协程切换到Hub,等到阻塞操作完成之后,再从Hub切换到之前的协程

gevent调度实例2

上面的例子中,虽然能够理顺gevent的调度流程,但事实上并没有体现出gevent 协作的优势。接下来再看一个例子:

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

上述代码运行后输出如下:


image.png

从输出可以看到, foo和bar依次输出,显然是在gevent.sleep的时候发生了执行流程切换,gevent.sleep再前面已经介绍了,那么这里主要关注spawn和joinall函数。

gevent.spawn本质调用了gevent.greenlet.Greenlet的类方法spawn:

@classmethod
def spawn(cls, *args, **kwargs):
    g = cls(*args, **kwargs)
    g.start()
    return g

这个类方法调用了Greenlet的两个函数,init 和 start. init函数中最为关键的是这段代码:

    def __init__(self, run=None, *args, **kwargs):
        greenlet.__init__(self, None, get_hub()) # 将新创生的greenlet实例的parent一律设置成hub
        if run is not None:
            self._run = run

start函数的定义(gevent.greenlet.Greenlet.start):

def start(self):
    """Schedule the greenlet to run in this loop iteration"""
    if self._start_event is None:
        self._start_event = self.parent.loop.run_callback(self.switch)

注册回调事件self.switch到hub.loop,注意Greenlet.switch最终会调用到Greenlet._run, 也就是spawn函数传入的callable对象(foo、bar)。这里仅仅是注册,但还没有开始事件轮询,gevent.joinall就是用来启动事件轮询并等待运行结果的。

joinall函数会一路调用到gevent.hub.iwait函数:

def iwait(objects, timeout=None, count=None):
    """
    Iteratively yield *objects* as they are ready, until all (or *count*) are ready
    or *timeout* expired.
    """
    # QQQ would be nice to support iterable here that can be generated slowly (why?)
    if objects is None:
        yield get_hub().join(timeout=timeout)
        return

    count = len(objects) if count is None else min(count, len(objects))
    waiter = _MultipleWaiter() # _MultipleWaiter是Waiter的子类
    switch = waiter.switch

    if timeout is not None:
        timer = get_hub().loop.timer(timeout, priority=-1)
        timer.start(switch, _NONE)

    try:
        for obj in objects:
            obj.rawlink(switch) # 这里往hub.loop注册了回调
 
        for idx in xrange(count):    #第23行
            print 'for in iwait', idx
            item = waiter.get() # 这里会切换到hub
            print 'come here ', item, getcurrent()
            waiter.clear()
            if item is _NONE:
                return
            yield item
    finally:
        if timeout is not None:
            timer.stop()
        for obj in objects:
            unlink = getattr(obj, 'unlink', None)
            if unlink:
                try:
                    unlink(switch)
                except:
                    traceback.print_exc()

然后iwait函数第23行开始的循环,逐个调用waiter.get。这里的waiter是_MultipleWaiter(Waiter)的实例,其get函数最终调用到Waiter.get。前面已经详细介绍了Waiter.get,简而言之,就是switch到hub。我们利用greenlet的tracing功能可以看到整个greenlet.greenlet的switch流程,修改后的代码如下:

import gevent
import greenlet
def callback(event, args):
    print event, args[0], '===:>>>>', args[1]

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

print 'main greenlet info: ', greenlet.greenlet.getcurrent()
print 'hub info', gevent.get_hub()
oldtrace = greenlet.settrace(callback)
        
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])
greenlet.settrace(oldtrace)

总结:gevent.spawn创建一个新的Greenlet,并注册到hub的loop上,调用gevent.joinall或者Greenlet.join的时候开始切换到hub。

本文通过两个简单的例子并结合源码分析了gevent的协程调度流程。gevent的使用非常方便,尤其是在web server中,基本上应用App什么都不用做就能享受gevent带来的好处。本文写出来的主要目的在于想了解gevent对greenlet的封装和使用,greenlet很强大,强大到容易出错,而gevent保证在两层协程之间切换,值得学习并加以使用。

推荐阅读:
http://www.gevent.org/
https://pypi.org/project/greenlet/
gevent源码

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容