Python 通过 concurrent.futures 模块以异步方式处理并发需求

对于计算机程序的执行流而言,I/O 操作通常是时间占比非常大的一块。在当前的硬件设备中,绝大多数 I/O 操作要比 CPU 慢上几个数量级。比如大约花费 1 毫秒写入一个网络 socket,对应到 2.4GHz 的处理器上,同样的时间则可以执行 24000000 条指令。

在一般的同步执行的程序中,当代码遇到 I/O 操作时(如读取一个文件或者写入一个网络 socket),必须暂时中止和内核的交互,去请求 I/O 并等待传输完成。这种因 I/O 阻塞而产生的等待在某些情况下往往导致执行效率的低下和响应的延迟。

而在异步执行的流程中,当一个程序进入 I/O 等待时,其控制权会被移交给程序的其他部分,直到 I/O 操作完成时才可以重新获取(这称为上下文切换)。
异步程序中一般会有一个事件循环用来监听事件并分派任务。比如用一个异步程序做一次网络写操作,该请求会立即返回(程序控制权移交给事件循环),即便写操作实际上并未发生。此时程序允许执行另外的函数和运算。
当写操作完成时,会触发一个特定的事件,由事件循环响应该事件并执行关联的操作。

同步程序下载网络资源

以下代码为使用同步的方式下载网络中存放的多张国旗图片:

# flags.py
import os
import time
import sys

import requests

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'

def save_flag(img, filename):
    if not os.path.exists(DEST_DIR):
        os.makedirs(DEST_DIR)
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')
    
    return len(cc_list)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main(download_many)

# => BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
# => 20 flags downloaded in 218.70s

使用 concurrent.futures 模块下载

concurrent.futures 模块的主要特色是包含 ThreadPoolExecutorProcessPoolExecutor 两个类,它们实现的接口可以分别在不同的线程或进程中执行可调用的对象,并且它们内部都维护着一个工作线程(或进程)池和一个任务队列。

下载代码如下(引用了上一个源文件 flags.py 中的几个功能函数):

from concurrent import futures

from flags import save_flag, get_flag, show, main

MAX_WORKERS = 20

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_one, sorted(cc_list))
    
    return len(list(res))


if __name__ == "__main__":
    main(download_many)

# => EG BD NG CD IN ET RU ID CN FR US PK PH MX IR VN BR JP DE TR
# => 20 flags downloaded in 83.36s

其中最关键的部分为 download_many 函数。
workers 变量用于指定 ThreadPoolExecutor 对象使用的工作线程的数量,取预设的最大线程数(MAX_WORKERS)和实际下载数目(len(cc_list))中的较小的值;
with 语句用于使用指定数量(workers)的工作线程初始化 ThreadPoolExecutor 对象;
map 方法类似于内置的 map 函数,目的是使 download_one 函数可以被多个工作线程并行地调用。它会返回一个生成器对象,该生成器可以被遍历以获取每一个 download_one 执行后的结果。

Future 对象

Python 标准库中包含两个名为 Future 的类:concurrent.futures.Futureasyncio.Future 。这两个类的实例都表示可能已经完成或者尚未完成的延迟计算
Future 对象并不是一个立即产生的实际结果,更像是一种“承诺”,需要等待其执行完毕并被我们期待的值所填充。
在等待“承诺”兑现的过程中程序可以同时执行其他运算。

Future 是 concurrent.futures 模块和 asyncio 库的重要组件,但是在上面的代码中并没有直接调用 Future 对象。
通常情况下,Future 不应该由用户显式地创建,而只能由并发框架实例化。Future 如同它的名字一样,代表将要发生的事情,而确定某件事未来会发生的唯一方式是其执行时间已经排定。Executor.submit() 方法会接收一个可调用对象作为参数并为其排期,返回一个 Future 对象。

用户代码也不应该改变 Future 对象的状态。并发框架会在 Future 代表的延迟计算结束后自动改变 Future 的状态,没有办法人为地控制延迟计算何时结束。
Future 具有非阻塞的 .done() 方法,返回布尔值表明 Future 对应的调用对象是否已经执行完毕。此外还有 .add_done_callback() 方法用于在 Future 运行结束后执行特定的回调函数。
concurrency.futures.Future 实例还有 .results() 方法用以获取 Future 执行的可调用对象的结果。该方法会阻塞调用方所在的线程,直到可调用对象运行结束并返回结果。result() 方法可以接收可选的 timeout 参数用于设定超时时间。

从更现实的角度理解 Future 对象,可以参考如下代码:

from concurrent import futures
from flags import save_flag, get_flag, show, main
import time

MAX_WORKERS = 20

def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))

        results = []
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = '{} result: {!r}'
            print(msg.format(future, res))
            results.append(res)
    
    return len(results)


if __name__ == '__main__':
    main(download_many)

此处的代码只通过 3 个工作线程获取 5 个国家的国旗图片。和之前的代码相比,将 download_many 函数中较抽象的 executor.map 替换成了两个 for 循环:

  • executor.submit 用于排定可调用对象(即 download_one(cc))给多个工作线程执行,返回一个 Future 对象表示这个待执行的操作。
  • futures.as_completed 则用于在 Future 运行结束后获取可执行对象(即 download_one(cc))返回的结果。

本例中的 future.result() 方法绝不会阻塞,因为 future 是由 as_completed 函数返回的。

最终输出如下:

Scheduled for BR: <Future at 0x4524a48 state=running>
Scheduled for CN: <Future at 0x3ca46c8 state=running>
Scheduled for ID: <Future at 0x453f7c8 state=running>
Scheduled for IN: <Future at 0x2ab1308 state=pending>
Scheduled for US: <Future at 0x4530888 state=pending>
ID BR CN  <Future at 0x453f7c8 state=finished returned str> result: 'ID'
<Future at 0x3ca46c8 state=finished returned str> result: 'CN'
<Future at 0x4524a48 state=finished returned str> result: 'BR'
IN <Future at 0x2ab1308 state=finished returned str> result: 'IN'
US <Future at 0x4530888 state=finished returned str> result: 'US'

5 flags downloaded in 1.57s

从输出中可以看出,前三个 Future 的状态是 running,后两个 Future 的状态是 pending,因为只有三个工作线程可供分配。同时,如果多运行几次,输出结果的顺序也是有变化的。

关于 GIL

Cython 解释器不是线程安全的,它通过 GIL(Global Interpreter Lock,全局解释器锁)强制性地一次只允许一个线程执行 Python 代码。所以通常一个 Python 进程并不能同时使用多个 CPU 核心,即不能够将一个 Python 进程拆分成多个独立执行的线程在多个 CPU 核心上以并行的方式运行。
但是 Python 标准库中所有执行阻塞型 I/O 操作的函数,在等待系统返回结果时都会释放 GIL。即在 I/O 密集型的需求场景下,Python 程序可以通过多线程来提升性能。

参考资料

Fluent Python

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,692评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,482评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,995评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,223评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,245评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,208评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,091评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,929评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,346评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,570评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,739评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,437评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,037评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,677评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,833评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,760评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,647评论 2 354

推荐阅读更多精彩内容