Python的多线程

一、线程

在一个进程的内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”叫做线程。

线程通常叫做轻型的进程。线程是共享内存空间的并发执行的多任务,每一个线程都共享一个进程的资源。

线程是最小的执行单元,而进程由至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间。

注意进程与线程的区分,进程之间是CPU资源分配的最小单位且进程与进程之间的资源是不共享的

二、线程的使用

  • Python中的使用
    一般我们通过Python的_thread模块和threading模块来实现多线程

_thread模块是比较低级的模块,更接近底层。threading模块是高级模块,对_thread模块进行了封装

from threading import Thread, current_thread
import time

def run(num):
    print('子线程(%s)开始' % (current_thread().name,))
    time.sleep(2)
    print('打印', num)
    time.sleep(2)
    print('子线程(%s)结束' % (current_thread().name,))


if __name__ == '__main__':
  
    # current_thread():返回返回当前线程的实例
    print('主线程(%s)开始' % (current_thread().name,))
    # 创建子线程
    t = Thread(target=run, args=(1,), name='runThread')
    t.start()
    # 等待线程结束
    t.join()
    print("主线程(%s)结束" % (current_thread().name))

任何进程默认就会启动一个线程,称为主线程,主线程可以启动新的子线程

  • 常用的方法
方法名 说明
isAlive() 返回线程是否在运行。正在运行指启动后、终止前。
get/setName(name) 获取/设置线程名。
start() 线程准备就绪,等待CPU调度
is/setDaemon(bool) 获取/设置是守护线程(默认前台线程(False))。(在start之前设置)
join([timeout]) 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数)
  • 线程之间的数据共享
    多线程之间的各个线程资源是共享的
from threading import Thread
from time import sleep


# 全局数据
num = 100


def run():
    print('子线程开始')
    global  num
    num += 1
    print('子线程结束')


if __name__ == '__main__':
    print('主线程开始')
    # 创建主线程
    t = Thread(target=run)
    t.start()
    t.join()
    print(num)
    print('主线程结束')

# 输出结果
  主线程开始
  子线程开始
  子线程结束
  101
  主线程结束

子线程修改了全局变量num以后,主线程输出的num是已经被修改过的num。注意,虽然我只声明了一个线程,但是实际有两个线程,即它的主线程和我开启的子线程。

  • 线程锁
    既然多线程之间资源是共享的,那当我们的线程都修改某一个变量或者文件时会发生什么情况?
from threading import Thread


# 全局变量
num = 100

def run(n):
    global num
    for i in range(100000000):
        num = num + n
        num = num - n


