1.18python之多线程

一、使用threading模块实现线程的创建

实例1

import threading
from time import ctime, sleep


def target():
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    sleep(5)
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))


if __name__ == "__main__":
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t = threading.Thread(target=target)
    t.start()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))

输出结果:

The current threading MainThread---Wed Oct 14 18:32:19 2020 is running
The current threading Thread-1---Wed Oct 14 18:32:19 2020 is running
The current threading MainThread---Wed Oct 14 18:32:19 2020 is running
The current threading Thread-1---Wed Oct 14 18:32:24 2020 is running

Process finished with exit code 0

import threading
首先导入threading 模块,这是使用多线程的前提。
t = threading.Thread(target=target)
创建线程t,使用threading.Thread()方法。
t.start()
开始线程活动。
使用threading.current_thread()可以查看到当前线程的信息。
从输出结果可以看到在线程Thread-1结束前MainThread已经结束了,但并没有杀死子线程Thread-1。

实例2

import threading
from time import ctime, sleep


def target():
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    sleep(5)
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))


if __name__ == "__main__":
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t = threading.Thread(target=target)
    t.start()
    t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))

t.join()
join()的作用是,在子线程完成运行之前,这个子线程的父线程将一直被阻塞。Python中,默认情况下,如果不加join()语句,那么主线程不会等到当前线程结束才结束,但却不会立即杀死该线程。如上面的输出结果所示。
输出结果:

The current threading MainThread---Wed Oct 14 18:40:42 2020 is running
The current threading Thread-1---Wed Oct 14 18:40:42 2020 is running
The current threading Thread-1---Wed Oct 14 18:40:47 2020 is running
The current threading MainThread---Wed Oct 14 18:40:47 2020 is running

Process finished with exit code 0

实例3

import threading
from time import ctime, sleep


def target():
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    sleep(5)
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))


if __name__ == "__main__":
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t = threading.Thread(target=target)
    t.setDaemon(True)
    t.start()
    t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))

输出结果:

The current threading MainThread---Thu Oct 15 13:07:04 2020 is running
The current threading Thread-1---Thu Oct 15 13:07:04 2020 is running
The current threading Thread-1---Thu Oct 15 13:07:09 2020 is running
The current threading MainThread---Thu Oct 15 13:07:09 2020 is running

Process finished with exit code 0

t.setDaemon(True)
t.setDaemon(True)将线程声明为守护线程,必须在start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。如果当前python线程是守护线程,那么意味着这个线程是“不重要”的,“不重要”意味着如果他的主进程结束了但该守护线程没有运行完,守护进程就会被强制结束。如果线程是非守护线程,那么父进程只有等到守护线程运行完毕后才能结束

import threading
from time import ctime, sleep


def target():
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    sleep(5)
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))


if __name__ == "__main__":
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t = threading.Thread(target=target)
    t.setDaemon(True)
    t.start()
    # t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))

输出结果:

The current threading MainThread---Thu Oct 15 13:08:38 2020 is running
The current threading Thread-1---Thu Oct 15 13:08:38 2020 is running
The current threading MainThread---Thu Oct 15 13:08:38 2020 is running

Process finished with exit code 0

如果为线程实例添加t.setDaemon(True)之后,如果不加join语句,那么当主线程结束之后,会杀死子线程。

二、使用threading模块实现多线程的创建

1、函数的方式创建

import threading
from time import ctime, sleep


def code():
    print("I'm coding. {}---{}".format(ctime(), threading.current_thread().name))
    sleep(5)


def draw():
    print("I'm drawing. {}---{}".format(ctime(), threading.current_thread().name))
    sleep(5)


if __name__ == "__main__":
    threads = []
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t1 = threading.Thread(target=code)
    threads.append(t1)
    t2 = threading.Thread(target=draw)
    threads.append(t2)
    for t in threads:
        t.setDaemon(True)
        t.start()
    t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))

输出结果:

The current threading MainThread---Thu Oct 15 13:35:06 2020 is running
I'm coding. Thu Oct 15 13:35:06 2020---Thread-1
I'm drawing. Thu Oct 15 13:35:06 2020---Thread-2
The current threading MainThread---Thu Oct 15 13:35:11 2020 is running

Process finished with exit code 0

给线程传递参数

import threading
from time import ctime, sleep


def code(arg):
    print("I'm coding.{}---{}---{}".format(arg, ctime(), threading.current_thread().name))
    sleep(5)


def draw(arg):
    print("I'm drawing.{}----{}---{}".format(arg, ctime(), threading.current_thread().name))
    sleep(5)


