Python异步编程(上):协程和任务

原文链接: https://mp.weixin.qq.com/s/dQOocc7wHaGv7_cf476Ivg

介绍

了解异步编程前先了解一些概念:协程(coroutine)、任务(task)和事件循环(event loop),在3.7以前还需要关心Future这个东西,不过之后提供的高级API弱化了这个概念,你基本不需要关心Future是什么。

协程

协程等于一个人在工作之间切换,而线程则是等于两个人在工作(先不提GIL),例如烧水、煮饭还有切菜,只有一个人的话,你会把锅装满水放到炉子上烧,然后再去煮饭。而有的人他就是要等到水烧开了再去煮饭,这样要快速完成只能再叫一个人。

IO

这个烧水的过程被称为IO操作,在Python中像网络请求、文件读写等都是IO操作,特别是网络请求,等待服务器返回数据这个过程耗时是非常长的,但它实际上不需要cpu的参与,只是单纯的等待。其实print也是IO操作,但是它耗时非常短基本不会影响,也就不需要在弄一个aioprint出来。

像切菜这种完全离不开人的工作则被称为cpu密集型程序,在Python中跟计算相关的程序都离不开cpu,这种使用异步操作就无法节省时间,必须得加一个人来做。

协程

async def func():
    pass

func和func()都是协程,异步函数(协程函数)被调用只是返回协程对象,不会立即执行。

任务

任务只是对协程的封装,用于处理协程的取消和获取结果等,创建任务一般都是通过asyncio.create_task来创建,例如

task = asyncio.create_task(func)

注意这个时候func已经在运行了,你可以使用result = await task或者asyncio.wait_for等待它运行完成,也可以通过task.cancel()来取消它

事件循环

事件循环这个概念没什么可说的,其实就是等于一个人在工作,每个线程只能运行一个事件循环。可以在多线程中创建多个事件循环,然后通过asyncio.run_coroutine_threadsafe指定协程运行在哪个事件循环里

使用

运行事件循环

创建并运行事件循环一般是通过asyncio.run来实现的,不过有时候这个函数会有一些奇怪的错误。这个时候就需要使用低级API创建并运行

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

在这里get_event_loop相当于下面两行

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

简单例子

下面的代码等同于同步编程

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

一起运行

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return delay

async def main():
    # 创建并执行任务
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # 等待任务完成
    print(await task1)
    print(await task2)

    print(f"finished at {time.strftime('%X')}")
    
asyncio.run(main())

也可以通过asyncio.gather来实现,函数原型: asyncio.gather(*aws, loop=None, return_exceptions=False)

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return delay

async def main():
    print(f"started at {time.strftime('%X')}")

    print(await asyncio.gather(say_after(1, 'hello'), say_after(2, 'world')))

    print(f"finished at {time.strftime('%X')}")
    
asyncio.run(main())

gather也可以用来等待任务

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return delay

async def main():
    # 创建并执行任务
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # 等待任务完成
    print(await asyncio.gather(task1, task2))

    print(f"finished at {time.strftime('%X')}")
    
asyncio.run(main())

asyncio.wait_for

函数原型: asyncio.wait_for(aw, timeout, *, loop=None)

wait_for和直接await只有一个区别,可以指定超时时间

import asyncio
async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

asyncio.wait

函数原型: asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

wait也是一起执行多个协程,与gather的区别可以从参数和返回值看出来,具体区别如下:

  • wait一般是用来等待create_task创建的任务,而gather可以直接运行协程
  • gather直接返回所有协程的结果(顺序也一样),而wait返回的是所有协程的运行状态
  • wait给定timeout并超时后,不会引发asyncio.TimeoutError异常,而是返回所有协程的运行状态
  • wait可以指定return_when,当某个协程执行完成或触发异常时立即返回运行状态,而gather需要等待所有协程完成

return_when有三个值:

  • FIRST_COMPLETED: 其中某个协程执行完成或被取消时返回
  • FIRST_EXCEPTION: 其中某个协程触发异常
  • ALL_COMPLETED: 所有协程都完成或者被取消
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return delay

async def main():
    # 创建并执行任务
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # 等待任务完成
    done, pending = await asyncio.wait([task1, task2])
    for task in done:
        print(task.result())
    print(f"finished at {time.strftime('%X')}")
    
asyncio.run(main())

asyncio.as_completed

等待所有协程完成,以完成顺序做迭代器

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return delay

async def main():
    # 创建并执行任务
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # 等待任务完成
    for core in asyncio.as_completed([task1, task2]):
        print(await core)
    print(f"finished at {time.strftime('%X')}")
    
asyncio.run(main())

asyncio.all_tasks

这个没啥说的,就是返回当前所有任务列表

import asyncio

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return delay

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'), name="task1")
    print(asyncio.all_tasks())
    task2 = asyncio.create_task(
        say_after(2, 'world'), name="task2")

    print(asyncio.all_tasks())
    await task1
    print(asyncio.all_tasks())
    await task2
    print(asyncio.all_tasks())
    
asyncio.run(main())

asyncio.Task

  • cancel(msg=None): 取消
  • cancelled(): 如果任务被取消,则返回True
  • done(): 如果任务完成则返回True
  • result(): 返回任务的结果
  • exception(): 返回任务的异常
  • add_done_callback(callback, *, context=None): 添加任务完成时的回调
  • remove_done_callback(callback): 从回调列表中删除回调
  • get_stack(*, limit=None): 返回此任务的堆栈帧列表
  • print_stack(*, limit=None, file=None): 打印此任务的堆栈或回溯
  • get_coro(): 返回由Task包装的协程对象
  • get_name(): 返回任务的名称
  • set_name(value): 设置任务的名称

asyncio.iscoroutine(obj)

如果obj是协程对象,则返回True

import asyncio

async def func():
    pass

async def main():
    print("iscoroutine1: ", asyncio.iscoroutine(func))
    a = func()
    print("iscoroutine2: ", asyncio.iscoroutine(a))
    b = await a
    print("iscoroutine3: ", asyncio.iscoroutine(b))


asyncio.run(main())

asyncio.iscoroutinefunction(func)

如果func是协程函数,则返回True

import asyncio

async def func():
    pass

async def main():
    print("iscoroutine1: ", asyncio.iscoroutinefunction(func))
    a = func()
    print("iscoroutine2: ", asyncio.iscoroutinefunction(a))
    b = await a
    print("iscoroutine3: ", asyncio.iscoroutinefunction(b))


asyncio.run(main())
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容