线程与进程
- 每一个应用程序都有一个自己的进程,操作系统会为这些进程分配一些执行资源,例如内存空间等。
- 在一个进程内部,要同时干多件事,同时运行多个“子任务”,称为线程。
- 执行多任务:多进程、多线程、多进程+多线程
多线程编程
线程状态有:
- New 创建
- Runnable 就绪
- Running 运行
- Blocked 阻塞(wait locked sleeping)
- Dead 消亡
阻塞的三种情况
- 同步阻塞(锁定池):处于竞争锁定的状态,线程请求锁定时将进入这个状态,一旦成功获得锁定又恢复到运行状态。
- 等待阻塞(等待池):等待其他线程通知的状态,线程获得条件锁定后,调用“等待”将进入这个状态,一旦也其他线程发出通知,线程将进入同步阻塞状态,再次竞争条件锁定。
- 其他阻塞:指调用time.sleep(),join()时的阻塞,这个状态下线程不会释放已获得的锁定。
当一个进程启动之后,会默认产生一个主线程,因为线程是程序执行流的最小单元,当设置多线程时,主线程会创建多个子线程,在python中,默认情况下,主线程执行完自己的任务以后,就退出了,此时子线程会继续执行自己的任务,直到自己的任务结束,
import time
import threading
class MyThread(threading.Thread):
def run(self):
for i in range(5):
print('thread{},@number:{}'.format(self.name,i))
time.sleep(1)
def main():
print("start main threading")
# 创建三个线程
threads = [MyThread() for i in range(3)]
# 启动三个线程
for t in threads:
t.start()
print('End main threading')
if __name__ == '__main__':
main()
start main threading
threadThread-18,@number:0
threadThread-19,@number:0
threadThread-20,@number:0
End main threading
threadThread-18,@number:1
threadThread-19,@number:1
threadThread-20,@number:1
threadThread-18,@number:2
threadThread-19,@number:2
threadThread-20,@number:2
threadThread-18,@number:3
threadThread-19,@number:3
threadThread-20,@number:3
threadThread-18,@number:4
threadThread-19,@number:4
threadThread-20,@number:4
主线程结束后,子线程还在运行。如果需要主线程要等待子线程运行完后再退出,需要用到 join 方法
join 所完成的工作就是线程同步,即主线程任务结束之后,进入阻塞状态,一直等待其他的子线程执行结束之后,主线程在终止。
import time
import threading
class MyThread(threading.Thread):
def run(self):
for i in range(5):
print('thread{},@number:{}'.format(self.name,i))
time.sleep(1)
def main():
print("start main threading")
# 创建三个线程
threads = [MyThread() for i in range(3)]
# 启动三个线程
for t in threads:
t.start()
for t in threads:
t.join()
print('End main threading')
if __name__ == '__main__':
main()
start main threading
threadThread-21,@number:0
threadThread-22,@number:0
threadThread-23,@number:0
threadThread-21,@number:1
threadThread-22,@number:1
threadThread-23,@number:1
threadThread-21,@number:2
threadThread-22,@number:2
threadThread-23,@number:2
threadThread-21,@number:3
threadThread-22,@number:3
threadThread-23,@number:3
threadThread-21,@number:4
threadThread-22,@number:4
threadThread-23,@number:4
End main threading
主线程在等待子线程运行结束后才结束的
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("开始线程:" + self.name)
print_time(self.name, self.counter, 5)
print ("退出线程:" + self.name)
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
threadName.exit()
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启新线程
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print ("退出主线程")
开始线程:Thread-1
开始线程:Thread-2
Thread-1: Fri Nov 6 17:17:11 2020
Thread-1: Fri Nov 6 17:17:12 2020
Thread-2: Fri Nov 6 17:17:12 2020
Thread-1: Fri Nov 6 17:17:13 2020
Thread-1: Fri Nov 6 17:17:14 2020
Thread-2: Fri Nov 6 17:17:14 2020
Thread-1: Fri Nov 6 17:17:15 2020
退出线程:Thread-1
Thread-2: Fri Nov 6 17:17:16 2020
Thread-2: Fri Nov 6 17:17:18 2020
Thread-2: Fri Nov 6 17:17:20 2020
退出线程:Thread-2
退出主线程
使用线程加载获取数据,通常都会造成数据不同步的情况。这时候我们可以给资源进行加锁,实现多个线程同步。
- lock = threading.Lock()
在线程中获取锁
- lock.acquire()
释放锁
- lock.release()
import threading
import time
threadLock = threading.Lock()
threads = []
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("开启线程: " + self.name)
# 获取锁,用于线程同步(进入同步阻塞状态)
threadLock.acquire()
print_time(self.name, self.counter, 5)
# 释放锁,开启下一个线程
threadLock.release()
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启新线程
thread1.start()
thread2.start()
# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)
# 等待所有线程完成
for t in threads:
t.join()
print ("退出主线程")
开启线程: Thread-1
开启线程: Thread-2
Thread-1: Fri Nov 6 19:54:44 2020
Thread-1: Fri Nov 6 19:54:45 2020
Thread-1: Fri Nov 6 19:54:46 2020
Thread-1: Fri Nov 6 19:54:47 2020
Thread-1: Fri Nov 6 19:54:48 2020
Thread-2: Fri Nov 6 19:54:50 2020
Thread-2: Fri Nov 6 19:54:52 2020
Thread-2: Fri Nov 6 19:54:54 2020
Thread-2: Fri Nov 6 19:54:56 2020
Thread-2: Fri Nov 6 19:54:58 2020
退出主线程
Condition条件变量
- 可以在某些事件触发或者达到特定的条件后才处理数据。通常与一个锁关联。
- 除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()通知;得到通知后线程进入锁定池等待锁定。
con = threading.Condition()
con.wait() 使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已经获得锁定。
con.notify() 从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定。使用前线程必须已获得锁定,不会释放锁定
con.notifyAll() 将通知等待池中所有的线程进入锁定池尝试获得锁定。(同上)
import threading
import time
# 商品
product = None
# 条件变量
con = threading.Condition()
# 生产者方法
def produce():
global product
if con.acquire():
while True:
if product is None:
print ('produce...')
product = 'anything'
# 通知消费者,商品已经生产
con.notify()
# 等待通知
con.wait()
time.sleep(2)
# 消费者方法
def consume():
global product
if con.acquire():
while True:
if product is not None:
print ('consume...')
product = None
# 通知生产者,商品已经没了
con.notify()
# 等待通知
con.wait()
time.sleep(2)
t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
t2.start()
t1.start()
produce...
consume...
produce...
consume...
线程间通信
从一个线程向另一个线程发送数据最安全的方法是使用queue库中的队列。创建一个被多个线程共享的Queue对象,这些线程通过使用put()和get()操作来向队列中添加或者删除元素。
后台线程
一般情况下,主线程结束后子线程也依然会继续执行。加入setDaemon(True)方法,设置子线程为守护线程,主线程一旦执行结束,全部线程全部被终止执行,子线程退出不再执行。setDaemon()在start()之前。
多进程编程
1.类Process
Process([group[,target[,name[,args[,kwargs]]]]])
- target 表示调用对象
- args 表示调用对象的位置参数元组
- kwargs 表示调用对象的字典(关键字)
- group 实质上不使用
import multiprocessing
import time
def worker(interval,name):
print(name + '[start]')
time.sleep(interval)
print(name + '[end]')
if __name__ == "__main__":
p1 = multiprocessing.Process(target=worker,args=(2,'kobe'))
p2 = multiprocessing.Process(target=worker,args=(3,'james'))
p3 = multiprocessing.Process(target=worker,args=(4,'irving'))
p1.start()
p2.start()
p3.start()
print("the number of CPU is:" + str(multiprocessing.cpu_count()))
for p in multiprocessing.active_children():
print("chlid p.name:" + p.name + "\tp.id" + str(p.pid))
print("end!")
the number of CPU is:8
chlid p.name:Process-11 p.id14396
end!
chlid p.name:Process-12 p.id6740
end!
chlid p.name:Process-13 p.id2844
end!
????
2.把进程创建成类
import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
def __init__(self,interval):
multiprocessing.Process.__init__(self)
self.interval = interval
def run(self):
n = 5
while n > 0:
print("now time:{0}".format(time.ctime())
time.sleep(self.interval)
n -= 1
if __name__ =='__main__':
p = ClockProcess(3)
p.start() # 进程p调用start()时,自动调用run()方法
# 应该会显示五次实时时间 不知道为什么在Jupyter上没有显示 在Pycharm上运行良好
# ********打印结果********
# now time:Fri Nov 6 19:59:03 2020
# now time:Fri Nov 6 19:59:06 2020
# now time:Fri Nov 6 19:59:09 2020
# now time:Fri Nov 6 19:59:12 2020
# now time:Fri Nov 6 19:59:15 2020
3.daemon属性
与线程相同,在子进程中加入了daemon属性后,当主进程结束时,子进程也会跟着结束。
import multiprocessing
import time
def worker(interval):
print('start time:{0}'.format(time.ctime()))
time.sleep(interval)
print('end time:{0}'.format(time.ctime()))
if __name__ == '__main__':
p = multiprocessing.Process(target=worker,args=(3,))
p.start()
print('ok!')
# ********打印结果********
# ok!
# start time:Sat Nov 7 08:51:48 2020
# end time:Sat Nov 7 08:51:51 2020
# 个人感觉Jupyter对实时时间显示或者就是对多进程的实现有点问题....
import multiprocessing
import time
def worker(interval):
print('start time:{0}'.format(time.ctime()))
time.sleep(interval)
print('end time:{0}'.format(time.ctime()))
if __name__ == '__main__':
p = multiprocessing.Process(target=worker,args=(3,))
p.daemon = True # 在start()前加入daemon属性,最终结果当主进程结束的时候,子进程也会跟着结束。
p.start()
print('ok!')
ok!
4.join方法
和多线程的join方法类似,阻塞当前进程,直到调用join方法的那个进程执行完,再继续当前进程。
- 如果有daemon和join,也会发生阻塞。
5.Pool
我们可以用进程池的方法批量创建子进程
from multiprocessing import Pool
import os,time,random
def long_time_task(name):
print('进程的名字:{0};进程的PID:{1}'.format(name,os.getpid())) # os库提供通用的、基本的操作系统交互功能
start = time.time()
time.sleep(random.random()*3)
end = time.time()
print('进程{0}运行了{1}秒'.format(name,(end-start)))
if __name__ == '__main__':
print('主进程的PID:{0}'.format(os.getpid()))
p = Pool(4)
for i in range(6):
p.apply_async(long_time_task,args=(i,)) # 非阻塞
p.close() # 在join之前先调用close函数,执行完close后不会有新的进程加入到pool
p.join()
print('end')
# 主进程的PID:11908
# 进程的名字:0;进程的PID:12380
# 进程的名字:1;进程的PID:7968
# 进程的名字:2;进程的PID:2264
# 进程的名字:3;进程的PID:12860
# 进程1运行了0.6849398612976074秒
# 进程的名字:4;进程的PID:7968
# 进程0运行了1.4097185134887695秒
# 进程的名字:5;进程的PID:12380
# 进程3运行了1.5197131633758545秒
# 进程2运行了1.7282814979553223秒
# 进程4运行了1.6934387683868408秒
# 进程5运行了2.608299970626831秒
# end
主进程的PID:15428
(经查资料,Jupyter确实无法打印子进程的运行,好像有一些解决办法,就先不深入研究了....)
- apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞。
- apply(func[, args[, kwds]])是阻塞的。
- close() 关闭pool,使其不在接受新的任务。
- 创建一个进程池Pool,进程数为4,range(6)会产生0-5个对象,0-3四个对象先送到进程中执行,当其中一个执行结束后,空出一个进程处理对象4。
- 因为是非阻塞,主函数自己执行,然后在join()处等待各个进程的结束。
from multiprocessing import Pool
import os,time,random
def long_time_task(name):
print('进程的名字:{0};进程的PID:{1}'.format(name,os.getpid())) # os库提供通用的、基本的操作系统交互功能
start = time.time()
time.sleep(random.random()*3)
end = time.time()
print('进程{0}运行了{1}秒'.format(name,(end-start)))
if __name__ == '__main__':
print('主进程的PID:{0}'.format(os.getpid()))
p = Pool(4)
for i in range(6):
p.apply(long_time_task,args=(i,)) # 阻塞
p.close() # 在join之前先调用close函数,执行完close后不会有新的进程加入到pool
p.join()
print('end')
# 主进程的PID:4916
# 进程的名字:0;进程的PID:11052
# 进程0运行了1.269831657409668秒
# 进程的名字:1;进程的PID:14960
# 进程1运行了0.7151107788085938秒
# 进程的名字:2;进程的PID:3992
# 进程2运行了1.8021326065063477秒
# 进程的名字:3;进程的PID:7004
# 进程3运行了2.427323818206787秒
# 进程的名字:4;进程的PID:11052
# 进程4运行了0.7083690166473389秒
# 进程的名字:5;进程的PID:14960
# 进程5运行了2.8224308490753174秒
# end
6.进程间通信
Process之间需要通信。multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。
from multiprocessing import Process, Queue
import os, time, random
def write(q):
# 写数据进程
print('写进程的PID:{0}'.format(os.getpid()))
for value in['kobe','james','irving']:
print('写进Queue的值为:{0}'.format(value))
q.put(value)
time.sleep(random.random())
def read(q):
# 读数据进程
print('读进程的PID:{0}'.format(os.getpid()))
while True:
value = q.get(True)
print('从Queue读取的值为:{0}'.format(value))
if __name__ == '__main__':
# 父进程创建Queue,并传给各个子进程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程 pw
pw.start()
# 启动子进程 pr
pr.start()
# 等待pw结束
pw.join()
# pr 进程里是死循环,无法等待其结束,只能强行终止
pr.terminate() # 结束工作进程,不在处理未完成的任务。
# 读进程的PID:3180
# 写进程的PID:6048
# 写进Queue的值为:kobe
# 从Queue读取的值为:kobe
# 写进Queue的值为:james
# 从Queue读取的值为:james
# 写进Queue的值为:irving
# 从Queue读取的值为:irving