if __name__ == "__main__":
    threads = []
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t1 = threading.Thread(target=code, args=('敲代码',))
    threads.append(t1)
    t2 = threading.Thread(target=draw, args=('画画',))
    threads.append(t2)
    for t in threads:
        t.setDaemon(True)
        t.start()
    t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))

输出结果:

The current threading MainThread---Thu Oct 15 13:39:49 2020 is running
I'm coding.敲代码---Thu Oct 15 13:39:49 2020---Thread-1
I'm drawing.画画----Thu Oct 15 13:39:49 2020---Thread-2
The current threading MainThread---Thu Oct 15 13:39:54 2020 is running

Process finished with exit code 0

2.类的方式创建线程

继承自threading.Thread类
为了让线程代码更好的封装,可以使用threading模块的下的Thread类,继承自这个类,然后实现run方法,线程就会自动运行run方法中的代码。

import threading
from time import ctime, sleep


class CodingThread(threading.Thread):
    def run(self):
        print("I'm coding.{}---{}".format(ctime(), threading.current_thread().name))
        sleep(5)


class DrawingThread(threading.Thread):
    def run(self):
        print("I'm drawing.{}---{}".format(ctime(), threading.current_thread().name))
        sleep(5)


def multi_thread():
    t1 = CodingThread()
    t2 = DrawingThread()

    print(threading.enumerate())
    t1.start()
    print(threading.enumerate())
    t2.start()
    print(threading.enumerate())


if __name__ == "__main__":
    multi_thread()

输出结果

[<_MainThread(MainThread, started 4403457344)>]
I'm coding.Thu Oct 15 13:45:06 2020---Thread-1
[<_MainThread(MainThread, started 4403457344)>, <CodingThread(Thread-1, started 123145444630528)>]
I'm drawing.Thu Oct 15 13:45:06 2020---Thread-2
[<_MainThread(MainThread, started 4403457344)>, <CodingThread(Thread-1, started 123145444630528)>, <DrawingThread(Thread-2, started 123145461420032)>]

Process finished with exit code 0

三、多线程共享全局变量以及锁机制

1、多线程共享变量的问题

对于多线程来说,最大的特点就是线程之间可以共享数据,线程的执行又是无序的,那么共享数据就会出现多线程同时更改一个变量,使用同样的资源,而出现死锁、数据错乱等情况。

import threading


value = 0


class AddValueThread(threading.Thread):
    def run(self):
        global value
        for x in range(1000000):
            value += 1
        print("{}的值是{}".format(threading.current_thread().name, value))


def multi_thread():
    for i in range(2):
        t = AddValueThread()
        t.start()


if __name__ == "__main__":
    multi_thread()

输出结果:

Thread-1的值是1214452
Thread-2的值是1393110

Process finished with exit code 0

这个结果是错误的,正确的结果应该是:

Thread-1的值是1000000
Thread-2的值是2000000

由于两条线程同时对value操作,所以这里就出现数据错误了

2、线程锁和ThreadLocal

(1)线程锁

为了解决以上使用共享变量的问题。threading提供了一个Lock类,这个类可以在某个线程访问某个变量的时候加锁,其他线程就进不来,直到当前进程处理完成后,释放了锁,其他线程才能进来进行处理。当访问某个资源之前,用Lock.acquire()锁住资源,访问之后,用Lock.release()释放资源。

import threading

value = 0
gLock = threading.Lock()


class AddValueThread(threading.Thread):
    def run(self):
        global value
        gLock.acquire()
        for x in range(1000000):
            value += 1
        gLock.release()
        print("{}的值是{}".format(threading.current_thread().name, value))


def multi_thread():
    for i in range(2):
        t = AddValueThread()
        t.start()


if __name__ == "__main__":
    multi_thread()

输出结果:

Thread-1的值是1000000
Thread-2的值是2000000

Process finished with exit code 0
(2)、ThreadLocal

介绍完线程锁,接下来出场的是ThreadLocal。当不想将变量共享给其他线程时,可以使用局部变量,但在函数中定义局部变量会使得在函数之间传递特别麻烦。ThreadLocal是非常牛逼的东西,它解决了全局变量需要枷锁,局部变量传递麻烦的两个问题。通过在线程中定义:
local_school = threading.local()
此时这个local_school就变成了一个全局变量,但这个全局变量只在该线程中为全局变量,对于其他线程来说是局部变量,别的线程不可更改。

def process_thread(name): # 绑定ThreadLocal的student: 
             local_school.student = name

这个student属性只有本线程可以修改,别的线程不可以。代码:

import threading

value = 0
gLocal = threading.local()