if __name__ == '__main__':
    t1 = Thread(target=run, args=(6,))
    t2 = Thread(target=run, args=(9,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('num=',num)

运行之后发现num不等于100了。这就是因为两个线程都对num进行操作,中间发生了紊乱。

因为线程的开启没有先后顺序而言,当有个线程+n后还没来得及-n时,另外一个线程开启又对num进行了+n造成num的值发生了改变,后续的结果都会产生变化。如果你运行后num的值没有发生改变,请多增加线程或是循环的次数。

所以这个时候我们就需要应用到了线程锁来保证同时只能有一个线程来读取我们的数据,但锁确保了这段代码只能由一个线程从头到尾的完整执行。阻止了多线程的并发执行,包含锁的某段代码实际上只能以单线程模式执行,所以效率大大滴降低了。

由于可以存在多个锁,不同线程持有不同的锁,并试图获取其他的锁,可能造成死锁,导致多个线程挂起。只能靠操作系统强制终止。

接下来,我们给之前的代码加上锁 来解决数据混乱的问题。

from threading import Thread, Lock

# 全局变量
num = 100
# 锁对象
lock = Lock()

def run(n):
    global num
    global lock
    for i in range(100000000):
        # 获取锁
        lock.acquire()
        try:
            num = num + n
            num = num - n
        finally:
            # 修改完一定 要释放锁
            lock.release()


if __name__ == '__main__':
    t1 = Thread(target=run, args=(6,))
    t2 = Thread(target=run, args=(9,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('num=',num)

注意:每次获取锁以后一定要释放锁

每次使用又要获取锁又要释放锁是不是很麻烦?

我们可以和操作文件一样,使用上下文管理器 with,可以自动获取锁,释放锁。可以将上面代码的try语句改成如下:

with lock:
    num = num + n
    num = num - n

三、线程的进阶

  • 实现进程的局部变量
    出了上面介绍的进程锁以外,我们还能通过ThreadLocal实现线程数据不共享。即线程的局部变量。
import threading


num = 0

# 创建一个全局的ThreadLocal对象
# 每个线程有独立的存储空间
# 每个线程对ThreadLocal对象都可以读写, 但是互不影响
local = threading.local()

def run(x, n):
    x = x + n
    x = x - n

def func(n):
    # 每个线程都有local.x ,就是线程的局部变量
    local.x = num
    for i in range(1000000):
        run(local.x, n)
    print('%s--%d' % (threading.current_thread().name, local.x))


if __name__ == '__main__':
    t1 = threading.Thread(target=func, args=(6,))
    t2 = threading.Thread(target=func, args=(9,))

    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("num=",num)

我们能从结果看到。num最后结果是0,那是因为num被设置成了线程的局部变量,每个线程操作的都是自己的局部变量,所以不会和其他线程的数据发生混乱。

  • 使用信号量来限制线程并发数

Semphore叫做信号量,可以限制线程的并发数,是一种带计数的线程同步机制,当调用release时,增加计算,当acquire时,减少计数,当计数为0时,自动阻塞,等待release被调用。

在大部分情况下,信号量用于守护有限容量的资源。

import threading
import time

# 创建一个信号量实例,限制并行的线程数为3个
sem = threading.Semaphore(3)


def run(i):
    # 获取信号量,信号量减1
    sem.acquire()
    print('%s--%d' % (threading.current_thread().name, i))
    time.sleep(2)
    # 释放信号量,信号量加1
    sem.release()


if __name__ == '__main__':
    # 创建5个线程
    for i in range(5):
        threading.Thread(target=run, args=(i,)).start()

我们能看到前三个线程会先执行,前三个线程执行完了之后才会执行后2个线程。

还有另一种信号量叫做BoundedSemaphore,比普通的Semphore更加严谨,叫做有界信号量。有界信号量会确保它当前的值不超过它的初始值。如果超过,则引发ValueError。

  • Barrier对象

Barrier翻译成栅栏,可以理解为线程数量不够时,会被拦住不让执行。我们来看一个例子:

import threading
import time


bar = threading.Barrier(3)


def run(i):
    print('%s--%d--start' % (threading.current_thread().name, i))
    time.sleep(2)
    bar.wait()
    print('%s--%d--end' % (threading.current_thread().name, i))

if __name__ == '__main__':
    # 创建5个线程
    for i in range(5):
        threading.Thread(target=run, args=(i,)).start()

#输出结果:
Thread-1--0--start
Thread-2--1--start
Thread-3--2--start
Thread-4--3--start
Thread-5--4--start
Thread-4--3--end
Thread-2--1--end
Thread-1--0--end

我们能看到执行完3个线程之后,程序一直停着。那是因为后面的线程不够3个,被栅栏拦住了没法继续执行。如果我们开启六个线程,那么程序就会全部执行。

  • 定时执行线程

threading中的Timer可以让线程在指定时间之后执行 ,实现定时执行的效果。

import threading
import time


def run():
    print('start...')
    time.sleep(2)
    print('end...')


if __name__ == '__main__':
    timer = threading.Timer(5, run)
    timer.start()
    print('end main...')
  • 最简单的线程通信机制——Event

Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志(flag),当调用set()时设为True,调用clear()时重置为 False。wait()将阻塞线程至等待阻塞状态。

来看一个简单的例子:

import threading
import time


# 创建一个事件
event = threading.Event()

def run():
    print('start...')
    # 进入到等待事件的阻塞状态中
    event.wait()
    time.sleep(2)
    print('end...')


if __name__ == '__main__':
    t1 = threading.Thread(target=run)
    t1.start()

    time.sleep(2)
    # 发送事件
    print('sending event...')
    event.set()

注意:clear会把内置的标志重新设为False。

  • 生产者消费者模型

生产者消费者模型是多线程中常见的一种模型。先来看一个简单的例子。

import threading, queue, time, random


# 生产者
def product(id, q):
    while True:
        num = random.randint(0, 10000)
        q.put(num)
        print('生产者%d生成了%d数据放入了队列' % (id, num))
        time.sleep(1)
    # 告诉队列任务完成
    q.task_done()


# 消费者
def consumer(id, q):
    while True:
        item = q.get()
        if item is None:
            break
        print('消费者%d消费了%d数据' % (id, item))
        time.sleep(1)
    # 任务完成
    q.task_done()


if __name__ == '__main__':

    # 消息队列
    q = queue.Queue()

    # 启动生产者
    for i in range(4):
        threading.Thread(target=product, args=(i, q)).start()

    # 启动消费者
    for i in range(3):
        threading.Thread(target=consumer, args=(i, q)).start()

生产者生成的数据存放在队列q中,消费者去队列中取数据。

  • 线程调度

正常情况下线程的执行顺序是操作系统控制的,如果需要让我们自己来控制线程的执行顺序,需要用到Condition(条件变量)类。

尝试实现这么一个需求,有一个线程输出 0 2 4 6 8 ,另一个线程输出 1 3 5 7 9 ,这两个线程并行执行时,显示的结果是混乱无序的,要求输出结果为0 1 2 3 4 5 6 7 8 9 。

import threading, time

# 线程条件变量
cond = threading.Condition()

def run1():
    for i in range(0, 10, 2):
        # condition自带一把锁
        # 获取锁
        if cond.acquire():
            print(threading.current_thread().name, i)
            time.sleep(1)
            # 当前线程执行完成,等待另一个线程执行,并释放锁
            cond.wait()
            # 通知另一个线程可以运行了。
            cond.notify()
            
            
    


def run2():
    for i in range(1, 10, 2):
        # 获取锁
        if cond.acquire():
            print(threading.current_thread().name, i)
            time.sleep(1)
            # 通知上面的线程你不要等了,该走了。
            cond.notify()
            # 然后等待上一个线程给自己继续执行的信号。
            cond.wait()



if __name__ == '__main__':
    threading.Thread(target=run1).start()
    threading.Thread(target=run2).start()

Condition类的方法说明:

acquire([timeout])/release(): 调用关联的锁的相应方法。

wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。

notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

condition同样可以使用with上下文管理器来自动管理锁。

四、多线程的应用

socket编程中多线程的应用。单线程中,socket服务器在监听时,处于阻塞状态,只能同时处理一个客户端的连接,使用多线程,让服务器可以同时处理多个客户端连接。

服务器端代码:

import socket
import threading

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('10.36.137.19',8081))
server.listen(5)

def run(ck):
  data = ck.recv(1024)
  print("客户端说:" + data.decode("utf-8"))
  ck.send("nice to meet you too".encode("utf-8"))

print("服务器启动成功,等待客户端的链接")
while True:
  clientSocket, clientAddress = server.accept()
  t = threading.Thread(target=run, args=(clientSocket,))
  t.start()

客户端代码:

import socket


client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("10.36.137.19", 8081))


while True:
    data = input("请输入给服务器发送的数据")
    client.send(data.encode("utf-8"))
    info = client.recv(1024)
    print("服务器说:", info.decode("utf-8"))
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容