Python多进程

Python多进程使用与总结

1.概要

​ 众所周知,由于GIL锁的存在,Python多线程并不是真正意义上的多线程,不能很好的利用多核CPU,为了充分的利用系统资源,py提供了multiprocessing多进程库,其支持子进程、通信和数据共享、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。[参考](https://www.cnblogs.com/tkqasn/p/5701230.html

  • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
  • multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
  • 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。
  • Process.PID中保存有PID,如果进程还没有start(),则PID为None。

注意:window下启动子进程必须在if name == "main"语句后面写相关语句,这个坑踩过

注意:window下启动子进程必须在if name == "main"语句后面写相关语句,这个坑踩过

2.使用实例

2.1 创建多进程

方式一:直接使用Process

import multiprocessing
import queue
import time
import random
import os

def out(msg):
    s = random.randint(1, 3) #随机延时
    time.sleep(s)
    print("msg:{}  time:{}   pid:{} ppid:{}".format(msg, time.asctime(time.localtime(time.time())), os.getpid(), os.getppid()))


def que():
    Queue = queue.Queue()#创建队列 先进先出
    for q in range(10):
        Queue.put(q)
    return Queue


if __name__ == "__main__":

    Que = que() 
    for i in range(10):
        p = multiprocessing.Process(target=out, args=(Que.get(), ))
        p.start()
        # p.join() # location1
    # p.join() #location2
    print("我是主进程")
  • 基本逻辑:创建一个动作函数out() ,增加随机时间的延时,以便查看控制台输出差异,输出msg、当前时间、子进程pid、父进程pid。定义一个队列函数que(),作为生产者,为后面取得输出参数做好准备。p:创建一个进程,p.start()开启一个进程,p.join()阻塞主进程,直到子进程执行完毕.注意:window下启动子进程必须在if name == "main"语句后面写相关语句,这个坑踩过(不知道为什么这个markdown编辑器,双下滑线没有显示,可能是渲染引擎问题),args参数是元组类型,若是只有一个参数,需要加上最后

  • location1处和location2处join均注释,主进程在子进程之前执行完毕,控制台输出:

    我是主进程
    msg:2  time:Sun Mar 31 20:45:22 2019   pid:17068 ppid:28044
    msg:4  time:Sun Mar 31 20:45:22 2019   pid:25004 ppid:28044
    msg:5  time:Sun Mar 31 20:45:22 2019   pid:14576 ppid:28044
    msg:7  time:Sun Mar 31 20:45:23 2019   pid:26384 ppid:28044
    msg:9  time:Sun Mar 31 20:45:23 2019   pid:3368 ppid:28044
    msg:1  time:Sun Mar 31 20:45:23 2019   pid:25052 ppid:28044
    msg:3  time:Sun Mar 31 20:45:23 2019   pid:23948 ppid:28044
    msg:0  time:Sun Mar 31 20:45:24 2019   pid:13660 ppid:28044
    msg:6  time:Sun Mar 31 20:45:24 2019   pid:1052 ppid:28044
    msg:8  time:Sun Mar 31 20:45:25 2019   pid:13968 ppid:28044
    
  • 只有location2处join注释,主进程在子进程之后执行,但是可看出输出顺序是0~9,总的时间特别长,阻塞输出,控制台输出

    msg:0  time:Sun Mar 31 20:51:06 2019   pid:11048 ppid:3936
    msg:1  time:Sun Mar 31 20:51:09 2019   pid:18504 ppid:3936
    msg:2  time:Sun Mar 31 20:51:10 2019   pid:15580 ppid:3936
    msg:3  time:Sun Mar 31 20:51:12 2019   pid:25180 ppid:3936
    msg:4  time:Sun Mar 31 20:51:15 2019   pid:25912 ppid:3936
    msg:5  time:Sun Mar 31 20:51:17 2019   pid:22660 ppid:3936
    msg:6  time:Sun Mar 31 20:51:21 2019   pid:13224 ppid:3936
    msg:7  time:Sun Mar 31 20:51:23 2019   pid:14204 ppid:3936
    msg:8  time:Sun Mar 31 20:51:24 2019   pid:2940 ppid:3936
    msg:9  time:Sun Mar 31 20:51:28 2019   pid:9068 ppid:3936
    我是主进程
    
  • 只有location1处join注释,主进程在子进程之后执行,其中很多进程执行时间相同,而且顺序错乱,总时间短,控制台输出

    msg:1  time:Sun Mar 31 20:52:04 2019   pid:24448 ppid:13532
    msg:5  time:Sun Mar 31 20:52:04 2019   pid:23376 ppid:13532
    msg:2  time:Sun Mar 31 20:52:05 2019   pid:22048 ppid:13532
    msg:6  time:Sun Mar 31 20:52:05 2019   pid:18968 ppid:13532
    msg:7  time:Sun Mar 31 20:52:05 2019   pid:26208 ppid:13532
    msg:0  time:Sun Mar 31 20:52:06 2019   pid:15948 ppid:13532
    msg:3  time:Sun Mar 31 20:52:06 2019   pid:26724 ppid:13532
    msg:4  time:Sun Mar 31 20:52:06 2019   pid:13220 ppid:13532
    msg:8  time:Sun Mar 31 20:52:06 2019   pid:24384 ppid:13532
    msg:9  time:Sun Mar 31 20:52:06 2019   pid:4392 ppid:13532
    我是主进程
    

方式二:重写run()方法

from multiprocessing import Process
import queue
import time
import random
import os

class Myprocess(Process):

    def __init__(self, msg):
        super(Myprocess, self).__init__()# 或者Process.__init__(self)
        self.msg = msg

    def run(self):
        s = random.randint(1, 3)  # 随机延时
        time.sleep(s)
        print("msg:{}  time:{}   pid:{} ppid:{}".format(self.msg, time.asctime(time.localtime(time.time())), os.getpid(),
                                                        os.getppid()))

def que():
    Queue = queue.Queue()  # 创建队列 先进先出
    for q in range(10):
        Queue.put(q)
    return Queue

if __name__ == "__main__":

    Que = que()
    for i in range(10):
        p = Myprocess(msg=Que.get())
        p.start()
    p.join()
    print("我是主进程")
  • 基本逻辑:定义一个子类,继承Process类,因为不想完全重写父类的init,所以这里使用super()或者Process.init(self)将子类参数传递给父类。其中run()方法为动作函数,这个也是来自父类的方法,这里因为准备完全重写父类的方法,所以不用super()方法,也可以使用super(Myprocess, self).run()来避免完全重写父类方法。函数功能run()对应前面的out()函数功能。

  • super() 函数是用于调用父类(超类)的一个方法。super 是用来解决多重继承问题的,直接用类名调用父类方法在使用单继承的时候没问题,但是如果使用多继承,会涉及到查找顺序(MRO)、重复调用(钻石继承)等种种问题。MRO 就是类的方法解析顺序表, 其实也就是继承父类方法时的顺序表

  • 没弄明白为什么这个主进程没有阻塞,控制台输出:

    msg:2  time:Sun Mar 31 21:40:37 2019   pid:4396 ppid:23176
    msg:3  time:Sun Mar 31 21:40:37 2019   pid:18640 ppid:23176
    msg:8  time:Sun Mar 31 21:40:37 2019   pid:13612 ppid:23176
    msg:9  time:Sun Mar 31 21:40:37 2019   pid:13196 ppid:23176
    我是主进程
    msg:1  time:Sun Mar 31 21:40:38 2019   pid:16324 ppid:23176
    msg:4  time:Sun Mar 31 21:40:38 2019   pid:14560 ppid:23176
    msg:5  time:Sun Mar 31 21:40:38 2019   pid:6376 ppid:23176
    msg:6  time:Sun Mar 31 21:40:38 2019   pid:26088 ppid:23176
    msg:7  time:Sun Mar 31 21:40:38 2019   pid:23056 ppid:23176
    msg:0  time:Sun Mar 31 21:40:39 2019   pid:14724 ppid:23176
    
2.2 Process类

  • 构造方法:

    Process([group [, target [, name [, args [, kwargs]]]]])

    group: 线程组,目前还没有实现,库引用中提示必须是None;
      target: 要执行的方法;
      name: 进程名;
      args/kwargs: 要传入方法的参数。。

  • 实例方法:

    is_alive():返回进程是否在运行。

    join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

    start():进程准备就绪,等待CPU调度

    run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

    terminate():不管任务是否完成,立即停止工作进程,但是子进程如果存在没完成,就会变成僵尸进程

  • 属性:

    authkey

    daemon:和线程的setDeamon功能一样,守护进程,父进程死了子进程也死了

    exitcode(进程在运行时为None、如果为–N,表示被信号N结束)

    name:进程名字。

    pid:进程号。

  • is_alive()、p.name、p.pid 、p.exitcode:实例:下面代码用的是重写的Myprocess,直接使用Process也是一样的
    ..........
    p = Myprocess(msg=10086)
    p.start()
    print("is_alive() :{}".format(p.is_alive()))
    print("p.exitcode: {}".format(p.exitcode))
    p.join()
    print("is_alive() :{}".format(p.is_alive()))
    print("p.exitcode: {}".format(p.exitcode))
    print("p.name :{}".format(p.name))
    print("p.pid :{}".format(p.pid))
    print("我是主进程")


  • 控制台输出:

    is_alive() :True
    p.exitcode: None
    msg:10086  time:Sun Mar 31 22:04:38 2019   pid:22796 ppid:10980
    is_alive() :False
    p.exitcode: 0
    p.name :Myprocess-1
    p.pid :22796
    我是主进程
    
    
  • setDaemon()方法。主进程A中,创建了子进程B,并且在主进程A中调用了B.setDaemon(),这个的意思是,把主进程A设置为守护进程,这时候,要是主进程A执行结束了,就不管子进程B是否完成,一并和进程程A退出.这就是setDaemon方法的含义,这基本和join是相反的。此外,还有个要特别注意的:必须在start() 方法调用之前设置,默认False

  • 代码,例1:

    .......
    p = Myprocess(msg=10086)
    p.daemon = True
    p.start()
    # p.join()
    print("我是主进程")
  • 输出1:

    我是主进程
    
    Process finished with exit code 0
    
  • 代码,例2:

       ........
       p = Myprocess(msg=10086)
        # p.daemon = True
        p.start()
        # p.join()
        print("我是主进程")
    
  • 输出2:

    我是主进程
    msg:10086  time:Sun Mar 31 22:21:34 2019   pid:6788 ppid:556
    
2.3 Pool类

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。进程池设置最好等于CPU核心数量

构造方法:

Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

实例方法:

apply(func[, args[, kwds]]):同步进程池

apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池

close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

terminate() : 结束工作进程,不在处理未完成的任务

join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。pool.join()必须使用在

  • 进程池异步:

    import multiprocessing
    import queue
    import time
    import random
    import os
    
    def out(msg):
        s = random.randint(1, 3) #随机延时
        time.sleep(s)
        return "msg:{}  time:{}   pid:{} ppid:{}".format(msg, time.asctime(time.localtime(time.time())), os.getpid(), os.getppid())
    
    # 接收函数
    def callback(arg):
        print(arg)
    
    def que():
        Queue = queue.Queue()#创建队列 先进先出
        for q in range(10):
            Queue.put(q)
        return Queue
    
    
    if __name__ == "__main__":
    
        Que = que()
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())#取最大CPU数
        for i in range(10):
            pool.apply_async(func=out, args=(Que.get(),), callback=callback) #callback自动接收func返回来的内容
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭
        print("我是主进程")
    
  • 控制台输出:总时间短,且执行顺序乱,非阻塞

    msg:2  time:Sun Mar 31 22:35:44 2019   pid:23020 ppid:16652
    msg:3  time:Sun Mar 31 22:35:44 2019   pid:26476 ppid:16652
    msg:6  time:Sun Mar 31 22:35:44 2019   pid:26204 ppid:16652
    msg:7  time:Sun Mar 31 22:35:44 2019   pid:17256 ppid:16652
    msg:0  time:Sun Mar 31 22:35:45 2019   pid:26980 ppid:16652
    msg:4  time:Sun Mar 31 22:35:45 2019   pid:10248 ppid:16652
    msg:1  time:Sun Mar 31 22:35:46 2019   pid:23180 ppid:16652
    msg:5  time:Sun Mar 31 22:35:46 2019   pid:22796 ppid:16652
    msg:8  time:Sun Mar 31 22:35:47 2019   pid:23020 ppid:16652
    msg:9  time:Sun Mar 31 22:35:47 2019   pid:26476 ppid:16652
    我是主进程
    
  • 进程池同步:

    import multiprocessing
    import queue
    import time
    import random
    import os
    
    def out(msg):
        s = random.randint(1, 3) #随机延时
        time.sleep(s)
        print("msg:{}  time:{}   pid:{} ppid:{}".format(msg, time.asctime(time.localtime(time.time())), os.getpid(), os.getppid()))
    
    def que():
        Queue = queue.Queue()#创建队列 先进先出
        for q in range(10):
            Queue.put(q)
        return Queue
    
    
    if __name__ == "__main__":
    
        Que = que()
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
        for i in range(10):
            pool.apply(func=out, args=(Que.get(),)) #callback自动接收func返回来的内容
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭
        print("我是主进程")
    
    
  • 控制台输出:总时间长,顺序执行,阻塞

    msg:0  time:Sun Mar 31 22:40:36 2019   pid:24072 ppid:13240
    msg:1  time:Sun Mar 31 22:40:37 2019   pid:13552 ppid:13240
    msg:2  time:Sun Mar 31 22:40:40 2019   pid:13736 ppid:13240
    msg:3  time:Sun Mar 31 22:40:43 2019   pid:8040 ppid:13240
    msg:4  time:Sun Mar 31 22:40:46 2019   pid:12532 ppid:13240
    msg:5  time:Sun Mar 31 22:40:48 2019   pid:24272 ppid:13240
    msg:6  time:Sun Mar 31 22:40:49 2019   pid:14028 ppid:13240
    msg:7  time:Sun Mar 31 22:40:52 2019   pid:22340 ppid:13240
    msg:8  time:Sun Mar 31 22:40:55 2019   pid:24072 ppid:13240
    msg:9  time:Sun Mar 31 22:40:56 2019   pid:13552 ppid:13240
    我是主进程
    
  • 如果动作函数是带返回值的不仅可以使用callback输出还可以使用get()方法

  • 错误的get使用方法:

    def out(msg):
        s = random.randint(1, 3) #随机延时
        time.sleep(s)
        print("msg:{}  time:{}   pid:{} ppid:{}".format(msg, time.asctime(time.localtime(time.time())), os.getpid(), os.getppid()))
        return msg
      ... ..... .........省略........
        if __name__ == "__main__":
    
        Que = que()
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
        l = []
        for i in range(10):
            r = pool.apply_async(func=out, args=(Que.get(),)) 
            print(r.get())
            # l.append(r)
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭
        # for i in l:
        #     print(i.get())
        print("我是主进程")
    
  • 控制台输出:顺讯执行,时间长,阻塞

    msg:0  time:Sun Mar 31 22:49:22 2019   pid:15668 ppid:13468
    0
    msg:1  time:Sun Mar 31 22:49:24 2019   pid:5968 ppid:13468
    1
    msg:2  time:Sun Mar 31 22:49:25 2019   pid:25524 ppid:13468
    2
    msg:3  time:Sun Mar 31 22:49:28 2019   pid:14148 ppid:13468
    3
    msg:4  time:Sun Mar 31 22:49:31 2019   pid:15240 ppid:13468
    4
    msg:5  time:Sun Mar 31 22:49:32 2019   pid:22544 ppid:13468
    5
    msg:6  time:Sun Mar 31 22:49:35 2019   pid:5364 ppid:13468
    6
    msg:7  time:Sun Mar 31 22:49:37 2019   pid:24740 ppid:13468
    7
    msg:8  time:Sun Mar 31 22:49:40 2019   pid:15668 ppid:13468
    8
    msg:9  time:Sun Mar 31 22:49:43 2019   pid:5968 ppid:13468
    9
    我是主进程
    
  • 错误的get使用方法:

    ...........................
        Que = que()
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
        l = []
        for i in range(10):
            r = pool.apply_async(func=out, args=(Que.get(),)) #callback自动接收func返回来的内容
            # print(r.get())
            l.append(r)
        pool.close()
        pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭
    
        for i in l:
            print(i.get())
        print("我是主进程")
    
  • 控制台输出:顺序错乱,总时间短,非阻塞

    msg:6  time:Sun Mar 31 22:53:28 2019   pid:8600 ppid:25532
    msg:1  time:Sun Mar 31 22:53:29 2019   pid:5960 ppid:25532
    msg:0  time:Sun Mar 31 22:53:29 2019   pid:24544 ppid:25532
    msg:3  time:Sun Mar 31 22:53:29 2019   pid:15672 ppid:25532
    msg:4  time:Sun Mar 31 22:53:29 2019   pid:21808 ppid:25532
    msg:8  time:Sun Mar 31 22:53:29 2019   pid:8600 ppid:25532
    msg:7  time:Sun Mar 31 22:53:29 2019   pid:19028 ppid:25532
    msg:2  time:Sun Mar 31 22:53:30 2019   pid:9820 ppid:25532
    msg:5  time:Sun Mar 31 22:53:30 2019   pid:12736 ppid:25532
    msg:9  time:Sun Mar 31 22:53:32 2019   pid:24544 ppid:25532
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    我是主进程
    
