# -*- coding:utf-8 -*-
# auther="金木水火"
'''多线程内置模块threading'''
import threading
import time
'''参数用法:group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None
group:一般不使用
target:任务名称,一般传入方法/函数 m名
name:线程名称
args:调用方法的入参tuple
daemon:用来设置线程是否随主线程退出而退出
'''
class DemoThread():
# 1.非继承实现多线程
def task1(self):
for _ in range(10):
print('我是任务1')
time.sleep(0.5)
def task2(self):
for _ in range(10):
print('我是任务2')
time.sleep(1)
def run(self):
th1=threading.Thread(target=self.task1)
th2 = threading.Thread(target=self.task2)
th1.start()
th2.start()
print('主线程执行完毕')
class DemoThread(threading.Thread):
# 2.继承实现多线程
def __init__(self,name):
super().__init__()#初始化父类
self.name=name
def run(self):
time.sleep(2)
print('我是线程{0}'.format(self.name))
##join的作用阻塞执行直到线程执行完毕
def run():
''' 有join结果:可能是
我是线程Thread-2我是线程Thread-4
我是线程Thread-3我是线程Thread-1
主线程结束
'''
threads = []
for i in range(1, 5):
threads.append(DemoThread(name='Thread-' + str(i)))
for t in threads:
t.start()
for t in threads:
t.join()
print('主线程结束')
def run():
''' 无join结果:可能是
主线程结束
我是线程Thread-2
我是线程Thread-1
我是线程Thread-4我是线程Thread-3
'''
threads = []
for i in range(1, 5):
threads.append(DemoThread(name='Thread-' + str(i)))
for t in threads:
t.start()
print('主线程结束')
##daemon 守护线程
#无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁
#需要强调的是:运行完毕并非终止运行
class DemoThread(threading.Thread):
# 2.继承实现多线程
def __init__(self,name,Daemon=False):
super().__init__()#初始化父类
self.name=name
self.setDaemon(Daemon)
def run(self):
time.sleep(2)
print('我是线程{0}'.format(self.name))
def run():
''' Daemon=True结果:可能是
主线程结束
True
True
True
True
'''
threads = []
for i in range(1, 5):
threads.append(DemoThread(name='Thread-' + str(i),Daemon=True))
for t in threads:
t.start()
print('主线程结束')
for t in threads:
print(t.is_alive())
#run()方法,不启动线程执行方法
def run():
''' 无join结果:可能是
主线程结束
我是线程Thread-2
我是线程Thread-1
我是线程Thread-4我是线程Thread-3
'''
threads = []
for i in range(1, 5):
threads.append(DemoThread(name='Thread-' + str(i)))
for t in threads:
t.run()
print('主线程结束')
'''
#多线程锁 lock
#lock只是锁住了部分逻辑,join是锁住了整个线程
#PIL锁和lock
#1.100个线程去抢GIL锁,即抢执行权限
#2. 肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()
#3. 极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,
但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL
#4.直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程
'''
class DemoThread(threading.Thread):
def __init__(self,name,Daemon=False):
super().__init__()#初始化父类
self.name=name
self.lock=threading.Lock()
def run(self):
print('{0}等到获取锁'.format(self.name))
self.lock.acquire()
print('{0}释放锁'.format(self.name))
self.lock.release()
print("{0}当前锁的状态:{1}".format(self.name, self.lock.locked()))
def run():
'''
同时只能有一个线程获取到锁
'''
threads = []
for i in range(1, 5):
threads.append(DemoThread(name='Thread-' + str(i)))
for t in threads:
t.start()
print('主线程结束')
'''
线程死锁:多个线程之间互相等待
'''
class DemoThread(threading.Thread):
def __init__(self,name,Daemon=False):
super().__init__()#初始化父类
self.name = name
self.lock1 = threading.Lock()
self.lock2 = threading.Lock()
def task1(self):
print('{0}等到获取锁1'.format(self.name))
self.lock1.acquire()
print('{0}等到获取锁2'.format(self.name))
self.lock2.acquire()
print('{0}释放锁1'.format(self.name))
self.lock1.release()
print('{0}释放锁2'.format(self.name))
self.lock1.release()
def task2(self):
print('{0}等到获取锁2'.format(self.name))
self.lock2.acquire()
print('{0}等到获取锁1'.format(self.name))
self.lock1.acquire()
print('{0}释放锁2'.format(self.name))
self.lock2.release()
print('{0}释放锁1'.format(self.name))
self.lock1.release()
def run(self):
self.task1()
print("进入死锁")
#self.task2()
def run():
'''
RuntimeError: release unlocked lock
'''
threads = []
for i in range(1, 5):
threads.append(DemoThread(name='Thread-' + str(i)))
for t in threads:
t.start()
print('主线程结束')
'''
递归锁:多个线程之间RLock可以重复进入
递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,
从而使得资源可以被多次require。直到一个线程所有的acquire都被release,
其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
'''
class DemoThread(threading.Thread):
def __init__(self,name,Daemon=False):
super().__init__()#初始化父类
self.name = name
self.lock1 = threading.RLock()
self.lock2 = threading.RLock()
def task1(self):
print('{0}等到获取锁1'.format(self.name))
self.lock1.acquire()
print('{0}等到获取锁2'.format(self.name))
self.lock2.acquire()
print('{0}释放锁1'.format(self.name))
self.lock1.release()
print('{0}释放锁2'.format(self.name))
self.lock1.release()
def task2(self):
print('{0}等到获取锁2'.format(self.name))
self.lock2.acquire()
print('{0}等到获取锁1'.format(self.name))
self.lock1.acquire()
print('{0}释放锁2'.format(self.name))
self.lock2.release()
print('{0}释放锁1'.format(self.name))
self.lock1.release()
def run(self):
self.task1()
self.task2()
def run():
'''
RuntimeError: release unlocked lock
'''
threads = []
for i in range(1, 5):
threads.append(DemoThread(name='Thread-' + str(i)))
for t in threads:
t.start()
print('主线程结束')
'''
并发锁控制:Semaphore
Semaphore是用来控制对共享资源的访问量,
可以控制同一时刻进程的并发数量。
'''
se = threading.Semaphore(2)
class DemoThread(threading.Thread):
def __init__(self,name,Daemon=False):
super().__init__()#初始化父类
self.name = name
def run(self):
se.acquire()
print('{0}我执行完毕了,释放锁{1}'.format(self.name,time.ctime()))
time.sleep(6)
se.release()
def run():
threads = []
for i in range(1, 10):
threads.append(DemoThread(name='Thread-' + str(i)))
for t in threads:
t.start()
#print('主线程结束')
'''
#条件锁 Condition
Condition.acquire() 加锁
Condition.wait() 阻塞线程
Condition.notify() 通知本线程执行完毕
Condition.release() 释放锁
'''
co=threading.Condition()
class stack():
def __init__(self,stlen=6):
super().__init__()
self.__st=list()
self.__stlen=stlen
def put(self,value,th):
global co
print("当前进程:{},入栈数据:{}".format(th, value))
co.acquire() #获取锁
if self.__stlen == len(self.__st):
print('当前进程:{}栈已经满了,等待消费'.format(th))
co.wait()
self.__st.append(value)
co.notify()
co.release()
def out(self,th):
global co,L
co.acquire() # 获取锁
if 0 == len(self.__st):
print('当前进程:{} 栈已经取空,等待入栈'.format(th))
co.wait()
if len(self.__st) >0:
value=self.__st.pop()
print("当前进程:{},出栈数据:{}".format(th, value))
co.notify()
co.release()
def run():
st=stack()
for i in range(1,1001):
th1=threading.Thread(target=st.put,args=(i,'th_in{}'.format(i)),name='th_in{}'.format(i))
th2 = threading.Thread(target=st.out, args=('th_out{}'.format(i),), name='th_out{}'.format(i),daemon=True)
if i % 2 == 0:
th2.start()
else:
th1.start()
print('主进程执行完毕')
#事件锁 Event
co=threading.Event()
class stack():
def __init__(self,stlen=6):
super().__init__()
self.__st=list()
self.__stlen=stlen
def put(self,value,th):
global co
print("当前进程:{},入栈数据:{}".format(th, value))
if self.__stlen == len(self.__st):
print('当前进程:{}栈已经满了,等待消费,事件锁状态{}'.format(th,co.is_set()))
co.wait()
self.__st.append(value)
co.set()
def out(self,th):
global co
if 0 == len(self.__st):
print('当前进程:{} 栈已经取空,等待入栈'.format(th))
co.wait()
if len(self.__st) >0:
value=self.__st.pop()
print("当前进程:{},出栈数据:{}".format(th, value))
co.set()
def run():
st=stack()
for i in range(1,1001):
th1=threading.Thread(target=st.put,args=(i,'th_in{}'.format(i)),name='th_in{}'.format(i))
th2 = threading.Thread(target=st.out, args=('th_out{}'.format(i),), name='th_out{}'.format(i),daemon=True)
if i % 2 == 0:
th2.start()
else:
th1.start()
'''
最后来总结一下,threading 模块中的5种锁
① 互斥锁:Lock,一次只能放行一个,可以通过 with 语句调用。
② 可重入锁:RLock,一次只能放行一个,可以通过 with 语句调用。
③ 条件锁:Condition,一次可以放行任意个,可以通过 with 语句调用。
④ 事件锁:Event,一次全部放行,不能通过 with 语句调用。
⑤ 信号量锁:semaphore,一次可以放行特定个,可以通过 with 语句调用。
'''
from concurrent.futures import ThreadPoolExecutor
def async_add(max):
#进程池
sum = 0
for i in range(max):
sum += i
time.sleep(1)
print(f'{threading.current_thread().name} execute finished, result is {sum}')
return sum
def run():
pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='test_thread')
for i in range(1, 15):
pool.submit(async_add, i)
if __name__ == '__main__':
run()
python-多线程
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...