class AddValueThread(threading.Thread):
    def run(self):
        gLocal.value = 0
        for x in range(1000000):
            gLocal.value += 1
        print("{}的值是{}".format(threading.current_thread().name, gLocal.value))


def multi_thread():
    for i in range(2):
        t = AddValueThread()
        t.start()


if __name__ == "__main__":
    multi_thread()

输出结果:

Thread-1的值是1000000
Thread-2的值是1000000

Process finished with exit code 0

四、生产者和消费者模式

(1)、Lock版

生产者线程专门用来生产一些数据,然后存放到中间变量中,消费者再从中间的变量中取出数据进行消费。中间变量经常是一些全局变量,所以需要使用锁来保证数据完整性。

import threading
import random
import time

gMoney = 1000
gTimes = 0
gLock = threading.Lock()


class Producer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100, 1000)
            gLock.acquire()
            if gTimes >= 3:
                gLock.release()
                break
            gMoney += money
            print("{}当前存入{}元钱,剩余{}元钱".format(threading.current_thread(), money, gMoney))
            gTimes += 1
            time.sleep(0.5)
            gLock.release()


class Consumer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100, 500)
            gLock.acquire()
            if gMoney > money:
                gMoney -= money
                print("{}当前取出{}元钱,剩余{}元钱".format(threading.current_thread(), money, gMoney))
                time.sleep(0.5)
            else:
                if gTimes >= 3:
                    gLock.release()
                    break
                print("{}当前想取出{}元钱,剩余{}元钱,不足!".format(threading.current_thread(), money, gMoney))
            gLock.release()


def multi_thread():
    for i in range(2):
        Consumer(name="消费者线程{}".format(i)).start()
    for j in range(2):
        Producer(name="生产者线程{}".format(j)).start()


if __name__ == "__main__":
    multi_thread()

输出结果:

<Consumer(消费者线程0, started 123145324752896)>当前取出128元钱,剩余872元钱
<Consumer(消费者线程1, started 123145341542400)>当前取出420元钱,剩余452元钱
<Producer(生产者线程0, started 123145358331904)>当前存入997元钱,剩余1449元钱
<Producer(生产者线程1, started 123145375121408)>当前存入700元钱,剩余2149元钱
<Producer(生产者线程1, started 123145375121408)>当前存入984元钱,剩余3133元钱
<Consumer(消费者线程1, started 123145341542400)>当前取出221元钱,剩余2912元钱
<Consumer(消费者线程0, started 123145324752896)>当前取出313元钱,剩余2599元钱
<Consumer(消费者线程1, started 123145341542400)>当前取出189元钱,剩余2410元钱
<Consumer(消费者线程0, started 123145324752896)>当前取出356元钱,剩余2054元钱
<Consumer(消费者线程1, started 123145341542400)>当前取出109元钱,剩余1945元钱
<Consumer(消费者线程0, started 123145324752896)>当前取出418元钱,剩余1527元钱
<Consumer(消费者线程1, started 123145341542400)>当前取出381元钱,剩余1146元钱
<Consumer(消费者线程0, started 123145324752896)>当前取出416元钱,剩余730元钱
<Consumer(消费者线程0, started 123145324752896)>当前取出166元钱,剩余564元钱
<Consumer(消费者线程0, started 123145324752896)>当前取出111元钱,剩余453元钱
<Consumer(消费者线程1, started 123145341542400)>当前取出384元钱,剩余69元钱
<Consumer(消费者线程0, started 123145324752896)>当前想取出415元钱,剩余69元钱,不足!
<Consumer(消费者线程1, started 123145341542400)>当前想取出100元钱,剩余69元钱,不足!

Process finished with exit code 0

(2)、Condition版

LOCK版本的生产者和消费者存在一个不足,在消费者中总是通过while True死循环并且上锁的方式判断资源够不够。上锁是一个很耗费cpu资源的行为。因此这种方式不是最好的。还有一种更好的方式是使用threading.Condition来实现。threading.Condition消费者可以在没有数据的时候处于阻塞等待状态。生产者一旦有合适的数据,还可以使用notify相关的函数来通知处于等待阻塞状态的线程。这样就可以避免一些无用的上锁、解锁的操作。
threading.Condition类似threading.Lock,可以在修改全局数据的时候进行上锁,也可以在修改完毕后进行解锁。
acquire:上锁
release:解锁
wait:将当前线程处于等待状态,并且会释放锁。可以被其他线程使用notify和notify_all函数唤醒。被唤醒后继续等待上锁,上锁后继续执行下面的代码。
notify:通知某个正在等待的线程,默认是第1个等待的线程。
notify_all:通知所有正在等待的线程。
注意: notify和notify_all不会释放锁。并且需要在release之前调用。

