Just like we did manually with memmap, you can move data more efficiently with MPI: send it to just one engine, and use an MPI broadcast to distribute it to the rest of the engines.
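To make the mechanism concrete, here is a minimal standalone mpi4py sketch of a broadcast, separate from the notebook's code: rank 0 starts with an array and comm.bcast delivers a copy to every other rank. The file name and launch command are only examples; run it with something like mpiexec -n 4 python bcast_demo.py.
# bcast_demo.py -- minimal mpi4py broadcast sketch (illustrative, standalone)
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# only the root rank starts out with the data; everyone else has None
obj = np.arange(10) if rank == 0 else None
# bcast pickles the object on the root and delivers a copy to every rank
obj = comm.bcast(obj, root=0)
print(rank, obj[:3])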
import socket
import os, sys, re
import numpy as np
from IPython import parallel
For this demo, I will connect to a cluster whose engines were started with MPI. If you have MPI and mpi4py on your machine, you can start a local MPI-enabled cluster with:
ipcluster start -n 8 --engines=MPI --profile mpi
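If the mpi profile doesn't exist yet, you can create one with parallel configuration files first (the profile name here is just an example):
ipython profile create --parallel --profile=mpi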
mpi_profile = 'dirac'
rc = parallel.Client(profile=mpi_profile)  # connect to the MPI-backed cluster
eall = rc[:]   # view on all engines
root = rc[-1]  # the single engine that will receive the initial push
%px from mpi4py.MPI import COMM_WORLD as MPI
mpi_ranks = eall.apply_async(lambda : MPI.Get_rank()).get_dict()  # engine id -> MPI rank
root_rank = root.apply_sync(lambda : MPI.Get_rank())  # MPI rank of the root engine
mpi_ranks
sz = 256
data = np.random.random((sz, sz))
data = data.dot(data.T)  # A.dot(A.T) makes the test array symmetric
%%time
ar = eall.push({'data': data}, block=False)  # naive: send the full array to every engine directly
ar.wait_interactive()
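That push sends the full array from the client to every engine separately. The broadcast below is more efficient: push the data to a single root engine only, and let an MPI broadcast distribute it to the rest; the client still sends a message per engine, but only one of them carries the full array.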
@parallel.interactive
def _bcast(key, root_rank):
    """function to run on engines as part of broadcast"""
    g = globals()
    # on the root engine this is the pushed object; everywhere else it is None
    obj = g.get(key, None)
    # every engine participates in the collective MPI broadcast
    obj = MPI.bcast(obj, root_rank)
    g[key] = obj
def broadcast(key, obj, dv, root, root_rank):
    """More efficient broadcast by doing push to root,
    and MPI broadcast to other engines.

    Still O(N) messages, but all but one message is always small.
    """
    root.push({key : obj}, block=False)
    return dv.apply_async(_bcast, key, root_rank)
%%time
ar = broadcast('data', data, eall, root, root_rank)
ar.wait_interactive()
eall.apply_sync(np.linalg.norm, parallel.Reference('data'), 2)
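These per-engine results should all agree with the 2-norm of the local array, confirming that every engine received an identical copy:
np.linalg.norm(data, 2)  # should match the values returned by the engines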