在python中Process比Thread更稳定,且Process能分布到多台机器,而Thread只能分布到同一台机器的多个CPU。
Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。
task_master.py
# coding:utf-8
import random,time,queue
from multiprocessing.managers import BaseManager
#发送任务的队列
task_queue = queue.Queue()
#接收任务的队列
result_queue = queue.Queue()
#把两个任务队列在网络上注册
BaseManager.register('get_task_queue',callable=lambda: task_queue)
BaseManager.register('get_result_queue',callable=lambda: result_queue)
#绑定端口5000,设置验证码:8e8b55261098a425273f31a
manager = BaseManager(address=('',5000),authkey=b'8e8b55261098a425273f31a')
#启动队列
manager.start()
# 获取通过网络访问的queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
begintime = time.time()
for i in range(50):
r = random.randint(10001,99999)
print("Put task %d ..." % r)
task.put(r)
for i in range(50):
r = result.get(timeout=10)
print("Result is %s" % r)
manager.shutdown()
print("master exit.")
endtime = time.time()
print('用时:%0.5f' %(endtime-begintime))
task_worker.py
#task_worker.py
#coding:utf-8
import time,sys,queue
from multiprocessing.managers import BaseManager
#获取网络中的Queue,并注册
BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
#连接到manager.py所在机器 server_addr 为远程master服务器的ip地址
server_addr = '127.0.0.1'
print("Connecting to server %s" % server_addr)
m = BaseManager(address=(server_addr,5000),authkey=b'8e8b55261098a425273f31a')
m.connect()
#获取Queue对象
task = m.get_task_queue()
result = m.get_result_queue()
#从task中获取任务,并把结果写入result队列
for i in range(50):
try:
n = task.get(timeout=2)
print('run task %d * %d' %(n,n))
r = '%d * %d = %d ' % (n, n, n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print('Task queue is empty')
#处理结束
print('Worker exit .')