2.4 数据共享

进程各自持有一份数据,默认无法共享数据

  • Array()实现:

    • C语言参数类型对照
    ‘c’: ctypes.c_char  ‘u’: ctypes.c_wchar  ‘b’: ctypes.c_byte    ‘B’: ctypes.c_ubyte
    ‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int     ‘I’: ctypes.c_uint
    ‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong  ‘f’: ctypes.c_float  ‘d’: ctypes.c_double
    
    • 代码块和输出:

      from multiprocessing import Process, Array
      
      def f(a):
          a[0] = a[-1]
      
      if __name__ == '__main__':
          arr = Array('u', "Hello World")
          p = Process(target=f, args=(arr,))
          p.start()
          p.join()
          print(arr[:])
          
          
          -----控制台输出------
          
          dello World
      
      Process finished with exit code 0
      
  • Manager()实现:

    Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持

  • 代码:

    from multiprocessing import Process, Manager
    
    
    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.reverse()
    
    
    if __name__ == '__main__':
        manager = Manager()
    
        d = manager.dict()
        l = manager.list(range(10))
    
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()
    
        print(d)
        print(l)
    
  • 控制台输出:

    {1: '1', '2': 2, 0.25: None}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
    
    Process finished with exit code 0
    
2.5 Lock互斥锁

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突

