一:线程池与进程池所需包
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
二:线程池的基本使用
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
from queue import Queue
import time
q = Queue()
def add_data():
"""生产数据"""
for i in range(5):
for j in range(20):
data = "数据--{}---{}".format(i, j)
q.put(data)
print("【生产数据】{}".format(data))
time.sleep(1)
def handle_data():
"""处理数据"""
while True:
for i in range(4):
try:
data = q.get(timeout=1)
except:
return
else:
print("【处理数据】", data)
q.task_done()
time.sleep(1)
# -----------线程池基本使用-------------
# 创建一个线程池对象,最多四个线程
tpool = ThreadPoolExecutor(max_workers=4)
# 使用一个线程去生产数据
tpool.submit(add_data)
tpool.submit(handle_data)
tpool.submit(handle_data)
tpool.submit(handle_data)
# 等待线程池中所有的任务执行完毕之后,再继续往下执行
tpool.shutdown()
print("-------end----------")
三:线程池上下文管理协议--with
import time
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def work():
for i in range(3):
print("-----{}-------".format(i))
time.sleep(1)
with ThreadPoolExecutor(max_workers = 5) as tp:
for i in range(8):
tp.submit(work)
print("---end----")
四:线程池上下文管理协议--map(与三相同的输出结果)
map进行批量任务提交,map的第一个参数为批量提交的函数,第二个参数为函数的参数
def work(name):
for i in range(3):
print("-----{}-------{}".format(name,i))
time.sleep(1)
with ThreadPoolExecutor(max_workers = 5) as tp:
tp.map(work,[1,2,3,4,5,6,7,8])
print("---end----")
五:带参数的上下文管理协议,submit和map两种方式
def work2(name,age):
for i in range(3):
print("-----{}----{}---{}".format(name,age,i))
time.sleep(1)
# 使用submit
# with ThreadPoolExecutor(max_workers = 5) as tp:
# for i in range(10):
# tp.submit(work2,"musen",i)
# 使用map
with ThreadPoolExecutor(max_workers = 5) as tp:
tp.map(work2,["musen","musen1"],[17,18])
print("---end--- -")
六:进程池的使用
6-1:同一个进程中多个线程之间使用的队列:
import queue
qq = queue.Queue()
6-2:进程之间数据通信的队列multiprocessing.Queue
from multiprocessing import Queue
q1 = Queue()
6-3:进程池之间数据通信
multiprocessing.Manager().Queue
import queue
from multiprocessing import Manager,Queue
from concurrent.futures.process import ProcessPoolExecutor
def work1(q):
for i in range(10):
q.put(i)
def work2(q):
for i in range(10):
print(q.get())
if __name__ == '__main__':
q2 = Manager().Queue()
with ProcessPoolExecutor(max_workers = 2) as pool:
pool.submit(work1,q2)
pool.submit(work2,q2)