写在前面:GIL锁
关于GIL锁:多线程在Python中并不一定是鸡肋
CPython解释器存在GIL锁,一次只允许使用一个线程执行Python代码,但这并不是说多线程就是鸡肋的。
因为,标准库中所有执行耗时任务阻塞型I/O操作的函数,在等待操作系统返回结果时都会释放GIL,这意味着I/O密集型Python程序使用多线程是能提高运行效率的,比如:Python线程在等待网络响应时,阻塞型I/O函数会释放GIL,在运行新的线程。
future模块下的ThreadPoolExecutor
参考文章:http://c.biancheng.net/view/2627.html
Exectuor 提供了如下常用方法:
- submit(fn, *args, kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,kwargs 代表以关键字参数的形式为 fn 函数传入参数。
- map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
- shutdown(wait=True):关闭线程池。
使用步骤
使用线程池来执行线程任务的步骤如下:
- 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
- 定义一个普通函数作为线程任务。
- 调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。
- 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
简单线程池示例
from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(i)
return my_sum
# 创建一个包含2条线程的线程池
pool = ThreadPoolExecutor(max_workers=2)
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
# 判断future1代表的任务是否结束
print(future1.done())
time.sleep(3)
# 判断future2代表的任务是否结束
print(future2.done())
# 查看future1代表的任务返回的结果
print(future1.result())
# 查看future2代表的任务返回的结果
print(future2.result())
# 关闭线程池
pool.shutdown()
只需使用submit将函数及参数提交给线程池,线程池会自动开启线程来执行函数,并自动管理线程,方便又高效。
结果返回
当程序使用 Future 的 result() 方法来获取异步任务的结果时,该方法会阻塞当前线程,如果没有指定 timeout 参数,当前线程将一直处于阻塞状态,直到 Future 代表的任务返回。
若不想阻塞的方式获取结果,可通过 Future 的add_done_callback()
方法来添加回调函数,。当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。
- 使用非租塞式add_done_callback() 返回结果
- 使用with上下文管理器管理线程池,可避免手动关闭线程池
from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
print(i)
return my_sum
# 创建一个包含2条线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
# 向线程池提交一个task, 50会作为action()函数的参数
future1 = pool.submit(action, 50)
# 向线程池再提交一个task, 100会作为action()函数的参数
future2 = pool.submit(action, 100)
def get_result(future):
print(future.result())
# 为future1添加线程完成的回调函数
future1.add_done_callback(get_result)
# 为future2添加线程完成的回调函数
future2.add_done_callback(get_result)
print('--------------')
优雅的管理多线程:线程池
Exectuor 还提供了一个 map(func, *iterables, timeout=None, chunksize=1)
方法,该方法的功能类似于全局函数 map(),区别在于线程池的 map() 方法会为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数。这种方式相当于启动 len(iterables) 个线程,井收集每个线程的执行结果。
from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
my_sum += i
return my_sum
# 创建一个包含4条线程的线程池
with ThreadPoolExecutor(max_workers=4) as pool:
# 使用线程执行map计算
# 后面元组有3个元素,因此程序启动3条线程来执行action函数
results = pool.map(action, (50, 100, 150))
print('--------------')
for r in results:
print(r)
上面程序使用 map() 方法来启动 3 个线程(该程序的线程池包含 4 个线程,如果继续使用只包含两个线程的线程池,此时将有一个任务处于等待状态,必须等其中一个任务完成,线程空闲出来才会获得执行的机会),map() 方法的返回值将会收集每个线程任务的返回结果。
运行上面程序,同样可以看到 3 个线程并发执行的结果,最后通过 results 可以看到 3 个线程任务的返回结果。
通过上面程序可以看出,使用 map() 方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行 action() 函数,但最后收集的 action() 函数的执行结果,依然与传入参数的结果保持一致,即函数返回结果顺序与调用顺序一致。
map方法还有一个特性:如果第一个调用生成器结果用时10秒,而其他调用只用一秒,代码会阻塞10秒,用以获取生成器的第一个结果。在此之后,获取后续结果时不会阻塞,因为后续的调用已经结束。
使用as_completed()绝不阻塞
如果不想要任何阻塞,也不需要调用顺序,只管获取到最终结果即可时,我们可以使用Excutor.submit 和 future.as_completed函数结合使用。
from concurrent.futures import ThreadPoolExecutor
import time
# 定义一个准备作为线程任务的函数
def action(max):
my_sum = 0
for i in range(max):
my_sum += i
return my_sum
cc_list = [50, 100, 150]
with ThreadPoolExecutor(max_workers=4) as pool:
to_do_map = []
for cc in cc_list:
# 创建一个Future实例
future = pool.submit(action, cc)
to_do_map.append(future)
result = []
done_iter = future.as_complete(to_do_map)
for future in done_iter:
# 这里获取future.result()的方式绝不会阻塞
res = future.result()
result.append(res)
print(result)
将原有的map方式改为使用两个for循环调用submit和as_completed方式,使用future.result()绝不会阻塞。