进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理。

注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全

  • 代码实例,不加锁:

    from multiprocessing import Process, Lock
    import time
    import random
    
    def write(lock, i):
        # lock.acquire()
        file(i)
        read()
        # lock.release()
    
    
    def file(a):
        with open("test.txt", "w") as f:
            f.write(a)
    
    def read():
        with open("test.txt", "r") as f:
            line = f.readline()
            print("a: {} ".format(line))
            
    if __name__ == "__main__":
    
    
        lock = Lock()
        file(str(0))
        read()
        for  i in range(10):
            p = Process(target=write, args=(lock, str(i)))
            p.start()
        p.join()
        print("我是主进程")
    
  • 控制台输出:可见有几次读到的都是0,表明某个时刻几个进程都同时去访问了该文件

    a: 0 
    a: 2 
    a: 0 
    a: 0 
    a: 3 
    a: 5 
    a: 4 
    a: 6 
    a: 7 
    a: 8 
    a: 9 
    我是主进程
    
  • 代码实例,加锁:

    ..........
    def write(lock, i):
        lock.acquire()
        file(i)
        read()
        lock.release()
    ...........
    
  • 控制台输出:

    a: 0 
    a: 1 
    a: 0 
    a: 2 
    a: 3 
    a: 4 
    a: 6 
    a: 5 
    a: 7 
    a: 8 
    a: 9 
    我是主进程
    
