多任务原理
现代操作系统(Windows、Mac OS X、Linux、UNIX等)都支持“多任务”
什么叫多任务???
操作系统同时可以运行多个任务
单核CPU实现多任务原理:操作系统轮流让各个任务交替执行,QQ执行2us,切换到微信,在执行2us,再切换到陌陌,执行2us……。表面是看,每个任务反复执行下去,但是CPU调度执行速度太快了,导致我们感觉就行所有任务都在同时执行一样
多核CPU实现多任务原理:真正的秉性执行多任务只能在多核CPU上实现,但是由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行
并发:看上去一起执行,任务数多于CPU核心数
并行:真正一起执行,任务数小于等于CPU核心数
实现多任务的方式:
1、多进程模式
2、多线程模式
3、协程模式
4、多进程+多线程模式
进程概念
对于操作系统而言,一个任务就是一个进程
进程是系统中程序执行和资源分配的基本单位。每个进程都有自己的数据段、代码段、和堆栈段
多进程: 多进程不能共享资源,多个进程相互独立,多个进程受控于操作系统.
多线程: 多线程可以共享资源,线程受控于进程.
多个线程在同一个应用程序中.
什么是进程?
进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
进程的生命周期
单任务现象
from time import sleep
def run():
while True:
print("sunck is a nice man")
sleep(1.2)
if __name__ == "__main__":
while True:
print("he is a good man")
sleep(1)
# 不会执行到run方法,只有上面的while循环结束才可以执行
run()
父进程和子进程
Linux 操作系统提供了一个 fork() 函数用来创建子进程,这个函数很特殊,调用一次,返回两次,因为操作系统是将当前的进程(父进程)复制了一份(子进程),然后分别在父进程和子进程内返回。子进程永远返回0,而父进程返回子进程的 PID。我们可以通过判断返回值是不是 0 来判断当前是在父进程还是子进程中执行。
在 Python 中同样提供了 fork() 函数,此函数位于 os 模块下。
import os
import time
print("在创建子进程前: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
pid = os.fork()
if pid == 0:
print("子进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
time.sleep(5)
else:
print("父进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
# pid表示回收的子进程的pid
#pid, result = os.wait() # 回收子进程资源 阻塞
time.sleep(5)
#print("父进程:回收的子进程pid=%d" % pid)
#print("父进程:子进程退出时 result=%d" % result)
# 下面的内容会被打印两次,一次是在父进程中,一次是在子进程中。
# 父进程中拿到的返回值是创建的子进程的pid,大于0
print("fork创建完后: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
父子进程如何区分?
子进程是父进程通过fork()产生出来的,pid = os.fork()
通过返回值pid是否为0,判断是否为子进程,如果是0,则表示是子进程
由于 fork() 是 Linux 上的概念,所以如果要跨平台,最好还是使用 subprocess 模块来创建子进程。
子进程如何回收?
python中采用os.wait()方法用来回收子进程占用的资源
pid, result = os.wait() # 回收子进程资源 阻塞,等待子进程执行完成回收
在UNIX 系统中,一个进程结束了,但是他的父进程没有等待(调用wait / waitpid)他, 那么他将变成一个僵尸进程。 但是如果该进程的父进程已经先结束了,那么该进程就不会变成僵尸进程, 因为每个进程结束的时候,系统都会扫描当前系统中所运行的所有进程, 看有没有哪个进程是刚刚结束的这个进程的子进程,如果是的话,就由Init 来接管他,成为他的父进程……
fork()
缺点:
1.兼容性差,只能在类linux系统下使用,windows系统不可使用;
2.扩展性差,当需要多条进程的时候,进程管理变得很复杂;
3.会产生“孤儿”进程和“僵尸”进程,需要手动回收资源。
优点:
是系统自带的接近低层的创建方式,运行效率高。
进程实现多任务
multiprocessing模块提供Process类实现新建进程
特点:
1.注意:Process对象可以创建进程,但Process对象不是进程,其删除与否与系统资源是否被回收没有直接的关系。
2.主进程执行完毕后会默认等待子进程结束后回收资源,不需要手动回收资源;join()函数用来控制子进程
结束的顺序,其内部也有一个清除僵尸进程的函数,可以回收资源;
3.Process进程创建时,子进程会将主进程的Process对象完全复制一份,这样在主进程和子进程各有一个 Process对象,但是p.start()启动的是子进程,主进程中的Process对象作为一个静态对象存在,不执行。
4.当子进程执行完毕后,会产生一个僵尸进程,其会被join函数回收,或者再有一条进程开启,start函数也会回收僵尸进程,所以不一定需要写join函数。
5.windows系统在子进程结束后会立即自动清除子进程的Process对象,而linux系统子进程的Process对象如果没有join函数和start函数的话会在主进程结束后统一清除。
另外还可以通过继承Process对象来重写run方法创建进程
'''
multiprocessing 库
跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象
'''
from multiprocessing import Process
from time import sleep
import os
#子进程需要执行的买吗
def run(str):
while True:
# os.getpid()获取当前进程id号
# os.getppid()获取当前进程的父进程id号
print("he is a %s man--%s--%s"%(str, os.getpid(),os.getppid()))
sleep(1.2)
if __name__ == "__main__":
print("主(父)进程启动-%s"%(os.getpid()))
#创建子进程
#target说明进程执行的任务
p = Process(target=run, args=("nice",))
#启动进程
p.start()
while True:
print("he is a good man")
sleep(1)
父进程的先后顺序
from multiprocessing import Process
from time import sleep
import os
def run(str):
print("子进程启动")
sleep(3)
print("子进程结束")
if __name__ == "__main__":
print("父进程启动")
p = Process(target=run, args=("nice",))
p.start()
# 在不加join的时候,父进程结束后子进程称为孤儿进程,继续运行,父进程的结束不能影响子进程
#利用join,进程阻塞,让父进程等待子进程结束再执行父进程
p.join()
print("父进程结束")
全局变量在多个进程中不能共享
父进程和子进程使用不同的堆栈段
from multiprocessing import Process
from time import sleep
num = 100
def run():
print("子进程开始")
global num # num = 100
num += 1
print(num) # 101
print("子进程结束")
if __name__ == "__main__":
print("父进程开始")
p = Process(target=run)
p.start()
p.join()
# 在子进程中修改全局变量对父进程中的全局变量没有影响
# 在创建子进程时对全局变量做了一个备份,父进程中的与子进程中的num是完全不同的两个变量
print("父进程结束--%d"%num) # 100
进程池
from multiprocessing import Pool
import os, time, random
def run(name):
print("子进程%d启动--%s" % (name, os.getpid()))
start = time.time()
time.sleep(random.choice([1,2,3]))
end = time.time()
print("子进程%d结束--%s--耗时%.2f" % (name, os.getpid(), end-start))
if __name__ == "__main__":
print("父进程启动")
#创建多个进程
#进程池 Pool
#表示可以同时执行的进程数量
#Pool默认大小是CPU核心数
pp = Pool(4)
for i in range(6):
# 先同时启动4个子进程,因为是4核,有进程结束后才能执行第5,6个
#创建进程,放入进程池统一管理
pp.apply_async(run,args=(i,))
#在调用join之前必须先调用close,调用close之后就不能再继续添加新的进程了
pp.close()
#进程池对象调用join,会等待进程池中所有的子进程结束完毕再去执行父进程
pp.join()
print("父进程结束")
述代码中的pool.apply_async()
是apply()
函数的变体,apply_async()
是apply()
的并行版本,apply()
是apply_async()
的阻塞版本,使用apply()
主进程会被阻塞直到函数执行结束,所以说是阻塞版本。apply()
既是Pool
的方法,也是Python内置的函数,两者等价。可以看到输出结果并不是按照代码for循环中的顺序输出的。
多个子进程并返回值
apply_async()
本身就可以返回被进程调用的函数的返回值。上一个创建多个子进程的代码中,如果在函数func
中返回一个值,那么pool.apply_async(func, (msg, ))
的结果就是返回pool中所有进程的值的对象(注意是对象,不是值本身)。
import multiprocessing
import time
def func(msg):
return multiprocessing.current_process().name + '-' + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4) # 创建4个进程
results = []
for i in range(20):
msg = "process %d" %(i)
results.append(pool.apply_async(func, (msg, )))
pool.close() # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用
pool.join() # 等待进程池中的所有进程执行完毕
print ("Sub-process(es) done.")
for res in results:
print (res.get())
与之前的输出不同,这次的输出是有序的。
如果电脑是八核,建立8个进程,在Ubuntu下输入top命令再按下大键盘的1,可以看到每个CPU的使用率是比较平均的
拷贝文件
import os, time
from multiprocessing import Pool
#实现文件的拷贝
def copyFile(rPath, wPath):
fr = open(rPath, "rb")
fw = open(wPath, "wb")
context = fr.read()
fw.write(context)
fr.close()
fw.close()
path = r"C:\Users\xlg\Desktop\Python-1704\day20\2、进程\file"
toPath = r"C:\Users\xlg\Desktop\Python-1704\day20\2、进程\toFile"
#读取path下的都有的文件
filesList = os.listdir(path)
#启动for循环处理每一个文件
start = time.time()
for fileName in filesList:
copyFile(os.path.join(path,fileName), os.path.join(toPath,fileName))
end = time.time()
print("总耗时:%0.2f" % (end-start))
多进程实现文件的拷贝
import os, time
from multiprocessing import Pool
#实现文件的拷贝
def copyFile(rPath, wPath):
fr = open(rPath, "rb")
fw = open(wPath, "wb")
context = fr.read()
fw.write(context)
fr.close()
fw.close()
path = r"C:\Users\xlg\Desktop\Python-1704\day20\2、进程\file"
toPath = r"C:\Users\xlg\Desktop\Python-1704\day20\2、进程\toFile"
if __name__ == "__main__":
# 读取path下的都有的文件
filesList = os.listdir(path)
start = time.time()
pp = Pool(4)
for fileName in filesList:
pp.apply_async(copyFile, args=(os.path.join(path,fileName), os.path.join(toPath,fileName)))
pp.close()
pp.join()
end = time.time()
print("总耗时:%0.2f" % (end-start))
封装进程对象
# myProcess.py
from multiprocessing import Process
import os, time
class MyProcess(Process):
def __init__(self,name):
Process.__init__(self)
self.name = name
def run(self):
print("子进程(%s-%s)启动" % (self.name, os.getpid()))
#子进程的功能
time.sleep(3)
print("子进程(%s-%s)结束" % (self.name, os.getpid()))
from myProcess import MyProcess
if __name__ == "__main__":
print("父进程启动")
#创建子进程
p = myProcess("test")
# 自动调用p进程对象的run方法
p.start()
p.join()
print("父进程结束")
进程间通信
- 管道pipe(全双工,半双工):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。
- 命名管道FIFO:有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
- 消息队列MessageQueue:消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
- 共享存储SharedMemory:共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号两,配合使用,来实现进程间的同步和通信。
以上几种进程间通信方式中,消息队列是使用的比较频繁的方式。
- 单工数据传输只支持数据在一个方向上传输;
- 半双工数据传输允许数据在两个方向上传输,但是,在某一时刻,只允许数据在一个方向上传输,它实际上是一种切换方向的单工通信;
- 全双工数据通信允许数据同时在两个方向上传输,因此,全双工通信是两个单工通信方式的结合,它要求发送设备和接收设备都有独立的接收和发送能力。
(1)管道pipe
import multiprocessing
def foo(sk):
sk.send('hello father')
print(sk.recv())
if __name__ == '__main__':
conn1,conn2=multiprocessing.Pipe(duplex=True) #开辟两个口,都是能进能出,括号中如果False即单向通信, 那么由主进程发送消息,子进程接受消息
p=multiprocessing.Process(target=foo,args=(conn1,)) #子进程使用sock口,调用foo函数
p.start()
print(conn2.recv()) #主进程使用conn口接收,从管道pipe中读取消息
conn2.send('hi son') #主进程使用conn口发送
Pipe对象返回的元组分别代表管道的两端,管道默认是全双工,两端都支持send
和recv
方法,两个进程分别操作管道两端时不会有冲突,两个进程对管道一端同时读写时可能会有冲突。
如果声明了coon1,coon2 = Pipe(duplex=False)
的单向管道,则coon1
只负责接受消息,conn2
只负责发送消息。
(2)消息队列Queue
Queue是多进程的安全队列,可以使用Queue实现多进程之间的数据传递。
Queue的一些常用方法:
- Queue.qsize():返回当前队列包含的消息数量;
- Queue.empty():如果队列为空,返回True,反之False ;
- Queue.full():如果队列满了,返回True,反之False;
- Queue.get():获取队列中的一条消息,然后将其从列队中移除,可传参超时时长。
- Queue.get_nowait():相当Queue.get(False),取不到值时触发异常:Empty;
- Queue.put():将一个值添加进数列,可传参超时时长。
- Queue.put_nowait():相当于Queue.get(False),当队列满了时报错:Full。
from multiprocessing import Process, Queue
import os, time
def write(q):
print("启动写子进程%s" % (os.getpid()))
for chr in ["A", "B", "C", "D"]:
q.put(chr)
time.sleep(1)
print("结束写子进程%s" % (os.getpid()))
def read(q):
print("启动读子进程%s" % (os.getpid()))
while True:
value = q.get(True)
print("value = " + value)
print("结束读子进程%s" % (os.getpid()))
if __name__ == "__main__":
#父进程创建队列,并传递给子进程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()
#pr进程里是个死循环,无法等待其结束,只能强行结束
pr.terminate()
print("父进程结束")
(3)队列通信实现多进程爬虫
import time
from multiprocessing import Process, Queue
class Downloader(Process):
def __init__(self, task_queue, result_queue):
super().__init__()
self.name = '下载器'
self.task_queue: Queue = task_queue
self.result_queue: Queue = result_queue
def download(self, url):
time.sleep(2)
print('下载完成', url)
return {'time': time.time()}
def run(self): # 启动当前进程之后执行的核心程序
while True:
try:
# 设置10秒的超时时间,如果是10秒没有任务,则会抛出异常
task = self.task_queue.get(timeout=10)
if isinstance(task, str):
resp = self.download(task)
# 数据写入到任务结果中
self.result_queue.put(resp)
else:
self.result_queue.put('bye')
break
except:
break
class ItemPipeline(Process):
def __init__(self, result_queue):
super().__init__()
self.name = '数据处理Item'
self.result_queue = result_queue
def run(self):
while True:
data = self.result_queue.get()
if isinstance(data, dict):
print('下载完成的数据:', data)
else:
break
print('ItemPipeline 进程退出')
def start(downloads, itempipes):
for d in downloads:
d.start()
for i in itempipes:
i.start()
def wait(downloads, itempipes):
for d in downloads:
d.join()
for i in itempipes:
i.join()
if __name__ == '__main__':
taskQueue = Queue()
resultQueue = Queue()
downloads = [Downloader(taskQueue, resultQueue) for _ in range(4)]
itempipes = [ItemPipeline(resultQueue) for _ in range(2)]
start(downloads, itempipes)
# 发布任务
for i in range(100):
taskQueue.put('http://www.baidu.com/s?key=%d' % i)
time.sleep(0.1)
# 结束
taskQueue.put(0)
wait(downloads, itempipes)