Python有很多库可以支持并行计算。
>>> import threading
>>> def thread_hello():
other = threading.Thread(target=thread_say_hello, args=())
other.start()
thread_say_hello()
>>> def thread_say_hello():
print('hello from', threading.current_thread().name)
>>> thread_hello()
hello from Thread-1
hello from MainThread
>>> import multiprocessing
>>> def process_hello():
other = multiprocessing.Process(target=process_say_hello, args=())
other.start()
process_say_hello()
>>> def process_say_hello():
print('hello from', multiprocessing.current_process().name)
>>> process_hello()
hello from MainProcess
hello from Process-1
threading
和multiprocessing
库有着类似的API,但是前者只是建立单个线程,后者对多进程封装得更完善,对多核CPU的支持更好。更多可阅读Python标准库08 多线程与同步 (threading包), Python标准库10 多进程初步 (multiprocessing包), Python多进程并发(multiprocessing)
threading
模块使用线程,multiprocessing
使用进程。其区别不同在于,线程使用同一内存空间,而进程分配有不同的内存空间。因此进程间难以共享对象。但两个线程则有可能同时改写同一内存空间。为防止出现冲突,可以使用GIL保证不会同时执行可能冲突的线程。
更多对比
下面是一个线程冲突的实例
import threading
from time import sleep
counter = [0]
def increment():
count = counter[0]
sleep(0) # try to force a switch to the other thread
counter[0] = count + 1
other = threading.Thread(target=increment, args=())
other.start()
increment()
print('count is now: ', counter[0])
下面是执行过程:
Thread 0 Thread 1
read counter[0]: 0
read counter[0]: 0
calculate 0 + 1: 1
write 1 -> counter[0]
calculate 0 + 1: 1
write 1 -> counter[0]
问题在于:尽管执行了两次加法,但结果仍然是:1。
在Python中,最简单的保证数据同步的方法是使用queue
模块的Queue
类。
from queue import Queue
queue = Queue()
def synchronized_consume():
while True:
print('got an item:', queue.get()) # 得到对象
queue.task_done() # 队列任务结束
def synchronized_produce():
consumer = threading.Thread(target=synchronized_consume, args=())
consumer.daemon = True
consumer.start()
for i in range(10):
queue.put(i) # 加入新对象
queue.join() # 确保所有队列任务结束后,退出
synchronized_produce()
如果上面这个办法因为某些原因做不到,那我们可以使用threading
模块中的Lock
类。
seen = set()
seen_lock = threading.Lock()
def already_seen(item):
seen_lock.acquire() # 在Lock类的
result = True # acquire方法
if item not in seen: # 和release方法
seen.add(item) # 之间的代码
result = False # 仅能同时被
seen_lock.release() # 一个线程访问
return result
def already_seen(item):
with seen_lock:
if item not in seen:
seen.add(item)
return False
return True
还有一个办法是threading
模块中的Barrier
类。
counters = [0, 0]
barrier = threading.Barrier(2)
def count(thread_num, steps):
for i in range(steps):
other = counters[1 - thread_num]
barrier.wait() # wait for reads to complete
counters[thread_num] = other + 1
barrier.wait() # wait for writes to complete
def threaded_count(steps):
other = threading.Thread(target=count, args=(1, steps))
other.start()
count(0, steps)
print('counters:', counters)
threaded_count(10)
更多参考Python的多线程编程模块 threading 参考,17.1. threading — Thread-based parallelism。
防止共享数据错误读写的终极机制是完全避免并发地接触同一数据。进程的内存空间的独立性完全符合这一要求。为了解决进程之间的交流问题,multiprocessing
模块特别提供了Pipe
类。Pipe
默认为两条通道,如果传入参数False
则为一条通道。
def process_consume(in_pipe):
while True:
item = in_pipe.recv() # 只有接收成功后才会继续执行
if item is None:
return
print('got an item:', item)
def process_produce():
pipe = multiprocessing.Pipe(False)
consumer = multiprocessing.Process(target=process_consume, args=(pipe[0],))
consumer.start()
for i in range(10):
pipe[1].send(i) # 通过通道发送对象
pipe[1].send(None) # done signal
process_produce()
在执行并发计算时,程序员往往会犯下错误:
- 同步不足(Under-synchronization):一些线程没有被同步
- 过度同步(Over-synchronization):某些本可以并发执行的线程,被串行化
- 死锁(Deadlock):被同步的进程相互等候对方完成某些步骤才进行下一步,导致程序锁死。一个栗子:
def deadlock(in_pipe, out_pipe):
item = in_pipe.recv()
print('got an item:', item)
out_pipe.send(item + 1)
def create_deadlock():
pipe = multiprocessing.Pipe()
other = multiprocessing.Process(target=deadlock, args=(pipe[0], pipe[1]))
other.start()
deadlock(pipe[1], pipe[0])
create_deadlock()