在Python中,并发并不是同一时刻有多个操作(thread/task)同时进行。相反,由于全局解释器锁(GIL) 的存在,在某个特定的时刻,它只允许有一个操作发生,只不过线程或任务之间会互相切换,直到完成,如下图所示:Python version 3.8.5
上图中出现了线程(thread) 和任务(task)两种切换顺序的不同方式,分别对应Python中的两种实现并发的方法:threading 和 asyncio. 对于 threading,操作系统知道每个线程的所有信息,因此它会做主在适当的时候做线程切换。而对于 asyncio,主程序想要切换任务时,必须得到此任务可以被切换的通知。
本文内容只涉及基于concurrent.futures的多线程并发,不涉及asyncio。
选择多线程还是多进程?
核心概念:
- 进程是操作系统分配资源的最小单元,线程是操作系统调度的最小单元;
- 一个应用程序至少包括一个进程,一个进程至少包括一个线程;
- 每个进程在执行过程中拥有独立的内存单元,而一个进程的多个线程在执行过程中共享内存;
如果手头的任务是I/O密集型,可以使用标准库的 threading 模块,或者任务是CPU密集型,则可以使用 multiprocessing 模块。这两个模块提供了很多控制权和灵活性,但代价就是必须编写相对低级的冗长代码,在任务核心逻辑的基础上增加额外的并具有复杂性的层,并且当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候往往需要编写自己的线程池/进程池,以空间换时间。
从Python 3.2开始,标准库提供了 concurrent.futures 模块,它在 threading 和 multiprocessing 之上的一个通用抽象层,提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,以便使用线程池/进程池并发/并行地执行任务。
多线程并发编程之concurrent.futures
concurrent.futures 模块对外提供了以下常量、函数或类:
__all__ = (
'FIRST_COMPLETED',
'FIRST_EXCEPTION',
'ALL_COMPLETED',
'CancelledError',
'TimeoutError',
'BrokenExecutor',
'Future',
'Executor',
'wait',
'as_completed',
'ProcessPoolExecutor',
'ThreadPoolExecutor',
)
Executor
Executor 是一个抽象类,不应该直接使用此类,而是使用它提供的两个子类:ThreadPoolExecutor 和 ProcessPoolExecutor,顾名思义两者分别被用来创建线程池和进程池的代码。
Executor的重要方法
ProcessPoolExecutor 和 ThreadPoolExecutor 类中最重要的几个方法如下:
submit(fn,*args, **kwargs):提交异步任务(一般是阻塞的可调用函数),并返回表示可调用对象执行的 Future 对象(原文:returns a
Future
object representing the execution of the callable);此方法不保存与原始任务相关的任何上下文,如果想把结果和原始任务对应起来,需要自己去追踪它们,比如使用字典推导式;map(fn, *iterables, timeout=None, chunksize=1):和标准的map函数功能类似,同样返回一个迭代器,只不过是以异步的方式把函数依次作用在可迭代对象的每个元素上,注意:如果函数调用引发异常,当从迭代器检索其值时将引发异常,并且不会继续执行;
shutdown(wait=True):通知执行器,当所有挂起的 Future 对象执行完成时,释放正在使用的任何资源;在抽象基类中实现了上下文管理器协议,
__exit__
方法中调用了shutdown方法;
future
Futute 类封装可调用对象的异步执行,应由Executor.submit() 方法创建,可以理解为一个在未来完成的操作。比如说在写爬虫代码时会用到 requests.get ,在等待服务器返回结果之前的这段时间会产生I/O阻塞,CPU不能让出来做其他的事情,Future的引入就是帮助我们在等待的这段时间可以完成其他的操作。
Futute 类中最重要的几个方法如下:
- result(timeout=None):返回可调用对象的实际返回值;
- cancel():尝试取消future,如果它正在运行或已经完成,则不能取消,返回False,若取消成功则返回True;
- cancelled():如果future已被取消,则返回True;
- running():如果future当前正在运行,则返回True;
- done():如果future已被取消或执行完成,则返回True;
- add_done_callback(fn):future执行完成后,添加回调;
- exception(timeout=None):返回future执行时所引发的异常;
wait 和 as_completed
模块下有2个重要函数 wait
和 as_completed
。
-
wait(fs, timeout=None, return_when=ALL_COMPLETED)
遍历fs提供的future(可能由不同的Executor实例创建),并等待执行完成(包含已取消),如果未设置timeout参数,则代表不限制等待时间,return_when参数则用于设置次函数应该在何时返回,支持的选项如下:
Constant | Description |
---|---|
FIRST_COMPLETED | 在当有任何future执行完成(包括已取消)时返回结果 |
FIRST_EXCEPTION | 当有任何future执行引发异常时返回结果,若没有任何future引发异常则等同于ALL_COMPLETED |
ALL_COMPLETED | 当所有future执行完成(包括已取消)时返回结果 |
该函数返回一个包含两个元素的namedtuple,定义如下:
DoneAndNotDoneFutures = collections.namedtuple('DoneAndNotDoneFutures', 'done not_done')
- as_completed(fs, timeout=None)
返回一个迭代器,遍历fs给出的 future 实例(可能由不同的执行器实例创建),在它们执行完成(包含已取消)时 yield future。
经验技巧
正确的使用submit和map
submit 方法返回的是 Future 对象, map方法则返回迭代器,如果没有调用future对象的result方法,即使执行过程中有异常用户也是不知道的,如下所示:
f = lambda x: 100 // x
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future1 = executor.submit(f, 0)
future2 = executor.submit(f, 10)
future3 = executor.submit(f, 20)
print(future1, future2, future3, sep='\n')
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = executor.map(f, [0, 10, 20])
print(futures)
>>>
<Future at 0x3d69fb8 state=finished raised ZeroDivisionError>
<Future at 0x3d85460 state=finished returned int>
<Future at 0x3d856d0 state=finished returned int>
<generator object Executor.map.<locals>.result_iterator at 0x03D59A00>
所以通常需要调用其 result 方法并且捕捉异常:
f = lambda x: 100 // x
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future1 = executor.submit(f, 0)
future2 = executor.submit(f, 0)
future3 = executor.submit(f, 20)
todos = [future1, future2, future3]
for future in concurrent.futures.as_completed(todos):
try:
print(future.result())
except ZeroDivisionError as e:
print(e.__repr__())
>>>
ZeroDivisionError('integer division or modulo by zero')
5
ZeroDivisionError('integer division or modulo by zero')
相对于submit,map方法的结果就比较难获取了,这是因为map方法以异步的方式把函数依次作用在可迭代对象的每个元素上,如果在函数调用时引发了一些异常,当从迭代器检索其值时就将引发异常,因此需要使用下面的方法:
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# If a func call raises an exception, then that exception will be raised when its value is retrieved from the iterator.
futures = executor.map(f, [0, 10, 20])
while True:
try:
future = futures.__next__()
except StopIteration:
break
except ZeroDivisionError as e:
print(e.__repr__())
>>>
ZeroDivisionError('integer division or modulo by zero')
可以看到,当第一次错误发生后生成器迭代就结束了,所以一批任务中可能会出现异常时是不合适用 map 方法的,最好的方式还是使用 submit+as_completed. 在一些较为简单的场景下,如果不需要关心任务的返回值,则可以考虑使用map方法。
寻找合适的max_worker
使用ThreadPoolExecutor,虽然线程的数量可以自定义,但并不是越多越好,因为线程的创建、维护和删除也会有一定的开销。所以如果设置的很大,反而可能会导致速度变慢,比如下面的例子,把线程数从5改为10,运行程序会发现耗时反而增多了。所以在实际开发过程中,往往需要根据实际的需要去做一些测试,在任务不影响到全局的情况下,寻找最优的线程数量。
...
def download_all(urls: list):
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
executor.map(download_one, urls)
...
if __name__ == "__main__":
main()
>>>
...
Download 30 urls in 0.9526623 seconds
max_workers有默认值,等于:min(32, (os.cpu_count() or 1) + 4)
避免死锁
使用ThreadPoolExecutor时可能出现的死锁情况,当与Future 关联的可调用函数等待另一个Future的结果时,它们可能永远不会释放对线程的控制并导致死锁,官网的示例如下:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
在上面的例子中,函数wait_on_b依赖于函数wait_on_a的结果(Future对象的结果),同时后一个函数的结果依赖于前一个函数的结果。因此,上下文管理器中的代码块永远不会执行,因为它具有相互依赖性,这就造成了死锁。
简单使用场景
- 使用多线程从url下载和保存文件
import os
import concurrent.futures
from pathlib import Path
import requests
htmls_dir = Path(__file__).parent.joinpath('htmls')
if not htmls_dir.exists():
os.makedirs(htmls_dir)
else:
for html in htmls_dir.glob("*.html"):
os.remove(html)
def download_one(url):
resp = requests.get(url)
resp.encoding = 'utf-8'
return resp.text
def save(source, html, chunk=8 * 1024):
with open(source, 'w', encoding="utf-8") as fp:
for text in (html[i:chunk + i] for i in range(0, len(html), chunk)):
fp.write(text)
def download_all(urls):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(download_one, url): url[-7:] for url in urls}
for future in concurrent.futures.as_completed(futures):
source = htmls_dir.joinpath(futures[future]).with_suffix('.html')
save(source, future.result())
def main():
urls = [f"https://www.sogou.com/web?query={i}" for i in range(30)]
start_time = time.perf_counter()
download_all(urls)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
print(f'Download {len(urls)} urls in {elapsed_time} seconds')
if __name__ == "__main__":
main()
参考文档
- asyncio --- 异步 I/O 官方文档
- 进程与线程的一个简单解释,阮一峰
- 一文看懂Python多进程与多线程编程
- 使用Python的concurrent.futures轻松实现并发编程
- 使用concurrent.futures的一些经验
- Python并发编程之线程池/进程池--concurrent.futures模块
- 官方文档
- [Book] Python Cook
【To Be Continued...】