Python 3.7 通过 asyncio 实现异步编程

Python 中通过 asyncio 实现的异步编程主要包含如下三个模块:

  • 事件循环(event loop):每一个需要异步执行的任务都会在事件循环中注册,事件循环负责管理这些任务之间的执行流程
  • 协程(Coroutine):指用于执行具体某个异步任务的函数。函数体中的 await 关键字可以将协程的控制权释放给事件循环
  • Future:表示已经执行或者尚未执行的任务的结果

在异步程序的世界里,所有代码都运行在事件循环中,可以同时执行多个协程。这些协程异步地执行,直到遇到 await 关键字,此时该协程会让出程序控制权给事件循环,使得其他协程有机会发挥作用。
需要注意的是,不能在同一个函数中同时包含异步和同步代码。即在同步函数中无法使用 await 关键字。

一、Hello World

以下是一段简单的使用了 async 关键字的 Hello World 程序:

import asyncio

async def hello(first_print, second_print):
    print(first_print)
    await asyncio.sleep(1)
    print(second_print)

asyncio.run(hello("Welcome", "Good-bye"))
# => Welcome
# => Good-bye

上述代码的行为看上去更像是同步代码,先输出 Welcome,等待一秒钟之后,再输出 Good-bye
在进一步探究之前,先看下上述异步代码中出现的几个基本概念:

  • Python 语言中,任何由 async def 定义的函数(即上面的 hello())都可以称之为协程。调用协程函数所返回的对象称为协程对象。
  • 函数 asyncio.run 是所有异步代码的主入口,只应该被调用一次。它负责组织传入的协程对象,同时管理 asyncio 的事件循环。
  • await 关键字用于将协程运行时获取的程序控制权移交给事件循环,并中断该协程的执行流程。

一个更现实的异步程序的示例如下:

import asyncio
import time

async def say_something(delay, words):
    print(f"Started: {words}")
    await asyncio.sleep(delay)
    print(f"Finished: {words}")

async def main():
    print(f"Starting Tasks: {time.strftime('%X')}")
    task1 = asyncio.create_task(say_something(1, "First task"))
    task2 = asyncio.create_task(say_something(2, "Second task"))

    await task1
    await task2

    print(f"Finished Tasks: {time.strftime('%X')}")

asyncio.run(main())

# => Starting Tasks: 20:32:28
# => Started: First task
# => Started: Second task
# => Finished: First task
# => Finished: Second task
# => Finished Tasks: 20:32:30

从同步执行的逻辑来看,应该是 task1 开始,等待一秒钟,结束;task2 开始,等待两秒钟,结束。共耗时 3 秒以上。
异步程序实际的执行流程为,task1task2 同时开始,各自等待一段时间后,先后结束。共耗时 2 秒。具体如下:

  • task1 中的 say_something 协程开始执行
  • say_something 遇到 await 关键字时(await asyncio.sleep(delay)),协程暂停执行并等待 1 秒钟,在暂停的同时将程序控制权转移给事件循环
  • task2 从事件循环获取控制权开始执行,同样遇到 await 关键字时暂停协程并等待 2 秒钟,在暂停的同时将程序控制权转移给事件循环
  • task1 等待时间结束后,事件循环将控制权移交给 task1,恢复其协程的运行直至结束
  • task1 运行结束,task2 等待时间完成,task2 获取程序控制权并恢复运行直至结束。两个任务执行完成。

二、Awaitable 对象

await 关键字用于将程序控制权移交给事件循环并中断当前协程的执行。它有以下几个使用规则:

  • 只能用在由 async def 修饰的函数中,在普通函数中使用会抛出异常
  • 调用一个协程函数后,就必须等待其执行完成并返回结果
  • await func() 中的 func() 必须是一个 awaitable 对象。即一个协程函数或者一个在内部实现了 __await__() 方法的对象,该方法会返回一个生成器

Awaitable 对象包含协程、Task 和 Future 等。

协程

关于被 await 调用的协程,即上面的第二条规则,可以参考如下代码:

import asyncio

async def mult(first, second):
    print(f"Calculating multiply of {first} and {second}")
    await asyncio.sleep(1)
    num_mul = first * second
    print(f"Multiply is {num_mul}")
    return num_mul

async def sum(first, second):
    print(f"Calculating sum of {first} and {second}")
    await asyncio.sleep(1)
    num_sum = first + second
    print(f"Sum is {num_sum}")
    return num_sum

async def main(first, second):
    await sum(first, second)
    await mult(first, second)

asyncio.run(main(7, 8))
# => Calculating sum of 7 and 8
# => Sum is 15
# => Calculating multiply of 7 and 8
# => Multiply is 56

上述代码中由 await 修饰的两个协程函数 summult 即为 awaitable 对象,从输出结果中可以看出,sum 函数先执行完毕并输出结果,随后 mult 函数执行并输出结果。
await 调用的协程函数必须执行完毕后才能继续执行另外的 await 协程,这看上去并不符合异步程序的定义。

Tasks

