1-进程

多任务原理

现代操作系统(Windows、Mac OS X、Linux、UNIX等)都支持“多任务”

什么叫多任务???
操作系统同时可以运行多个任务

单核CPU实现多任务原理:操作系统轮流让各个任务交替执行,QQ执行2us,切换到微信,在执行2us,再切换到陌陌,执行2us……。表面是看,每个任务反复执行下去,但是CPU调度执行速度太快了,导致我们感觉就行所有任务都在同时执行一样

多核CPU实现多任务原理:真正的秉性执行多任务只能在多核CPU上实现,但是由于任务数量远远多于CPU的核心数量,所以,操作系统也会自动把很多任务轮流调度到每个核心上执行

并发:看上去一起执行,任务数多于CPU核心数
并行:真正一起执行,任务数小于等于CPU核心数

实现多任务的方式:
1、多进程模式
2、多线程模式
3、协程模式
4、多进程+多线程模式

进程概念

对于操作系统而言,一个任务就是一个进程

进程是系统中程序执行和资源分配的基本单位。每个进程都有自己的数据段、代码段、和堆栈段

多进程: 多进程不能共享资源,多个进程相互独立,多个进程受控于操作系统.
多线程: 多线程可以共享资源,线程受控于进程.
多个线程在同一个应用程序中.

什么是进程?

进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。

进程的生命周期

进程生命周期.jpg

单任务现象

from time import sleep

def run():
    while True:
        print("sunck is a nice man")
        sleep(1.2)

if __name__ == "__main__":
    while True:
        print("he is a good man")
        sleep(1)

    # 不会执行到run方法,只有上面的while循环结束才可以执行
    run()

父进程和子进程

Linux 操作系统提供了一个 fork() 函数用来创建子进程,这个函数很特殊,调用一次,返回两次,因为操作系统是将当前的进程(父进程)复制了一份(子进程),然后分别在父进程和子进程内返回。子进程永远返回0,而父进程返回子进程的 PID。我们可以通过判断返回值是不是 0 来判断当前是在父进程还是子进程中执行。

​ 在 Python 中同样提供了 fork() 函数,此函数位于 os 模块下。

import os
import time