2.6 Semaphore

用来控制对共享资源的访问数量

  • 普通代码示例

    import multiprocessing
    import time
    
    def worker(i):
        # s.acquire()
        print(multiprocessing.current_process().name + "acquire")
        time.sleep(i)
        print(multiprocessing.current_process().name + "release\n")
        # s.release()
    
    if __name__ == "__main__":
        # s = multiprocessing.Semaphore(2)
        for i in range(10):
            p = multiprocessing.Process(target = worker, args=(i*2, ))
            p.start()
    
  • 控制台输出:观察换行,可以看出同一时段,会有很多进程都在访问这个worke函数

    Process-1acquire
    Process-2acquire
    Process-1release
    
    Process-3acquire
    Process-4acquire
    Process-5acquire
    Process-7acquire
    Process-6acquire
    Process-8acquire
    Process-9acquire
    Process-10acquire
    Process-2release
    
    Process-3release
    
    Process-4release
    
    Process-5release
    
    Process-6release
    
    Process-7release
    
    Process-8release
    
    Process-9release
    
    Process-10release
    
    
    Process finished with exit code 0
    
    
  • 用Semaphore代码实例:

    # -*- coding: utf-8 -*-
    # @Time    : 2019/3/31 10:42
    # @Author  : YuChou
    # @Site    : 
    # @File    : examp.py
    # @Software: PyCharm
    import multiprocessing
    import time
    
    def worker(i,s):
        s.acquire()
        print(multiprocessing.current_process().name + "acquire")
        time.sleep(i)
        print(multiprocessing.current_process().name + "release\n")
        s.release()
    
    if __name__ == "__main__":
        s = multiprocessing.Semaphore(2)
        for i in range(10):
            p = multiprocessing.Process(target = worker, args=(i*2, s))
            p.start()
    
  • 控制台输出:观察换行,可以看出同时并存最多只有两个进程

    Process-3acquire
    Process-1acquire
    Process-1release
    
    Process-4acquire
    Process-3release
    
    Process-2acquire
    Process-2release
    
    Process-5acquire
    Process-4release
    
    Process-6acquire
    Process-5release
    
    Process-7acquire
    Process-6release
    
    Process-8acquire
    Process-7release
    
    Process-9acquire
    Process-8release
    
    Process-10acquire
    Process-9release
    
    Process-10release
    
    