import threading
import random
import time

gMoney = 1000
gCondition = threading.Condition()  # 锁
gTimes = 0


class Producer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100, 1000)
            gCondition.acquire()
            if gTimes >= 3:
                gCondition.release()
                break
            gMoney += money
            print("{}当前存入{}元钱,剩余{}元钱".format(threading.current_thread(), money, gMoney))
            gTimes += 1
            gCondition.notify_all()
            gCondition.release()
            time.sleep(0.5)


class Consumer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100, 500)
            gCondition.acquire()
            while gMoney < money:
                print("{}准备消费{}元钱,还剩{}元钱,余额不足!".format(threading.current_thread(), money, gMoney))
                if gTimes >= 3:
                    gCondition.release()
                    return
                gCondition.wait()
            gMoney -= money
            print("{}消费了{}元钱,剩余{}元钱".format(threading.current_thread(), money, gMoney))
            gCondition.release()
            time.sleep(0.5)


def multi_thread():
    for i in range(2):
        Consumer(name="消费者线程{}".format(i)).start()
    for j in range(2):
        Producer(name="生产者线程{}".format(j)).start()


if __name__ == "__main__":
    multi_thread()

输出结果:

<Consumer(消费者线程0, started 123145357996032)>消费了273元钱,剩余727元钱
<Consumer(消费者线程1, started 123145374785536)>消费了470元钱,剩余257元钱
<Producer(生产者线程0, started 123145391575040)>当前存入181元钱,剩余438元钱
<Producer(生产者线程1, started 123145408364544)>当前存入464元钱,剩余902元钱
<Consumer(消费者线程0, started 123145357996032)>消费了455元钱,剩余447元钱
<Producer(生产者线程0, started 123145391575040)>当前存入677元钱,剩余1124元钱
<Consumer(消费者线程1, started 123145374785536)>消费了400元钱,剩余724元钱
<Consumer(消费者线程0, started 123145357996032)>消费了485元钱,剩余239元钱
<Consumer(消费者线程1, started 123145374785536)>消费了159元钱,剩余80元钱
<Consumer(消费者线程0, started 123145357996032)>准备消费325元钱,还剩80元钱,余额不足!
<Consumer(消费者线程1, started 123145374785536)>准备消费229元钱,还剩80元钱,余额不足!

Process finished with exit code 0

五、线程池

1.什么是线程池

引言:诸如web服务器、数据库服务器、文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务。构建服务器应用程序的一个过于简单的模型是:每当一个请求到达就创建一个新的服务对象,然后在新的服务对象中为请求服务。但当有大量请求并发访问时,服务器不断的创建和销毁对象的开销很大。
所以提高服务器效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这样就引入了“池”的概念,
“池”的概念使得人们可以定制一定量的资源,然后对这些资源进行反复的使用用,而不是频繁的创建和销毁这些资源。
定义:线程池是预先创建线程的一种技术。这些线程都是处于睡眠状态,即均为启动,不消耗CPU,而只是占用较小的内存空间。当请求到来之后,缓冲池给这次请求分配一个空闲线程,把请求传入此线程中运行,进行处理。当预先创建的线程都处于运行状态,即预制线程不够,线程池可以自由创建一定数量的新线程,用于处理更多的请求。当系统比较闲的时候,也可以通过移除一部分一直处于停用状态的线程。

2. Python的concurrent.futures 线程池进程池模块

python3.2加入了concurrent.futures模块,实现了线程池和进程池。这个主要有两种类型:执行器(executor)和任务容器(Future)。执行器(executor)用来管理工作线程和进程池,任务容器(Feature)直译是未来对象,换句话说,就是将我们的任务(函数)进行一层包裹,封装为未来对象。简单理解就是可以把Future看成是任务的一个容器,除了能够销毁任务,里面还包含了任务的执行状态。

2.1创建一个Future对象

我们先手动创建一个Future对象,分析一下:

from concurrent.futures import Future

# 创建一个Future对象
future = Future()


# 定义callback函数
def callback(future):
    print(f"回调函数执行,结果是:{future.result()}")


def test_future():
    # 在Future对象中有一个add_done_callback方法,可以将future绑定一个回调函数,在调用add_done_callback的时候只需要传入函数名,future会自动传递给callback的第一个参数。
    print('添加回调函数')
    future.add_done_callback(callback)

    # 当future执行set_result的时候,执行回调
    print("触发回调函数")
    future.set_result("哈哈,想不到吧,我就是结果")


if __name__ == '__main__':
    test_future()

值得注意的是:可以多次set_result,但是后面的会覆盖前面的,并且result()获取可以获取多次。