协程异步执行的关键在于 Tasks。
当任意一个协程函数被类似于 asyncio.create_task() 的函数调用时,该协程就会自动排进由事件循环管理的执行流程里。在 asyncio 的定义中,由事件循环控制运行的协程即被称为任务
绝大多数情况下,编写异步代码即意味着需要使用 create_task() 方法将协程放进事件循环。

参考如下代码:

import asyncio

async def mul(first, second):
    print(f"Calculating multiply of {first} and {second}")
    await asyncio.sleep(1)
    num_mul = first * second
    print(f"Multiply is {num_mul}")
    return num_mul

async def sum(first, second):
    print(f"Calculating sum of {first} and {second}")
    await asyncio.sleep(1)
    num_sum = first + second
    print(f"Sum is {num_sum}")
    return num_sum

async def main(first, second):
    sum_task = asyncio.create_task(sum(first, second))
    mul_task = asyncio.create_task(mul(first, second))
    await sum_task
    await mul_task

asyncio.run(main(7, 8))

# => Calculating sum of 7 and 8
# => Calculating multiply of 7 and 8
# => Sum is 15
# => Multiply is 56

对比上一段代码示例,从输出中可以看出,sum_taskmul_task 两个任务的执行流程符合异步程序的逻辑。
sum_task 遇到 await asyncio.sleep(1) 语句后并没有让整个程序等待自己返回计算结果,而是中断执行并把控制权通过事件循环移交给 mul_task。两个任务先后执行并进入等待,最后在各自的等待时间结束后输出结果。

create_task() 函数以外,还可以使用 asyncio.gather() 函数创建异步任务:

import asyncio
import time

async def greetings():
    print("Welcome")
    await asyncio.sleep(1)
    print("Good by")

async def main():
    await asyncio.gather(greetings(), greetings())

def say_greet():
    start = time.perf_counter()
    asyncio.run(main())
    elasped = time.perf_counter() - start
    print(f"Total time elasped: {elasped}")

say_greet()

# => Welcome
# => Welcome
# => Good by
# => Good by
# => Total time elasped: 1.0213364

实际两个任务完成的时间略大于 1 秒而不是 2 秒。

Futures

Futures 代表异步操作的预期结果,即该异步操作可能已经执行也可能尚未执行完毕。通常情况下并不需要在代码中显式地管理 Future 对象,这些工作一般由 asyncio 库隐式地处理。

当一个 Future 实例被创建成功以后,即代表该实例关联的异步操作还没有完成,但是会在未来的某个时间返回结果。
asyncio 有一个 asyncio.wait_for(aws, timeout, *) 方法可以为异步任务设置超时时间。如果超过指定时间后异步操作仍未执行完毕,则该任务被取消并抛出 asyncio.TimeoutError 异常。
timeout 的默认值为 None,即程序会阻塞并一直等待直到 Future 对象关联的操作返回结果。

import asyncio

async def long_time_taking_method():
    await asyncio.sleep(4000)
    print("Completed the work")

async def main():
    try:
        await asyncio.wait_for(long_time_taking_method(),
        timeout=2)
    except asyncio.TimeoutError:
        print("Timeout occurred")

asyncio.run(main())
# => Timeout occurred

三、Async 实例代码

通过创建子进程异步执行 Shell 命令:

import asyncio


async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')


async def main():
    await asyncio.gather(
        run('sleep 2; echo "world"'),
        run('sleep 1; echo "hello"'),
        run('ls /zzz'))

asyncio.run(main())

# => ['ls /zzz' exited with 2]
# => [stderr]
# => ls: cannot access '/zzz': No such file or directory

# => ['sleep 1; echo "hello"' exited with 0]
# => [stdout]
# => hello

# => ['sleep 2; echo "world"' exited with 0]
# => [stdout]
# => world

通过 Queue 将工作负载分发给多个异步执行的 Task 处理:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())
# => worker-2 has slept for 0.12 seconds
# => worker-1 has slept for 0.28 seconds
# => worker-1 has slept for 0.12 seconds
# => worker-0 has slept for 0.46 seconds
# => worker-0 has slept for 0.49 seconds
# => worker-2 has slept for 0.90 seconds
# => worker-1 has slept for 0.62 seconds
# => worker-1 has slept for 0.67 seconds
# => worker-0 has slept for 0.85 seconds
# => worker-2 has slept for 0.94 seconds
# => worker-1 has slept for 0.45 seconds
# => worker-2 has slept for 0.19 seconds
# => worker-0 has slept for 0.99 seconds
# => worker-2 has slept for 0.86 seconds
# => worker-1 has slept for 0.97 seconds
# => worker-0 has slept for 0.74 seconds
# => worker-1 has slept for 0.58 seconds
# => worker-2 has slept for 0.73 seconds
# => worker-1 has slept for 0.27 seconds
# => worker-0 has slept for 0.57 seconds
# => ====
# => 3 workers slept in parallel for 4.10 seconds
# => total expected sleep time: 11.80 seconds

参考资料

asyncio — Asynchronous I/O

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容