# ipyparallel drives a set of IPython "engines"; engines="mpi" launches them
# via mpiexec so that mpi4py's MPI.COMM_WORLD spans all engines.
import ipyparallel as ipp
# create a cluster of 2 MPI-launched engines (n=2 -> mpiexec -n 2)
cluster = ipp.Cluster(engines="mpi", n=2)
# start that cluster and connect to it; the *_sync variant blocks
# until all engines have registered with the controller
rc = cluster.start_and_connect_sync()
Starting 2 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
0%| | 0/2 [00:00<?, ?engine/s]
What did that do? Under the hood, it launched the engines with:
mpiexec -n 2 python -m ipyparallel.engine --mpi
# Inspect the exact command line the launcher used to start the engine set
cluster.engine_set.args
['mpiexec', '-n', '2', '/Users/minrk/conda/bin/python', '-m', 'ipyparallel.engine', '--mpi']
If we 'activate' the client,
it registers magics with IPython, so we can use %%px
to run cells on the engines
instead of in the local notebook.
# Register this client's %px / %%px magics with the current IPython session;
# returns the default DirectView over all engines
rc.activate()
<DirectView all>
%%px
# %%px runs this cell on every engine; each engine is a separate OS process,
# so each reports a distinct PID.
import os
os.getpid()
Out[0:1]: 45610
Out[1:1]: 45611
From here, we are going to adapt this MPI tutorial,
using %%px
and mpi4py.
%%px
# Find out rank, size
# The engines were launched under mpiexec, so mpi4py sees one shared COMM_WORLD.
from mpi4py import MPI
comm = MPI.COMM_WORLD
world_rank = comm.Get_rank()
world_size = comm.Get_size()
print(f"I am rank {world_rank} / {world_size}")
[stdout:0] I am rank 0 / 2
[stdout:1] I am rank 1 / 2
%%px
# Blocking point-to-point messaging: rank 0 sends the int -1 to rank 1.
# Lowercase comm.send/comm.recv pickle arbitrary Python objects.
number = None
if world_rank == 0:
    number = -1
    comm.send(number, dest=1)
elif world_rank == 1:
    number = comm.recv(source=0)
    print(f"Process 1 received number {number} from process 0")
# Last expression of a %%px cell is displayed per-engine (Out[engine:n])
number
[stdout:1] Process 1 received number -1 from process 0
Out[0:3]: -1
Out[1:3]: -1
# %px (the line-magic form) runs a single statement on all engines
%px world_rank
Out[0:4]: 0
Out[1:4]: 1
%%px --group-outputs order
# Ping-pong: the two ranks take turns sending an incrementing counter back
# and forth until it reaches PING_PONG_LIMIT.
# --group-outputs order groups each engine's stdout together, in engine order.
import time  # NOTE(review): unused in this cell — presumably left over; confirm
PING_PONG_LIMIT = 5
ping_pong_count = 0
# With exactly 2 ranks, each rank's partner is the other rank.
partner_rank = (world_rank + 1) % 2
while ping_pong_count < PING_PONG_LIMIT:
    # The parity of the counter decides whose turn it is to send.
    if world_rank == ping_pong_count % 2:
        ping_pong_count += 1
        comm.send(ping_pong_count, dest=partner_rank)
        print(f"{world_rank} sent and incremented ping_pong_count {ping_pong_count} to {partner_rank}")
    else:
        # The other rank blocks here until the counter arrives.
        ping_pong_count = comm.recv(source=partner_rank)
        print(f"{world_rank} received ping_pong_count {ping_pong_count} from {partner_rank}")
# Make sure the output is synchronized
comm.Barrier()
[stdout:0] 0 sent and incremented ping_pong_count 1 to 1 0 received ping_pong_count 2 from 1 0 sent and incremented ping_pong_count 3 to 1 0 received ping_pong_count 4 from 1 0 sent and incremented ping_pong_count 5 to 1
[stdout:1] 1 received ping_pong_count 1 from 0 1 sent and incremented ping_pong_count 2 to 0 1 received ping_pong_count 3 from 0 1 sent and incremented ping_pong_count 4 to 0 1 received ping_pong_count 5 from 0
# Shut down the controller and all engines started above (blocking)
cluster.stop_cluster_sync()
Stopping controller Controller stopped: {'exit_code': 0, 'pid': 45594, 'identifier': 'ipcontroller-1679059040-uicm-45588'} Stopping engine(s): 1679059042 engine set stopped 1679059042: {'exit_code': 0, 'pid': 45608, 'identifier': 'ipengine-1679059040-uicm-1679059042-45588'}
This next example uses more than 2 processes: a ring of 4 engines.
import ipyparallel as ipp
# create a cluster — 4 MPI engines this time (mpiexec -n 4)
cluster = ipp.Cluster(engines="mpi", n=4)
# start that cluster and connect to it
rc = cluster.start_and_connect_sync()
# activate the new cluster to handle `%%px`
# (re-binds the magics, which still point at the previous, now-stopped cluster)
rc.activate()
Starting 4 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
0%| | 0/4 [00:00<?, ?engine/s]
<DirectView all>
%%px
# Ring communication: a bytes token originates at rank 0 and travels
# 0 -> 1 -> ... -> world_size-1 -> 0, each rank appending its own rank
# digit before forwarding it to the next rank.
from mpi4py import MPI
comm = MPI.COMM_WORLD
world_rank = comm.Get_rank()
world_size = comm.Get_size()
if world_rank != 0:
    # Block until the token arrives from the previous rank, then tag on
    # this rank's digit.
    token = comm.recv(source=world_rank - 1)
    print(f"{world_rank} received {token} from {world_rank-1}")
    token = token + f"{world_rank}".encode('ascii')
else:
    # Rank 0 originates the token.
    token = b"0"
# Forward to the next rank; the modulo wraps the last rank back to rank 0.
comm.send(token, dest=(world_rank + 1) % world_size)
if world_rank == 0:
    # Close the ring: receive the fully assembled token from the last rank.
    token = comm.recv(source=world_size - 1)
    print(f"{world_rank} received {token} from {world_size-1}")
[stdout:1] 1 received b'0' from 0
[stdout:2] 2 received b'01' from 1
[stdout:0] 0 received b'0123' from 3
[stdout:3] 3 received b'012' from 2