2.2通过提交任务创建一个Future对象
2.2.1使用submit提交任务

submit。提交任务,并返回 Future 对象代表可调用对象的执行。

from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time


# 创建单个任务
def threadTask(taskNum):
    threadId = threading.currentThread().getName()
    period = f"任务{taskNum}:线程id----{threadId},进程id----{os.getgid()}"
    print(period)
    time.sleep(3) # 子线程休眠
    return period


# 封装线程池函数
def localThreadPool():
    # max_workers参数,表示最多创建多少个线程
    # 如果不指定,那么每一个任务都会为其创建一个线程
    executor = ThreadPoolExecutor(max_workers=3)

    # 通过submit就直接将任务提交到线程池里面了,一旦提交,就会立刻运行
    # 提交之后,相当于开启了一个新的线程,主线程会继续往下走
    future = executor.submit(threadTask, 1)
    print(f'线程状态:{future},运行结果:{future.result()}')
    time.sleep(5)  # 主线程休眠
    print(f'线程状态:{future},运行结果:{future.result()}')


if __name__ == "__main__":
    localThreadPool()

运行结果如下:

任务1:线程id----ThreadPoolExecutor-0_0,进程id----20
线程状态:<Future at 0x10b346850 state=running>,运行结果:任务1:线程id----ThreadPoolExecutor-0_0,进程id----20
线程状态:<Future at 0x10b346850 state=finished returned str>,运行结果:任务1:线程id----ThreadPoolExecutor-0_0,进程id----20

Process finished with exit code 0

我们可以同时提交多个任务

from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time


# 创建单个任务
def threadTask(taskNum):
    threadId = threading.current_thread().getName()
    period = f"任务{taskNum}:线程id----{threadId},进程id----{os.getgid()}"
    print(period)
    time.sleep(3)  # 子线程休眠
    return period


# 定义回调函数
def callBack(future):
    print(f"我是回调函数:线程状态{future}")


# 封装线程池函数
def localThreadPool():
    # max_workers参数,表示最多创建多少个线程
    # 如果不指定,那么每一个任务都会为其创建一个线程
    executor = ThreadPoolExecutor(max_workers=3)
    futures = [executor.submit(threadTask, i) for i in range(5)]  # 提交多个任务
    # 通过submit就直接将任务提交到线程池里面了,一旦提交,就会立刻运行
    # 提交之后,相当于开启了一个新的线程,主线程会继续往下走
    print(f'线程状态:{futures}')
    time.sleep(5)  # 主线程休眠
    print(f'线程状态:{futures}')
    time.sleep(5)  # 主线程休眠
    print(f'线程状态:{futures}')


if __name__ == "__main__":
    localThreadPool()

运行结果:

任务0:线程id----ThreadPoolExecutor-0_0,进程id----20
任务1:线程id----ThreadPoolExecutor-0_1,进程id----20
任务2:线程id----ThreadPoolExecutor-0_2,进程id----20
# 因为我们的max_workers=3,所以同时先启了三条线程
线程状态:[<Future at 0x10347fd00 state=running>, <Future at 0x1034a4be0 state=running>, <Future at 0x1034a4f70 state=running>, <Future at 0x1034ac370 state=pending>, <Future at 0x1034ac490 state=pending>]
# 我们可以看到前三条线程已经启动(running),后面两条是待定(pending)
任务3:线程id----ThreadPoolExecutor-0_0,进程id----20
任务4:线程id----ThreadPoolExecutor-0_1,进程id----20
#从线程池中取出两条线程执行任务3和任务4
线程状态:[<Future at 0x10347fd00 state=finished returned str>, <Future at 0x1034a4be0 state=finished returned str>, <Future at 0x1034a4f70 state=finished returned str>, <Future at 0x1034ac370 state=running>, <Future at 0x1034ac490 state=running>]
#我们可以看到当前三条任务已经完成(finished),后面两条启动(running)
线程状态:[<Future at 0x10347fd00 state=finished returned str>, <Future at 0x1034a4be0 state=finished returned str>, <Future at 0x1034a4f70 state=finished returned str>, <Future at 0x1034ac370 state=finished returned str>, <Future at 0x1034ac490 state=finished returned str>]
#所有任务运行结束

此外我们可以使用future.running()和future.done()来判断当前任务是否执行完,这里不做演示了。

2.2.2使用map提交任务

map。和 Python 自带的 map 函数功能类似,只不过是以异步的方式把函数依次作用在列表的每个元素上。

from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time


# 创建单个任务
def threadTask(taskNum):
    threadId = threading.current_thread().getName()
    period = f"任务{taskNum}:线程id----{threadId},进程id----{os.getgid()}"
    print(period)
    time.sleep(3)  # 子线程休眠
    return period