2.7 Event

用于进程间通信,即程序中的其一个线程需要通过判断某个进程的状态来确定自己下一步的操作,就用到了event对象

event对象默认为假(Flase),即遇到event对象在等待就阻塞线程的执行

可以通过is_set()查看当前状态

  • event.wait()    #括号里可以带数字执行,数字表示等待的秒数,不带数字表示一直阻塞状态
    
  • event.set()     #默认为False,set一次表示True,所以子线程里的foo函数解除阻塞状态继续执行
    
  • 主进程和子进程之间的通信:

    import multiprocessing
    import time
    
    def child(even):
        print("start child.....")
        even.wait()
        print("end child.....")
    
    if __name__ == "__main__":
        
        even = multiprocessing.Event()
        p = multiprocessing.Process(target=child, args=(even,))
        p.start()
        print("主进程等待....")
        time.sleep(4)
        print("主进程结束等待")
        even.set()
    
    
  • 控制台输出:可以看出,子进程一直在等set

    主进程等待....
    start child.....
    主进程结束等待
    end child.....
    
  • 给与wait值为2s

    def child(even, s):
        print("start child.....")
        even.wait(s)
        print("end child.....")
    
    if __name__ == "__main__":
    
        even = multiprocessing.Event()
        p = multiprocessing.Process(target=child, args=(even,2))
        p.start()
        print("主进程等待....")
        time.sleep(4)
        print("主进程结束等待")
        even.set()
    
  • 控制台输出:可见并没有等待set, 子进程已经解除阻塞了

    主进程等待....
    start child.....
    end child.....
    主进程结束等待
    
  • 当然也可以用在子进程与子进程之间,不详解
