在上一篇中我们介绍了 mpi4py 中标准阻塞通信模式,下面我们将介绍缓冲阻塞通信模式。
缓冲通信模式主要用于解开阻塞通信的发送和接收之间的耦合。有了缓冲机制,即使在接受端没有启动相应的接收的情况下,在完成其消息数据到缓冲区的转移后发送端的阻塞发送函数也可返回。其实标准通信模式中也存在缓冲机制,它使用的是 MPI 环境所提供的数据缓冲区,是有一定大小的。使用缓冲通信模式,我们可以自己分配和组装一块内存区域用作缓冲区,缓冲区的大小可以根据需要进行控制。但需要注意的是,当消息大小超过缓冲区容量时,程序会出错。
下面是 mpi4py 中用于缓冲阻塞点到点通信的方法接口(MPI.Comm 类的方法):
bsend(self, obj, int dest, int tag=0)
recv(self, buf=None, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
Bsend(self, buf, int dest, int tag=0)
Recv(self, buf, int source=ANY_SOURCE, int tag=ANY_TAG, Status status=None)
这些方法调用中的参数是与标准通信模式的方法调用参数一样的。
另外我们会用到的装配和卸载用于通信的缓冲区的函数如下:
MPI.Attach_buffer(buf)
MPI.Detach_buffer()
下面分别给出 bsend/recv 和 Bsend/Recv 的使用例程。
# bsend_recv.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# MPI.BSEND_OVERHEAD gives the extra overhead in buffered mode
BUFSISE = 2000 + MPI.BSEND_OVERHEAD
buf = bytearray(BUFSISE)
# Attach a user-provided buffer for sending in buffered mode
MPI.Attach_buffer(buf)
send_obj = {'a': [1, 2.4, 'abc', -2.3+3.4J],
'b': {2, 3, 4}}
if rank == 0:
comm.bsend(send_obj, dest=1, tag=11)
recv_obj = comm.recv(source=1, tag=22)
elif rank == 1:
recv_obj = comm.recv(source=0, tag=11)
comm.bsend(send_obj, dest=0, tag=22)
print 'process %d receives %s' % (rank, recv_obj)
# Remove an existing attached buffer
MPI.Detach_buffer()
运行结果如下:
$ mpiexec -n 2 python bsend_recv.py
process 0 receives {'a': [1, 2.4, 'abc', (-2.3+3.4j)], 'b': set([2, 3, 4])}
process 1 receives {'a': [1, 2.4, 'abc', (-2.3+3.4j)], 'b': set([2, 3, 4])}
# Bsend_recv.py
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# MPI.BSEND_OVERHEAD gives the extra overhead in buffered mode
BUFSISE = 2000 + MPI.BSEND_OVERHEAD
buf = bytearray(BUFSISE)
# Attach a user-provided buffer for sending in buffered mode
MPI.Attach_buffer(buf)
count = 10
send_buf = np.arange(count, dtype='i')
recv_buf = np.empty(count, dtype='i')
if rank == 0:
comm.Bsend(send_buf, dest=1, tag=11)
comm.Recv(recv_buf, source=1, tag=22)
elif rank == 1:
comm.Recv(recv_buf, source=0, tag=11)
comm.Bsend(send_buf, dest=0, tag=22)
print 'process %d receives %s' % (rank, recv_buf)
# Remove an existing attached buffer
MPI.Detach_buffer()
运行结果如下:
$ mpiexec -n 2 python Bsend_recv.py
process 0 receives [0 1 2 3 4 5 6 7 8 9]
process 1 receives [0 1 2 3 4 5 6 7 8 9]
在以上两个例程中,因为发送的数据量很小,即使不装配一个用于通信的缓冲区,程序一样可以工作(读者可以试一试),这时将使用 MPI 环境提供的缓冲区。但是当通信的数据量很大超过 MPI 环境提供的缓冲区容量时,就必须提供一个足够大的缓冲区以使程序能够正常工作。
可以用下面这个例程测试一下 MPI 环境提供的缓冲区大小。
# attach_detach_buf.py
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
max_msg_size = 2**10
BUFSISE = 32 * max_msg_size
mpi_buf = bytearray(BUFSISE)
# Attach a big user-provided buffer for sending in buffered mode
MPI.Attach_buffer(mpi_buf)
recv_buf = np.empty((max_msg_size,), np.float64)
if rank == 0:
print '-' * 80
print 'With an attached big buffer:'
print
msg_size = 1
tag = 0
while msg_size <= max_msg_size:
msg = np.random.random((msg_size,))
if rank == 0:
print 'Trying with size: ', msg_size
comm.Bsend(msg, (rank+1)%2, tag)
comm.Recv(recv_buf, (rank+1)%2, tag)
if rank == 0:
print 'Completed with size: ', msg_size
msg_size *= 2
tag += 1
# Remove an existing attached buffer
MPI.Detach_buffer()
if rank == 0:
print
print '-' * 80
print 'Without an attached big buffer:'
print
msg_size = 1
tag = 0
while msg_size <= max_msg_size:
msg = np.random.random((msg_size,))
if rank == 0:
print 'Trying with size: ', msg_size
comm.Bsend(msg, (rank+1)%2, tag)
comm.Recv(recv_buf, (rank+1)%2, tag)
if rank == 0:
print 'Completed with size: ', msg_size
msg_size *= 2
tag += 1
运行结果如下:
$ mpiexec -n 2 python attach_detach_buf.py
--------------------------------------------------------------------------------
With an attached big buffer:
Trying with size: 1
Completed with size: 1
Trying with size: 2
Completed with size: 2
Trying with size: 4
Completed with size: 4
Trying with size: 8
Completed with size: 8
Trying with size: 16
Completed with size: 16
Trying with size: 32
Completed with size: 32
Trying with size: 64
Completed with size: 64
Trying with size: 128
Completed with size: 128
Trying with size: 256
Completed with size: 256
Trying with size: 512
Completed with size: 512
Trying with size: 1024
Completed with size: 1024
--------------------------------------------------------------------------------
Without an attached big buffer:
Trying with size: 1
Completed with size: 1
Trying with size: 2
Completed with size: 2
Trying with size: 4
Completed with size: 4
Trying with size: 8
Traceback (most recent call last):
Completed with size: 8
Trying with size: 16
Completed with size: 16
Trying with size: 32
Completed with size: 32
Trying with size: 64
Completed with size: 64
Trying with size: 128
Completed with size: 128
Trying with size: 256
Completed with size: 256
Trying with size: 512
Traceback (most recent call last):
File "attach_detach_buf.py", line 56, in <module>
File "attach_detach_buf.py", line 56, in <module>
comm.Bsend(msg, (rank+1)%2, tag)
File "Comm.pyx", line 286, in mpi4py.MPI.Comm.Bsend (src/mpi4py.MPI.c:64922)
comm.Bsend(msg, (rank+1)%2, tag)
mpi4py.MPI.Exception: MPI_ERR_BUFFER: invalid buffer pointer
File "Comm.pyx", line 286, in mpi4py.MPI.Comm.Bsend (src/mpi4py.MPI.c:64922)
mpi4py.MPI.Exception: MPI_ERR_BUFFER: invalid buffer pointer
-------------------------------------------------------
Primary job terminated normally, but 1 process returned
a non-zero exit code.. Per user-direction, the job has been aborted.
-------------------------------------------------------
--------------------------------------------------------------------------
mpiexec detected that one or more processes exited with non-zero status, thus causing
the job to be terminated. The first process to do so was:
Process name: [[45613,1],0]
Exit code: 1
--------------------------------------------------------------------------
可以看出,当我们提供一个大的缓冲区时就能够成功地收发大的消息,但是当我们卸载掉这个缓冲区后,再发送大的消息时就出错了。
上面我们介绍 mpi4py 中缓冲阻塞通信模式,在下一篇中我们将介绍就绪阻塞通信模式。