print("在创建子进程前: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))

pid = os.fork()
if pid == 0:
    print("子进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    time.sleep(5)
else:
    print("父进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    # pid表示回收的子进程的pid
    #pid, result = os.wait()  # 回收子进程资源  阻塞
    time.sleep(5)
    #print("父进程:回收的子进程pid=%d" % pid)
    #print("父进程:子进程退出时 result=%d" % result)

# 下面的内容会被打印两次,一次是在父进程中,一次是在子进程中。
# 父进程中拿到的返回值是创建的子进程的pid,大于0
print("fork创建完后: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))

父子进程如何区分?

子进程是父进程通过fork()产生出来的,pid = os.fork()

通过返回值pid是否为0,判断是否为子进程,如果是0,则表示是子进程

由于 fork() 是 Linux 上的概念,所以如果要跨平台,最好还是使用 subprocess 模块来创建子进程。

子进程如何回收?

python中采用os.wait()方法用来回收子进程占用的资源

pid, result = os.wait() # 回收子进程资源  阻塞,等待子进程执行完成回收

在UNIX 系统中,一个进程结束了,但是他的父进程没有等待(调用wait / waitpid)他, 那么他将变成一个僵尸进程。 但是如果该进程的父进程已经先结束了,那么该进程就不会变成僵尸进程, 因为每个进程结束的时候,系统都会扫描当前系统中所运行的所有进程, 看有没有哪个进程是刚刚结束的这个进程的子进程,如果是的话,就由Init 来接管他,成为他的父进程……

fork()

缺点:
​ 1.兼容性差,只能在类linux系统下使用,windows系统不可使用;
​ 2.扩展性差,当需要多条进程的时候,进程管理变得很复杂;
​ 3.会产生“孤儿”进程和“僵尸”进程,需要手动回收资源。
优点:
​ 是系统自带的接近低层的创建方式,运行效率高。

进程实现多任务

multiprocessing模块提供Process类实现新建进程

特点:
​ 1.注意:Process对象可以创建进程,但Process对象不是进程,其删除与否与系统资源是否被回收没有直接的关系。
2.主进程执行完毕后会默认等待子进程结束后回收资源,不需要手动回收资源;join()函数用来控制子进程
​ 结束的顺序,其内部也有一个清除僵尸进程的函数,可以回收资源;
3.Process进程创建时,子进程会将主进程的Process对象完全复制一份,这样在主进程和子进程各有一个 Process对象,但是p.start()启动的是子进程,主进程中的Process对象作为一个静态对象存在,不执行。

4.当子进程执行完毕后,会产生一个僵尸进程,其会被join函数回收,或者再有一条进程开启,start函数也会回收僵尸进程,所以不一定需要写join函数。
5.windows系统在子进程结束后会立即自动清除子进程的Process对象,而linux系统子进程的Process对象如果没有join函数和start函数的话会在主进程结束后统一清除。

另外还可以通过继承Process对象来重写run方法创建进程

'''
multiprocessing 库
跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象
'''

from multiprocessing import Process
from time import sleep
import os

#子进程需要执行的买吗
def run(str):
    while True:
        # os.getpid()获取当前进程id号
        # os.getppid()获取当前进程的父进程id号
        print("he is a %s man--%s--%s"%(str, os.getpid(),os.getppid()))
        sleep(1.2)

if __name__ == "__main__":
    print("主(父)进程启动-%s"%(os.getpid()))
    #创建子进程
    #target说明进程执行的任务
    p = Process(target=run, args=("nice",))
    #启动进程
    p.start()

    while True:
        print("he is a good man")
        sleep(1)

父进程的先后顺序

from multiprocessing import Process
from time import sleep
import os

def run(str):
    print("子进程启动")
    sleep(3)
    print("子进程结束")

if __name__ == "__main__":
    print("父进程启动")

    p = Process(target=run, args=("nice",))
    p.start()
    
    # 在不加join的时候,父进程结束后子进程称为孤儿进程,继续运行,父进程的结束不能影响子进程
    #利用join,进程阻塞,让父进程等待子进程结束再执行父进程
    p.join()
    print("父进程结束")

全局变量在多个进程中不能共享

父进程和子进程使用不同的堆栈段

from multiprocessing import Process
from time import sleep

num = 100

def run():
    print("子进程开始")
    global num # num = 100
    num += 1
    print(num)  # 101
    print("子进程结束")

if __name__ == "__main__":
    print("父进程开始")

    p = Process(target=run)
    p.start()
    p.join()

    # 在子进程中修改全局变量对父进程中的全局变量没有影响
    # 在创建子进程时对全局变量做了一个备份,父进程中的与子进程中的num是完全不同的两个变量
    print("父进程结束--%d"%num)  # 100

进程池

from multiprocessing import Pool
import os, time, random

def run(name):
    print("子进程%d启动--%s" % (name, os.getpid()))
    start = time.time()
    time.sleep(random.choice([1,2,3]))
    end = time.time()
    print("子进程%d结束--%s--耗时%.2f" % (name, os.getpid(), end-start))


if __name__ == "__main__":
    print("父进程启动")

    #创建多个进程
    #进程池 Pool
    #表示可以同时执行的进程数量
    #Pool默认大小是CPU核心数
    pp = Pool(4)
    for i in range(6):
        # 先同时启动4个子进程,因为是4核,有进程结束后才能执行第5,6个
        #创建进程,放入进程池统一管理
        pp.apply_async(run,args=(i,))

    #在调用join之前必须先调用close,调用close之后就不能再继续添加新的进程了
    pp.close()
    #进程池对象调用join,会等待进程池中所有的子进程结束完毕再去执行父进程
    pp.join()

    print("父进程结束")

述代码中的pool.apply_async()apply()函数的变体,apply_async()apply()的并行版本,apply()apply_async()的阻塞版本,使用apply()主进程会被阻塞直到函数执行结束,所以说是阻塞版本。apply()既是Pool的方法,也是Python内置的函数,两者等价。可以看到输出结果并不是按照代码for循环中的顺序输出的。

多个子进程并返回值

apply_async()本身就可以返回被进程调用的函数的返回值。上一个创建多个子进程的代码中,如果在函数func中返回一个值,那么pool.apply_async(func, (msg, ))的结果就是返回pool中所有进程的值的对象(注意是对象,不是值本身)

import multiprocessing
import time

def func(msg):
    return multiprocessing.current_process().name + '-' + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4) # 创建4个进程
    results = []
    for i in range(20):
        msg = "process %d" %(i)
        results.append(pool.apply_async(func, (msg, )))
    pool.close() # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用
    pool.join() # 等待进程池中的所有进程执行完毕
    print ("Sub-process(es) done.")

    for res in results:
        print (res.get())

与之前的输出不同,这次的输出是有序的。

​ 如果电脑是八核,建立8个进程,在Ubuntu下输入top命令再按下大键盘的1,可以看到每个CPU的使用率是比较平均的

拷贝文件

import os, time
from multiprocessing import Pool



#实现文件的拷贝
def copyFile(rPath, wPath):
    fr = open(rPath, "rb")
    fw = open(wPath, "wb")
    context = fr.read()
    fw.write(context)
    fr.close()
    fw.close()


path = r"C:\Users\xlg\Desktop\Python-1704\day20\2、进程\file"
toPath = r"C:\Users\xlg\Desktop\Python-1704\day20\2、进程\toFile"


#读取path下的都有的文件
filesList = os.listdir(path)

#启动for循环处理每一个文件
start = time.time()
for fileName in filesList:
    copyFile(os.path.join(path,fileName), os.path.join(toPath,fileName))
end = time.time()
print("总耗时:%0.2f" % (end-start))

多进程实现文件的拷贝

import os, time
from multiprocessing import Pool


#实现文件的拷贝
def copyFile(rPath, wPath):
    fr = open(rPath, "rb")
    fw = open(wPath, "wb")
    context = fr.read()
    fw.write(context)
    fr.close()
    fw.close()


path = r"C:\Users\xlg\Desktop\Python-1704\day20\2、进程\file"
toPath = r"C:\Users\xlg\Desktop\Python-1704\day20\2、进程\toFile"


if __name__ == "__main__":
    # 读取path下的都有的文件
    filesList = os.listdir(path)

    start = time.time()
    pp = Pool(4)
    for fileName in filesList:
        pp.apply_async(copyFile, args=(os.path.join(path,fileName), os.path.join(toPath,fileName)))

    pp.close()
    pp.join()
    end = time.time()
    print("总耗时:%0.2f" % (end-start))

封装进程对象

# myProcess.py
from multiprocessing import Process
import os, time

class MyProcess(Process):
    def __init__(self,name):
        Process.__init__(self)
        self.name = name

    def run(self):
        print("子进程(%s-%s)启动" % (self.name, os.getpid()))
        #子进程的功能
        time.sleep(3)
        print("子进程(%s-%s)结束" % (self.name, os.getpid()))

from myProcess import MyProcess

if __name__ == "__main__":
    print("父进程启动")

    #创建子进程
    p = myProcess("test")
    # 自动调用p进程对象的run方法
    p.start()
    p.join()

    print("父进程结束")

进程间通信

  1. 管道pipe(全双工,半双工):管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。
  2. 命名管道FIFO:有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
  3. 消息队列MessageQueue:消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
  4. 共享存储SharedMemory:共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号两,配合使用,来实现进程间的同步和通信。
    以上几种进程间通信方式中,消息队列是使用的比较频繁的方式。
  • 单工数据传输只支持数据在一个方向上传输;
  • 半双工数据传输允许数据在两个方向上传输,但是,在某一时刻,只允许数据在一个方向上传输,它实际上是一种切换方向的单工通信;
  • 全双工数据通信允许数据同时在两个方向上传输,因此,全双工通信是两个单工通信方式的结合,它要求发送设备和接收设备都有独立的接收和发送能力。

(1)管道pipe

import multiprocessing

def foo(sk):
   sk.send('hello father')
   print(sk.recv())

if __name__ == '__main__':
   conn1,conn2=multiprocessing.Pipe(duplex=True)    #开辟两个口,都是能进能出,括号中如果False即单向通信, 那么由主进程发送消息,子进程接受消息
   p=multiprocessing.Process(target=foo,args=(conn1,))  #子进程使用sock口,调用foo函数
   p.start()
   print(conn2.recv())  #主进程使用conn口接收,从管道pipe中读取消息
   conn2.send('hi son') #主进程使用conn口发送

Pipe对象返回的元组分别代表管道的两端,管道默认是全双工,两端都支持sendrecv方法,两个进程分别操作管道两端时不会有冲突,两个进程对管道一端同时读写时可能会有冲突。

如果声明了coon1,coon2 = Pipe(duplex=False)的单向管道,则coon1只负责接受消息,conn2只负责发送消息。

(2)消息队列Queue

Queue是多进程的安全队列,可以使用Queue实现多进程之间的数据传递。

Queue的一些常用方法:

  • Queue.qsize():返回当前队列包含的消息数量;
  • Queue.empty():如果队列为空,返回True,反之False ;
  • Queue.full():如果队列满了,返回True,反之False;
  • Queue.get():获取队列中的一条消息,然后将其从列队中移除,可传参超时时长。
  • Queue.get_nowait():相当Queue.get(False),取不到值时触发异常:Empty;
  • Queue.put():将一个值添加进数列,可传参超时时长。
  • Queue.put_nowait():相当于Queue.get(False),当队列满了时报错:Full。
from multiprocessing import Process, Queue
import os, time

def write(q):
    print("启动写子进程%s" % (os.getpid()))
    for chr in ["A", "B", "C", "D"]:
        q.put(chr)
        time.sleep(1)
    print("结束写子进程%s" % (os.getpid()))

def read(q):
    print("启动读子进程%s" % (os.getpid()))
    while True:
        value = q.get(True)
        print("value = " + value)
    print("结束读子进程%s" % (os.getpid()))


if __name__ == "__main__":
    #父进程创建队列,并传递给子进程
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))

    pw.start()
    pr.start()

  
    pw.join()
    #pr进程里是个死循环,无法等待其结束,只能强行结束
    pr.terminate()

    print("父进程结束")

(3)队列通信实现多进程爬虫

queue.png
import time
from multiprocessing import Process, Queue


class Downloader(Process):
    def __init__(self, task_queue, result_queue):
        super().__init__()
        self.name = '下载器'
        self.task_queue: Queue = task_queue
        self.result_queue: Queue = result_queue

    def download(self, url):
        time.sleep(2)
        print('下载完成', url)
        return {'time': time.time()}

    def run(self):  # 启动当前进程之后执行的核心程序
        while True:
            try:
                # 设置10秒的超时时间,如果是10秒没有任务,则会抛出异常
                task = self.task_queue.get(timeout=10)
                if isinstance(task, str):
                    resp = self.download(task)
                    # 数据写入到任务结果中
                    self.result_queue.put(resp)
                else:
                    self.result_queue.put('bye')
                    break
            except:
                break


class ItemPipeline(Process):
    def __init__(self, result_queue):
        super().__init__()
        self.name = '数据处理Item'
        self.result_queue = result_queue

    def run(self):
        while True:
            data = self.result_queue.get()
            if isinstance(data, dict):
                print('下载完成的数据:', data)
            else:
                break
        print('ItemPipeline 进程退出')


def start(downloads, itempipes):
    for d in downloads:
        d.start()
    for i in itempipes:
        i.start()


def wait(downloads, itempipes):
    for d in downloads:
        d.join()
    for i in itempipes:
        i.join()


if __name__ == '__main__':
    taskQueue = Queue()
    resultQueue = Queue()

    downloads = [Downloader(taskQueue, resultQueue) for _ in range(4)]
    itempipes = [ItemPipeline(resultQueue) for _ in range(2)]

    start(downloads, itempipes)

    # 发布任务
    for i in range(100):
        taskQueue.put('http://www.baidu.com/s?key=%d' % i)
        time.sleep(0.1)
    # 结束
    taskQueue.put(0)

    wait(downloads, itempipes)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354

推荐阅读更多精彩内容

  • 1、多任务的引入 实现类似唱歌又跳舞的任务同时进行的,叫做:多任务 2、多任务的概念 什么叫“多任务”。简单地说,...
    JerryChenn07阅读 161评论 0 0
  • 进程、进程的使用、进程注意点、进程间通信-Queue、进程池Pool、进程与线程对比、文件夹拷贝器-多任务 1.进...
    Cestine阅读 804评论 0 0
  • 内容大纲 1、操作系统概述1.1 OS定义及特征1.2 OS的发展 2、进程2.1 进程概念及特征2.2 进程的状...
    看看你的肥脸阅读 1,879评论 0 4
  • 进程间通信——队列和管道(multiprocess.Queue、multiprocess.Pipe) 进程间通信 ...
    go以恒阅读 1,782评论 0 3
  • 顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。进程的概念起源于操作系统,是操作系统最核心的概...
    SlashBoyMr_wang阅读 1,137评论 0 2