Python多进程使用与总结
1.概要
众所周知,由于GIL锁的存在,Python多线程并不是真正意义上的多线程,不能很好的利用多核CPU,为了充分的利用系统资源,py提供了multiprocessing多进程库,其支持子进程、通信和数据共享、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。[参考](https://www.cnblogs.com/tkqasn/p/5701230.html)
- 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
- multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
- 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。
- Process.PID中保存有PID,如果进程还没有start(),则PID为None。
注意:window下启动子进程必须在if name == "main"语句后面写相关语句,这个坑踩过
注意:window下启动子进程必须在if name == "main"语句后面写相关语句,这个坑踩过
2.使用实例
2.1 创建多进程
方式一:直接使用Process
import multiprocessing
import queue
import time
import random
import os
def out(msg):
s = random.randint(1, 3) #随机延时
time.sleep(s)
print("msg:{} time:{} pid:{} ppid:{}".format(msg, time.asctime(time.localtime(time.time())), os.getpid(), os.getppid()))
def que():
Queue = queue.Queue()#创建队列 先进先出
for q in range(10):
Queue.put(q)
return Queue
if __name__ == "__main__":
Que = que()
for i in range(10):
p = multiprocessing.Process(target=out, args=(Que.get(), ))
p.start()
# p.join() # location1
# p.join() #location2
print("我是主进程")
基本逻辑:创建一个动作函数out() ,增加随机时间的延时,以便查看控制台输出差异,输出msg、当前时间、子进程pid、父进程pid。定义一个队列函数que(),作为生产者,为后面取得输出参数做好准备。p:创建一个进程,p.start()开启一个进程,p.join()阻塞主进程,直到子进程执行完毕.注意:window下启动子进程必须在if name == "main"语句后面写相关语句,这个坑踩过(不知道为什么这个markdown编辑器,双下滑线没有显示,可能是渲染引擎问题),args参数是元组类型,若是只有一个参数,需要加上最后
,
-
location1处和location2处join均注释,主进程在子进程之前执行完毕,控制台输出:
我是主进程 msg:2 time:Sun Mar 31 20:45:22 2019 pid:17068 ppid:28044 msg:4 time:Sun Mar 31 20:45:22 2019 pid:25004 ppid:28044 msg:5 time:Sun Mar 31 20:45:22 2019 pid:14576 ppid:28044 msg:7 time:Sun Mar 31 20:45:23 2019 pid:26384 ppid:28044 msg:9 time:Sun Mar 31 20:45:23 2019 pid:3368 ppid:28044 msg:1 time:Sun Mar 31 20:45:23 2019 pid:25052 ppid:28044 msg:3 time:Sun Mar 31 20:45:23 2019 pid:23948 ppid:28044 msg:0 time:Sun Mar 31 20:45:24 2019 pid:13660 ppid:28044 msg:6 time:Sun Mar 31 20:45:24 2019 pid:1052 ppid:28044 msg:8 time:Sun Mar 31 20:45:25 2019 pid:13968 ppid:28044
-
只有location2处join注释,主进程在子进程之后执行,但是可看出输出顺序是0~9,总的时间特别长,阻塞输出,控制台输出
msg:0 time:Sun Mar 31 20:51:06 2019 pid:11048 ppid:3936 msg:1 time:Sun Mar 31 20:51:09 2019 pid:18504 ppid:3936 msg:2 time:Sun Mar 31 20:51:10 2019 pid:15580 ppid:3936 msg:3 time:Sun Mar 31 20:51:12 2019 pid:25180 ppid:3936 msg:4 time:Sun Mar 31 20:51:15 2019 pid:25912 ppid:3936 msg:5 time:Sun Mar 31 20:51:17 2019 pid:22660 ppid:3936 msg:6 time:Sun Mar 31 20:51:21 2019 pid:13224 ppid:3936 msg:7 time:Sun Mar 31 20:51:23 2019 pid:14204 ppid:3936 msg:8 time:Sun Mar 31 20:51:24 2019 pid:2940 ppid:3936 msg:9 time:Sun Mar 31 20:51:28 2019 pid:9068 ppid:3936 我是主进程
-
只有location1处join注释,主进程在子进程之后执行,其中很多进程执行时间相同,而且顺序错乱,总时间短,控制台输出
msg:1 time:Sun Mar 31 20:52:04 2019 pid:24448 ppid:13532 msg:5 time:Sun Mar 31 20:52:04 2019 pid:23376 ppid:13532 msg:2 time:Sun Mar 31 20:52:05 2019 pid:22048 ppid:13532 msg:6 time:Sun Mar 31 20:52:05 2019 pid:18968 ppid:13532 msg:7 time:Sun Mar 31 20:52:05 2019 pid:26208 ppid:13532 msg:0 time:Sun Mar 31 20:52:06 2019 pid:15948 ppid:13532 msg:3 time:Sun Mar 31 20:52:06 2019 pid:26724 ppid:13532 msg:4 time:Sun Mar 31 20:52:06 2019 pid:13220 ppid:13532 msg:8 time:Sun Mar 31 20:52:06 2019 pid:24384 ppid:13532 msg:9 time:Sun Mar 31 20:52:06 2019 pid:4392 ppid:13532 我是主进程
方式二:重写run()方法
from multiprocessing import Process
import queue
import time
import random
import os
class Myprocess(Process):
def __init__(self, msg):
super(Myprocess, self).__init__()# 或者Process.__init__(self)
self.msg = msg
def run(self):
s = random.randint(1, 3) # 随机延时
time.sleep(s)
print("msg:{} time:{} pid:{} ppid:{}".format(self.msg, time.asctime(time.localtime(time.time())), os.getpid(),
os.getppid()))
def que():
Queue = queue.Queue() # 创建队列 先进先出
for q in range(10):
Queue.put(q)
return Queue
if __name__ == "__main__":
Que = que()
for i in range(10):
p = Myprocess(msg=Que.get())
p.start()
p.join()
print("我是主进程")
基本逻辑:定义一个子类,继承Process类,因为不想完全重写父类的init,所以这里使用super()或者Process.init(self)将子类参数传递给父类。其中run()方法为动作函数,这个也是来自父类的方法,这里因为准备完全重写父类的方法,所以不用super()方法,也可以使用super(Myprocess, self).run()来避免完全重写父类方法。函数功能run()对应前面的out()函数功能。
super() 函数是用于调用父类(超类)的一个方法。super 是用来解决多重继承问题的,直接用类名调用父类方法在使用单继承的时候没问题,但是如果使用多继承,会涉及到查找顺序(MRO)、重复调用(钻石继承)等种种问题。MRO 就是类的方法解析顺序表, 其实也就是继承父类方法时的顺序表
-
没弄明白为什么这个主进程没有阻塞,控制台输出:
msg:2 time:Sun Mar 31 21:40:37 2019 pid:4396 ppid:23176 msg:3 time:Sun Mar 31 21:40:37 2019 pid:18640 ppid:23176 msg:8 time:Sun Mar 31 21:40:37 2019 pid:13612 ppid:23176 msg:9 time:Sun Mar 31 21:40:37 2019 pid:13196 ppid:23176 我是主进程 msg:1 time:Sun Mar 31 21:40:38 2019 pid:16324 ppid:23176 msg:4 time:Sun Mar 31 21:40:38 2019 pid:14560 ppid:23176 msg:5 time:Sun Mar 31 21:40:38 2019 pid:6376 ppid:23176 msg:6 time:Sun Mar 31 21:40:38 2019 pid:26088 ppid:23176 msg:7 time:Sun Mar 31 21:40:38 2019 pid:23056 ppid:23176 msg:0 time:Sun Mar 31 21:40:39 2019 pid:14724 ppid:23176
2.2 Process类
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。。实例方法:
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程,但是子进程如果存在没完成,就会变成僵尸进程
属性:
authkey
daemon:和线程的setDeamon功能一样,守护进程,父进程死了子进程也死了
exitcode(进程在运行时为None、如果为–N,表示被信号N结束)
name:进程名字。
pid:进程号。
- is_alive()、p.name、p.pid 、p.exitcode:实例:下面代码用的是重写的Myprocess,直接使用Process也是一样的
..........
p = Myprocess(msg=10086)
p.start()
print("is_alive() :{}".format(p.is_alive()))
print("p.exitcode: {}".format(p.exitcode))
p.join()
print("is_alive() :{}".format(p.is_alive()))
print("p.exitcode: {}".format(p.exitcode))
print("p.name :{}".format(p.name))
print("p.pid :{}".format(p.pid))
print("我是主进程")
-
控制台输出:
is_alive() :True p.exitcode: None msg:10086 time:Sun Mar 31 22:04:38 2019 pid:22796 ppid:10980 is_alive() :False p.exitcode: 0 p.name :Myprocess-1 p.pid :22796 我是主进程
setDaemon()方法。主进程A中,创建了子进程B,并且在主进程A中调用了B.setDaemon(),这个的意思是,把主进程A设置为守护进程,这时候,要是主进程A执行结束了,就不管子进程B是否完成,一并和进程程A退出.这就是setDaemon方法的含义,这基本和join是相反的。此外,还有个要特别注意的:必须在start() 方法调用之前设置,默认False
代码,例1:
.......
p = Myprocess(msg=10086)
p.daemon = True
p.start()
# p.join()
print("我是主进程")
-
输出1:
我是主进程 Process finished with exit code 0
-
代码,例2:
........ p = Myprocess(msg=10086) # p.daemon = True p.start() # p.join() print("我是主进程")
-
输出2:
我是主进程 msg:10086 time:Sun Mar 31 22:21:34 2019 pid:6788 ppid:556
2.3 Pool类
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。进程池设置最好等于CPU核心数量
构造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context实例方法:
apply(func[, args[, kwds]]):同步进程池
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池
close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
terminate() : 结束工作进程,不在处理未完成的任务
join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。pool.join()必须使用在
-
进程池异步:
import multiprocessing import queue import time import random import os def out(msg): s = random.randint(1, 3) #随机延时 time.sleep(s) return "msg:{} time:{} pid:{} ppid:{}".format(msg, time.asctime(time.localtime(time.time())), os.getpid(), os.getppid()) # 接收函数 def callback(arg): print(arg) def que(): Queue = queue.Queue()#创建队列 先进先出 for q in range(10): Queue.put(q) return Queue if __name__ == "__main__": Que = que() pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())#取最大CPU数 for i in range(10): pool.apply_async(func=out, args=(Que.get(),), callback=callback) #callback自动接收func返回来的内容 pool.close() pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭 print("我是主进程")
-
控制台输出:总时间短,且执行顺序乱,非阻塞
msg:2 time:Sun Mar 31 22:35:44 2019 pid:23020 ppid:16652 msg:3 time:Sun Mar 31 22:35:44 2019 pid:26476 ppid:16652 msg:6 time:Sun Mar 31 22:35:44 2019 pid:26204 ppid:16652 msg:7 time:Sun Mar 31 22:35:44 2019 pid:17256 ppid:16652 msg:0 time:Sun Mar 31 22:35:45 2019 pid:26980 ppid:16652 msg:4 time:Sun Mar 31 22:35:45 2019 pid:10248 ppid:16652 msg:1 time:Sun Mar 31 22:35:46 2019 pid:23180 ppid:16652 msg:5 time:Sun Mar 31 22:35:46 2019 pid:22796 ppid:16652 msg:8 time:Sun Mar 31 22:35:47 2019 pid:23020 ppid:16652 msg:9 time:Sun Mar 31 22:35:47 2019 pid:26476 ppid:16652 我是主进程
-
进程池同步:
import multiprocessing import queue import time import random import os def out(msg): s = random.randint(1, 3) #随机延时 time.sleep(s) print("msg:{} time:{} pid:{} ppid:{}".format(msg, time.asctime(time.localtime(time.time())), os.getpid(), os.getppid())) def que(): Queue = queue.Queue()#创建队列 先进先出 for q in range(10): Queue.put(q) return Queue if __name__ == "__main__": Que = que() pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()) for i in range(10): pool.apply(func=out, args=(Que.get(),)) #callback自动接收func返回来的内容 pool.close() pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭 print("我是主进程")
-
控制台输出:总时间长,顺序执行,阻塞
msg:0 time:Sun Mar 31 22:40:36 2019 pid:24072 ppid:13240 msg:1 time:Sun Mar 31 22:40:37 2019 pid:13552 ppid:13240 msg:2 time:Sun Mar 31 22:40:40 2019 pid:13736 ppid:13240 msg:3 time:Sun Mar 31 22:40:43 2019 pid:8040 ppid:13240 msg:4 time:Sun Mar 31 22:40:46 2019 pid:12532 ppid:13240 msg:5 time:Sun Mar 31 22:40:48 2019 pid:24272 ppid:13240 msg:6 time:Sun Mar 31 22:40:49 2019 pid:14028 ppid:13240 msg:7 time:Sun Mar 31 22:40:52 2019 pid:22340 ppid:13240 msg:8 time:Sun Mar 31 22:40:55 2019 pid:24072 ppid:13240 msg:9 time:Sun Mar 31 22:40:56 2019 pid:13552 ppid:13240 我是主进程
如果动作函数是带返回值的不仅可以使用callback输出还可以使用get()方法
-
错误的get使用方法:
def out(msg): s = random.randint(1, 3) #随机延时 time.sleep(s) print("msg:{} time:{} pid:{} ppid:{}".format(msg, time.asctime(time.localtime(time.time())), os.getpid(), os.getppid())) return msg ... ..... .........省略........ if __name__ == "__main__": Que = que() pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()) l = [] for i in range(10): r = pool.apply_async(func=out, args=(Que.get(),)) print(r.get()) # l.append(r) pool.close() pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭 # for i in l: # print(i.get()) print("我是主进程")
-
控制台输出:顺讯执行,时间长,阻塞
msg:0 time:Sun Mar 31 22:49:22 2019 pid:15668 ppid:13468 0 msg:1 time:Sun Mar 31 22:49:24 2019 pid:5968 ppid:13468 1 msg:2 time:Sun Mar 31 22:49:25 2019 pid:25524 ppid:13468 2 msg:3 time:Sun Mar 31 22:49:28 2019 pid:14148 ppid:13468 3 msg:4 time:Sun Mar 31 22:49:31 2019 pid:15240 ppid:13468 4 msg:5 time:Sun Mar 31 22:49:32 2019 pid:22544 ppid:13468 5 msg:6 time:Sun Mar 31 22:49:35 2019 pid:5364 ppid:13468 6 msg:7 time:Sun Mar 31 22:49:37 2019 pid:24740 ppid:13468 7 msg:8 time:Sun Mar 31 22:49:40 2019 pid:15668 ppid:13468 8 msg:9 time:Sun Mar 31 22:49:43 2019 pid:5968 ppid:13468 9 我是主进程
-
错误的get使用方法:
........................... Que = que() pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()) l = [] for i in range(10): r = pool.apply_async(func=out, args=(Que.get(),)) #callback自动接收func返回来的内容 # print(r.get()) l.append(r) pool.close() pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭 for i in l: print(i.get()) print("我是主进程")
-
控制台输出:顺序错乱,总时间短,非阻塞
msg:6 time:Sun Mar 31 22:53:28 2019 pid:8600 ppid:25532 msg:1 time:Sun Mar 31 22:53:29 2019 pid:5960 ppid:25532 msg:0 time:Sun Mar 31 22:53:29 2019 pid:24544 ppid:25532 msg:3 time:Sun Mar 31 22:53:29 2019 pid:15672 ppid:25532 msg:4 time:Sun Mar 31 22:53:29 2019 pid:21808 ppid:25532 msg:8 time:Sun Mar 31 22:53:29 2019 pid:8600 ppid:25532 msg:7 time:Sun Mar 31 22:53:29 2019 pid:19028 ppid:25532 msg:2 time:Sun Mar 31 22:53:30 2019 pid:9820 ppid:25532 msg:5 time:Sun Mar 31 22:53:30 2019 pid:12736 ppid:25532 msg:9 time:Sun Mar 31 22:53:32 2019 pid:24544 ppid:25532 0 1 2 3 4 5 6 7 8 9 我是主进程
2.4 数据共享
进程各自持有一份数据,默认无法共享数据
-
Array()实现:
- C语言参数类型对照
‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte ‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint ‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double
-
代码块和输出:
from multiprocessing import Process, Array def f(a): a[0] = a[-1] if __name__ == '__main__': arr = Array('u', "Hello World") p = Process(target=f, args=(arr,)) p.start() p.join() print(arr[:]) -----控制台输出------ dello World Process finished with exit code 0
-
Manager()实现:
Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持
-
代码:
from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': manager = Manager() d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)
-
控制台输出:
{1: '1', '2': 2, 0.25: None} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] Process finished with exit code 0
2.5 Lock互斥锁
当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突
进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理。
注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全
-
代码实例,不加锁:
from multiprocessing import Process, Lock import time import random def write(lock, i): # lock.acquire() file(i) read() # lock.release() def file(a): with open("test.txt", "w") as f: f.write(a) def read(): with open("test.txt", "r") as f: line = f.readline() print("a: {} ".format(line)) if __name__ == "__main__": lock = Lock() file(str(0)) read() for i in range(10): p = Process(target=write, args=(lock, str(i))) p.start() p.join() print("我是主进程")
-
控制台输出:可见有几次读到的都是0,表明某个时刻几个进程都同时去访问了该文件
a: 0 a: 2 a: 0 a: 0 a: 3 a: 5 a: 4 a: 6 a: 7 a: 8 a: 9 我是主进程
-
代码实例,加锁:
.......... def write(lock, i): lock.acquire() file(i) read() lock.release() ...........
-
控制台输出:
a: 0 a: 1 a: 0 a: 2 a: 3 a: 4 a: 6 a: 5 a: 7 a: 8 a: 9 我是主进程
2.6 Semaphore
用来控制对共享资源的访问数量
-
普通代码示例
import multiprocessing import time def worker(i): # s.acquire() print(multiprocessing.current_process().name + "acquire") time.sleep(i) print(multiprocessing.current_process().name + "release\n") # s.release() if __name__ == "__main__": # s = multiprocessing.Semaphore(2) for i in range(10): p = multiprocessing.Process(target = worker, args=(i*2, )) p.start()
-
控制台输出:观察换行,可以看出同一时段,会有很多进程都在访问这个worke函数
Process-1acquire Process-2acquire Process-1release Process-3acquire Process-4acquire Process-5acquire Process-7acquire Process-6acquire Process-8acquire Process-9acquire Process-10acquire Process-2release Process-3release Process-4release Process-5release Process-6release Process-7release Process-8release Process-9release Process-10release Process finished with exit code 0
-
用Semaphore代码实例:
# -*- coding: utf-8 -*- # @Time : 2019/3/31 10:42 # @Author : YuChou # @Site : # @File : examp.py # @Software: PyCharm import multiprocessing import time def worker(i,s): s.acquire() print(multiprocessing.current_process().name + "acquire") time.sleep(i) print(multiprocessing.current_process().name + "release\n") s.release() if __name__ == "__main__": s = multiprocessing.Semaphore(2) for i in range(10): p = multiprocessing.Process(target = worker, args=(i*2, s)) p.start()
-
控制台输出:观察换行,可以看出同时并存最多只有两个进程
Process-3acquire Process-1acquire Process-1release Process-4acquire Process-3release Process-2acquire Process-2release Process-5acquire Process-4release Process-6acquire Process-5release Process-7acquire Process-6release Process-8acquire Process-7release Process-9acquire Process-8release Process-10acquire Process-9release Process-10release
2.7 Event
用于进程间通信,即程序中的其一个线程需要通过判断某个进程的状态来确定自己下一步的操作,就用到了event对象
event对象默认为假(Flase),即遇到event对象在等待就阻塞线程的执行
可以通过is_set()查看当前状态
event.wait() #括号里可以带数字执行,数字表示等待的秒数,不带数字表示一直阻塞状态
event.set() #默认为False,set一次表示True,所以子线程里的foo函数解除阻塞状态继续执行
-
主进程和子进程之间的通信:
import multiprocessing import time def child(even): print("start child.....") even.wait() print("end child.....") if __name__ == "__main__": even = multiprocessing.Event() p = multiprocessing.Process(target=child, args=(even,)) p.start() print("主进程等待....") time.sleep(4) print("主进程结束等待") even.set()
-
控制台输出:可以看出,子进程一直在等set
主进程等待.... start child..... 主进程结束等待 end child.....
-
给与wait值为2s
def child(even, s): print("start child.....") even.wait(s) print("end child.....") if __name__ == "__main__": even = multiprocessing.Event() p = multiprocessing.Process(target=child, args=(even,2)) p.start() print("主进程等待....") time.sleep(4) print("主进程结束等待") even.set()
-
控制台输出:可见并没有等待set, 子进程已经解除阻塞了
主进程等待.... start child..... end child..... 主进程结束等待
- 当然也可以用在子进程与子进程之间,不详解
2.8 Queue
Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。Queue的一段示例代码:
-
代码块:来源https://www.cnblogs.com/kaituorensheng/p/4445418.html
import multiprocessing def writer_proc(q): try: q.put(1, block = False) except: pass def reader_proc(q): try: print(q.get(block = False)) except: pass if __name__ == "__main__": q = multiprocessing.Queue() writer = multiprocessing.Process(target=writer_proc, args=(q,)) writer.start() reader = multiprocessing.Process(target=reader_proc, args=(q,)) reader.start() reader.join() writer.join()
-
控制台输出
1
2.9 Pipe
Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。close方法表示关闭管道,当消息接收结束以后,关闭管道。
-
代码:
import multiprocessing import time def pro(p): for i in range(10): p.send(i) print("我发啦") time.sleep(1) def cou(c): n=0 while n <= 9: print(c.recv()) print("我收啦\n") time.sleep(1) n += 1 if __name__ == "__main__": p, c = multiprocessing.Pipe() m1 = multiprocessing.Process(target=pro, args=(p,)) m2 = multiprocessing.Process(target=cou, args=(c,)) m1.start() m2.start() m1.join() m2.join()
-
控制台输出
我发啦 0 我收啦 我发啦 1 我收啦 我发啦 2 我收啦 我发啦 3 我收啦 我发啦 4 我收啦 我发啦 5 我收啦 我发啦 6 我收啦 我发啦 7 我收啦 我发啦 8 我收啦 我发啦 9 我收啦
2.10 subprocess三方模块
很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。
subprocess
模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。
-
代码块:
import subprocess r = subprocess.call(["python", "--version"]) #命令行空格用 ','隔开 print('console:', r)
-
控制台输出:
Python 3.6.5 :: Anaconda, Inc. console: 0
参考链接:https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431927781401bb47ccf187b24c3b955157bb12c5882d000
https://www.cnblogs.com/kaituorensheng/p/4445418.html
https://www.cnblogs.com/UncleYong/p/6987112.html
https://www.cnblogs.com/haiyan123/p/7429568.html
https://www.cnblogs.com/lidagen/p/7252247.html
https://blog.51cto.com/286577399/2051155
2019/04/01 22:26
补充:多进程的map使用:
import multiprocessing
import requests
import time
import random
def get(_):
time.sleep(random.randint(0, 5))
url = "http://www.baidu.com"
r = requests.get(url)
print(r.status_code)
if __name__ == "__main__":
pool = multiprocessing.Pool(multiprocessing.cpu_count())
pool.map(get, range(20))
2019/04/10 22:58