# 定义回调函数
def callBack(future):
    print(f"我是回调函数:线程状态{future}")


# 封装线程池函数
def localThreadPool():
    # max_workers参数,表示最多创建多少个线程
    # 如果不指定,那么每一个任务都会为其创建一个线程
    executor = ThreadPoolExecutor(max_workers=3)
    futures = executor.map(threadTask, [i for i in range(5)])   # 提交多个任务,注意这里与submit的区别
    # 通过submit就直接将任务提交到线程池里面了,一旦提交,就会立刻运行
    # 提交之后,相当于开启了一个新的线程,主线程会继续往下走
    print(f'线程状态:{futures}')
    time.sleep(5)  # 主线程休眠
    print(f'线程状态:{futures}')
    time.sleep(5)  # 主线程休眠
    print(f'线程状态:{futures}')


if __name__ == "__main__":
    localThreadPool()
2.3重要函数

模块下有 2 个重要函数wait和as_completed。用来等待所有任务完成。

2.3.1 wait

wait用来等待指定的Future实例完成,它和asyncio.wait意图很像,返回值有 2 项,第一项表示完成的任务列表 (done),第二项表示为未完成的任务列表 (not_done):

from concurrent.futures import ThreadPoolExecutor, wait
import threading
import os
import time


# 创建单个任务
def threadTask(taskNum):
    threadId = threading.current_thread().getName()
    period = f"任务{taskNum}:线程id----{threadId},进程id----{os.getgid()}"
    print(period)
    time.sleep(3)  # 子线程休眠
    return period


# 定义回调函数
def callBack(future):
    print(f"我是回调函数:线程状态{future}")


# 封装线程池函数
def localThreadPool():
    # max_workers参数,表示最多创建多少个线程
    # 如果不指定,那么每一个任务都会为其创建一个线程
    executor = ThreadPoolExecutor(max_workers=3)
    futures = [executor.submit(threadTask, i) for i in range(5)]  # 提交多个任务
    # 通过submit就直接将任务提交到线程池里面了,一旦提交,就会立刻运行
    # 提交之后,相当于开启了一个新的线程,主线程会继续往下走
    print("没有阻塞,我还可以输出")
    fs = wait(futures)
    print(fs)
    print("任务跑完了,我才能被放出来")
if __name__ == "__main__":
    localThreadPool()

输出结果如下:

任务0:线程id----ThreadPoolExecutor-0_0,进程id----20
任务1:线程id----ThreadPoolExecutor-0_1,进程id----20
任务2:线程id----ThreadPoolExecutor-0_2,进程id----20
没有阻塞,我还可以输出
任务3:线程id----ThreadPoolExecutor-0_2,进程id----20
任务4:线程id----ThreadPoolExecutor-0_1,进程id----20
DoneAndNotDoneFutures(done={<Future at 0x1104e2c40 state=finished returned str>, <Future at 0x11050f2b0 state=finished returned str>, <Future at 0x110507eb0 state=finished returned str>, <Future at 0x110507b20 state=finished returned str>, <Future at 0x11050f3d0 state=finished returned str>}, not_done=set())
任务跑完了,我才能被放出来

Process finished with exit code 0
2.3.2 as_completed

as_completed函数返回一个包含指定的 Future 实例的迭代器,这些实例会在完成时被 yield 出来:

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import os
import time


# 创建单个任务
def threadTask(taskNum):
    threadId = threading.current_thread().getName()
    period = f"任务{taskNum}:线程id----{threadId},进程id----{os.getgid()}"
    print(period)
    time.sleep(3)  # 子线程休眠
    return period


# 定义回调函数
def callBack(future):
    print(f"我是回调函数:线程状态{future}")


# 封装线程池函数
def localThreadPool():
    # max_workers参数,表示最多创建多少个线程
    # 如果不指定,那么每一个任务都会为其创建一个线程
    executor = ThreadPoolExecutor(max_workers=3)
    futures = [executor.submit(threadTask, i) for i in range(5)]  # 提交多个任务
    # 通过submit就直接将任务提交到线程池里面了,一旦提交,就会立刻运行
    # 提交之后,相当于开启了一个新的线程,主线程会继续往下走
    for future in as_completed(futures):
        print(future)


if __name__ == "__main__":
    localThreadPool()

运行结果:

任务0:线程id----ThreadPoolExecutor-0_0,进程id----20
任务1:线程id----ThreadPoolExecutor-0_1,进程id----20
任务2:线程id----ThreadPoolExecutor-0_2,进程id----20
#上面的是先输出的内容
任务3:线程id----ThreadPoolExecutor-0_0,进程id----20
任务4:线程id----ThreadPoolExecutor-0_2,进程id----20
<Future at 0x110540c40 state=finished returned str>
<Future at 0x110565eb0 state=finished returned str>
<Future at 0x110565b20 state=finished returned str>
# 当前三个任务完成后就有上面的输出了,最后才是下面的输出
<Future at 0x11056d3d0 state=finished returned str>
<Future at 0x11056d2b0 state=finished returned str>

注意:as_completed只能用于多个submit组成的列表,不能和map一起使用。

2.3.3等待任务完成另外两种方法

方法1:调用executor的shutdown

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import os
import time


# 创建单个任务
def threadTask(taskNum):
    threadId = threading.current_thread().getName()
    period = f"任务{taskNum}:线程id----{threadId},进程id----{os.getgid()}"
    print(period)
    time.sleep(3)  # 子线程休眠
    return period


# 定义回调函数
def callBack(future):
    print(f"我是回调函数:线程状态{future}")


# 封装线程池函数
def localThreadPool():
    # max_workers参数,表示最多创建多少个线程
    # 如果不指定,那么每一个任务都会为其创建一个线程
    executor = ThreadPoolExecutor(max_workers=3)
    futures = [executor.submit(threadTask, i) for i in range(5)]  # 提交多个任务
    # 通过submit就直接将任务提交到线程池里面了,一旦提交,就会立刻运行
    # 提交之后,相当于开启了一个新的线程,主线程会继续往下走
    print("没有阻塞,我还可以输出")
    executor.shutdown()
    print("任务跑完了,我才能被放出来")


if __name__ == "__main__":
    localThreadPool()

输出结果如下:

任务0:线程id----ThreadPoolExecutor-0_0,进程id----20任务1:线程id----ThreadPoolExecutor-0_1,进程id----20
任务2:线程id----ThreadPoolExecutor-0_2,进程id----20
没有阻塞,我还可以输出

任务3:线程id----ThreadPoolExecutor-0_1,进程id----20
任务4:线程id----ThreadPoolExecutor-0_2,进程id----20
任务跑完了,我才能被放出来

Process finished with exit code 0

方法2:使用上下文管理

from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time


# 创建单个任务
def threadTask(taskNum):
    threadId = threading.current_thread().getName()
    period = f"任务{taskNum}:线程id----{threadId},进程id----{os.getgid()}"
    print(period)
    time.sleep(3)  # 子线程休眠
    return period


# 定义回调函数
def callBack(future):
    print(f"我是回调函数:线程状态{future}")


# 封装线程池函数
def localThreadPool():
    # max_workers参数,表示最多创建多少个线程
    # 如果不指定,那么每一个任务都会为其创建一个线程
    with ThreadPoolExecutor(max_workers=3) as executor:
        print("没有阻塞,我还可以输出")
        futures = [executor.submit(threadTask, i) for i in range(5)]  # 提交多个任务
    # 通过submit就直接将任务提交到线程池里面了,一旦提交,就会立刻运行
    # 提交之后,相当于开启了一个新的线程,主线程会继续往下走
    print("任务跑完了,我才能被放出来")
    for future in futures:
        print(future)


if __name__ == "__main__":
    localThreadPool()

输出结果如下:

没有阻塞,我还可以输出
任务0:线程id----ThreadPoolExecutor-0_0,进程id----20
任务1:线程id----ThreadPoolExecutor-0_1,进程id----20
任务2:线程id----ThreadPoolExecutor-0_2,进程id----20
任务3:线程id----ThreadPoolExecutor-0_2,进程id----20
任务4:线程id----ThreadPoolExecutor-0_1,进程id----20
任务跑完了,我才能被放出来
<Future at 0x10ff82d00 state=finished returned str>
<Future at 0x10ffa7be0 state=finished returned str>
<Future at 0x10ffa7f70 state=finished returned str>
<Future at 0x10ffb0340 state=finished returned str>
<Future at 0x10ffb0460 state=finished returned str>
2.4加入异常处理
2.4.1 submit方式提交的任务处理异常
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
import random


# 创建单个任务
def threadTask(taskNum):
    num = random.randint(0, 2)/taskNum
    threadId = threading.current_thread().getName()
    period = f"任务{taskNum}:线程id----{threadId},进程id----{os.getgid()}"
    print(period)
    time.sleep(3)  # 子线程休眠
    return period


# 定义回调函数
def callBack(future):
    print(f"我是回调函数:线程状态{future}")