2.8 Queue

Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。Queue的一段示例代码:

  • 代码块:来源https://www.cnblogs.com/kaituorensheng/p/4445418.html

    import multiprocessing
    
    def writer_proc(q):
        try:
            q.put(1, block = False)
        except:
            pass
    
    def reader_proc(q):
        try:
            print(q.get(block = False))
        except:
            pass
    
    if __name__ == "__main__":
        q = multiprocessing.Queue()
        writer = multiprocessing.Process(target=writer_proc, args=(q,))
        writer.start()
    
        reader = multiprocessing.Process(target=reader_proc, args=(q,))
        reader.start()
    
        reader.join()
        writer.join()
    
  • 控制台输出

    1
    
2.9 Pipe

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。close方法表示关闭管道,当消息接收结束以后,关闭管道。

  • 代码:

    import multiprocessing
    import time
    
    def pro(p):
        for i in range(10):
            p.send(i)
            print("我发啦")
            time.sleep(1)
    
    def cou(c):
        n=0
        while n <= 9:
           print(c.recv())
           print("我收啦\n")
           time.sleep(1)
           n += 1
    
    
    
    if __name__ == "__main__":
        p, c = multiprocessing.Pipe()
        m1 = multiprocessing.Process(target=pro, args=(p,))
        m2 = multiprocessing.Process(target=cou, args=(c,))
        m1.start()
        m2.start()
        m1.join()
        m2.join()
    
  • 控制台输出

    我发啦
    0
    我收啦
    
    我发啦
    1
    我收啦
    
    我发啦
    2
    我收啦
    
    我发啦
    3
    我收啦
    
    我发啦
    4
    我收啦
    
    我发啦
    5
    我收啦
    
    我发啦
    6
    我收啦
    
    我发啦
    7
    我收啦
    
    我发啦
    8
    我收啦
    
    我发啦
    9
    我收啦
    
2.10 subprocess三方模块

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

  • 代码块:

    import subprocess
    
    r = subprocess.call(["python", "--version"]) #命令行空格用 ','隔开
    print('console:', r)
    
  • 控制台输出:

    Python 3.6.5 :: Anaconda, Inc.
    console: 0
    

参考链接:https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431927781401bb47ccf187b24c3b955157bb12c5882d000

https://www.cnblogs.com/kaituorensheng/p/4445418.html

https://www.cnblogs.com/UncleYong/p/6987112.html

https://www.cnblogs.com/haiyan123/p/7429568.html

https://www.cnblogs.com/lidagen/p/7252247.html

https://blog.51cto.com/286577399/2051155


2019/04/01 22:26

补充:多进程的map使用:

import multiprocessing
import requests
import time
import random


def get(_):
    time.sleep(random.randint(0, 5))
    url = "http://www.baidu.com"
    r = requests.get(url)
    print(r.status_code)


if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    pool.map(get, range(20)) 

2019/04/10 22:58

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容