# 封装线程池函数
def localThreadPool():
    # max_workers参数,表示最多创建多少个线程
    # 如果不指定,那么每一个任务都会为其创建一个线程
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(threadTask, i) for i in range(5)]  # 提交多个任务
    # 通过submit就直接将任务提交到线程池里面了,一旦提交,就会立刻运行
    # 提交之后,相当于开启了一个新的线程,主线程会继续往下走
    for future in futures:
        try:
            future.result()
        except Exception as exc:
            print(f'{future},Generated an exception: {exc}')


if __name__ == "__main__":
    localThreadPool()

运行结果如下:

任务1:线程id----ThreadPoolExecutor-0_1,进程id----20
任务2:线程id----ThreadPoolExecutor-0_0,进程id----20
任务3:线程id----ThreadPoolExecutor-0_2,进程id----20
任务4:线程id----ThreadPoolExecutor-0_0,进程id----20
<Future at 0x10ba96af0 state=finished raised ZeroDivisionError>,Generated an exception: division by zero

Process finished with exit code 0
2.4.2 map方式提交的任务异常处理
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
import random


# 创建单个任务
def threadTask(taskNum):
    num = random.randint(0, 2) / taskNum
    threadId = threading.current_thread().getName()
    period = f"任务{taskNum}:线程id----{threadId},进程id----{os.getgid()}"
    print(period)
    time.sleep(3)  # 子线程休眠
    return period


# 定义回调函数
def callBack(future):
    print(f"我是回调函数:线程状态{future}")


# 封装线程池函数
def localThreadPool():
    # max_workers参数,表示最多创建多少个线程
    # 如果不指定,那么每一个任务都会为其创建一个线程
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = executor.map(threadTask, [i for i in range(5)])  # 提交多个任务
    # 通过submit就直接将任务提交到线程池里面了,一旦提交,就会立刻运行
    # 提交之后,相当于开启了一个新的线程,主线程会继续往下走
    while 1:
        try:
            print(next(futures))
        except StopIteration:
            break
        except Exception as exc:
            print(f'Generated an exception: {exc}')


if __name__ == "__main__":
    localThreadPool()

输出结果如下:

任务1:线程id----ThreadPoolExecutor-0_0,进程id----20
任务2:线程id----ThreadPoolExecutor-0_1,进程id----20

任务3:线程id----ThreadPoolExecutor-0_2,进程id----20
任务4:线程id----ThreadPoolExecutor-0_1,进程id----20
Generated an exception: division by zero

Process finished with exit code 0

如果我们采用submit的异常处理方法
输出结果如下:

任务1:线程id----ThreadPoolExecutor-0_0,进程id----20任务2:线程id----ThreadPoolExecutor-0_1,进程id----20

任务3:线程id----ThreadPoolExecutor-0_2,进程id----20
任务4:线程id----ThreadPoolExecutor-0_2,进程id----20
Traceback (most recent call last):
  File "/Users/xianchengchi.py", line 40, in <module>
    localThreadPool()
  File "/Users/xianchengchi.py", line 31, in localThreadPool
    for future in futures:
  File "/usr/local/Cellar/python@3.9/3.9.1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 600, in result_iterator
    yield fs.pop().result()
  File "/usr/local/Cellar/python@3.9/3.9.1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 433, in result
    return self.__get_result()
  File "/usr/local/Cellar/python@3.9/3.9.1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/Cellar/python@3.9/3.9.1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/xianchengchi.py", line 10, in threadTask
    num = random.randint(0, 2)/taskNum
ZeroDivisionError: division by zero

Process finished with exit code 1

可以看到第一次错误发生后生成器就结束了,所以一批任务中可能会出现异常是不合适用map的,因为list(rs)或者对结果做循环是会由于某个任务抛错而获得不了后面的那些任务结果,最好的方式还是submit + as_completed。
最后:善用 max_workers
ProcessPoolExecutor和ThreadPoolExecutor都接受max_workers参数,表示用来执行任务的进程 / 线程数量。ProcessPoolExecutor 的默认值是 CPU 的个数 (通过 < code>os.cpu_count () 获得),而 ThreadPoolExecutor 的默认值是 CPU 的个数的 5 倍!
对于初学者或者通常情况下是不需要手动设置max_workers参数,默认值是可以足够好的工作的。但是:

  • 根据不同的业务场景,提高 max_workers 可以加快任务完成。不过要注意,不是值越高越高,超过一定阈值会起到反作用。尤其是在 IO 密集型的任务上使用 ThreadPoolExecutor,不同的 max_workers 差别会很大,但是影响网络问题因素太多,我这里就不举例了。
  • 有时候服务器上跑了很多重要服务,不希望某个任务影响到全局,还可以按需把 max_workers 的值设置成小于默认值。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。