BroadcastView does not natively support map, because map fundamentally means each engine doing something different, while BroadcastView is all about optimizing all engines doing the same thing.
There are ways to use BroadcastView to run maps, however.
A map consists of 3 steps:

1. scatter input sequences into chunks
2. result_chunk = list(map(func, chunk)) everywhere
3. gather result_chunks into a flat array/list/iterable

It's only the partitioning in step 1 that BroadcastView can't do. So if we do the partitioning ahead of time, BroadcastView can actually be used to schedule a map.
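These three steps can be sketched locally, with no cluster at all. Here `partition` is a hypothetical stand-in for what scatter does (contiguous chunks, with the earlier chunks taking any extra items, e.g. 10 items across 4 chunks become sizes 3, 3, 2, 2), and the mapped function is just squaring:

```python
from itertools import chain


def partition(seq, n):
    """Split seq into n contiguous chunks, like scatter does.

    The first len(seq) % n chunks get one extra item each.
    """
    seq = list(seq)
    base, extra = divmod(len(seq), n)
    chunks, start = [], 0
    for i in range(n):
        size = base + (1 if i < extra else 0)
        chunks.append(seq[start : start + size])
        start += size
    return chunks


chunks = partition(range(10), 4)                # step 1: scatter
results = [[x * x for x in c] for c in chunks]  # step 2: map on every chunk
flat = list(chain(*results))                    # step 3: gather
```

Everything except step 1 is "the same code on every engine," which is exactly the shape BroadcastView wants.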
First, create a cluster and two views:
import ipyparallel as ipp
cluster = ipp.Cluster(n=4)
rc = cluster.start_and_connect_sync()
Starting 4 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
dview = rc[:]
bview = rc.broadcast_view()
Next, use the DirectView to scatter our input sequence, so the chunks are available as a on each engine:
dview.scatter('a', list(range(10)))
<AsyncResult(scatter): pending>
bview['a']
[[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
IPython Parallel has a tool for passing arguments to a function that should resolve in the engine's namespace: ipp.Reference
References are resolved to the object of the same name. So we can now use BroadcastView to execute the same code, but which will get the distributed chunk on each engine as input:
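Conceptually, a Reference is a deferred name lookup: the name travels with the task, and the engine resolves it against its own namespace before calling the function. A minimal local sketch, using a hypothetical FakeReference class and fake_apply resolver (not the real ipyparallel machinery):

```python
class FakeReference:
    """Placeholder that resolves to a name in a namespace at call time."""

    def __init__(self, name):
        self.name = name


def fake_apply(func, arg, namespace):
    """Resolve FakeReference args against `namespace` before calling,
    the way an engine resolves ipp.Reference against its globals."""
    if isinstance(arg, FakeReference):
        arg = namespace[arg.name]
    return func(arg)


# each "engine" has a different chunk bound to the same name 'a'
engine_ns = {"a": [0, 1, 2]}
fake_apply(str, FakeReference("a"), engine_ns)
```

The same FakeReference("a") gives a different result on each "engine," because each namespace holds a different chunk.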
bview.apply(str, ipp.Reference('a')).get_dict()
{0: '[0, 1, 2]', 1: '[3, 4, 5]', 2: '[6, 7]', 3: '[8, 9]'}
So we can compute the chunks of a map with a broadcast apply() on a Reference to a:
def map_squares(chunk):
    """compute square of every item in chunk as a list"""
    return [n * n for n in chunk]
ar = bview.apply(map_squares, ipp.Reference("a"))
ar.get()
[[0, 1, 4], [9, 16, 25], [36, 49], [64, 81]]
Now the only remaining step is flattening the result chunks. One simple way to do this is with itertools.chain:
import itertools
for item in itertools.chain(*ar):
    print(item)
0
1
4
9
16
25
36
49
64
81
Or to get the full, identical behavior of DirectView.map, we can reconstruct the AsyncResult into an AsyncMapResult:
amr = ipp.AsyncMapResult(bview.client, ar._children, ipp.client.map.Map())
for item in amr:
    print(item)
0
1
4
9
16
25
36
49
64
81
We can tie it all together in an actual broadcast_map function that has the same behavior and signature as DirectView.map:
def _list_map(*args):
    """evaluate map to a list

    rather than returning the lazy object, which isn't portable.
    This is the function we pass to `BroadcastView.apply` every time.
    """
    return list(map(*args))
def broadcast_map(bview, f, *sequences):
    """Run a map on a BroadcastView"""
    seq_names = []
    refs = []
    # scatter sequences using a DirectView
    dview = bview.client.direct_view(bview.targets)
    scatters = []
    for i, seq in enumerate(sequences):
        seq_name = f"_seq_{i}"
        seq_names.append(seq_name)
        refs.append(ipp.Reference(seq_name))
        scatters.append(dview.scatter(seq_name, seq))
    # need to wait for scatters to complete before we submit the map,
    # since they happen on a different channel and could race
    [scatter.get() for scatter in scatters]
    ar = bview.apply(_list_map, f, *refs)
    # re-wrap messages in an AsyncMapResult to get the map API
    # (could also skip this and use itertools.chain)
    amr = ipp.AsyncMapResult(bview.client, ar._children, ipp.client.map.Map())

    # register cleanup after execution, since we used the global namespace
    @ipp.remote
    def cleanup(names):
        g = globals()
        for name in names:
            g.pop(name)

    # pass the names (not the Reference objects), so cleanup can pop them
    bview.apply(cleanup, seq_names)
    return amr
for item in broadcast_map(bview, lambda x: x**3, range(10)):
    print(item)
0
1
8
27
64
125
216
343
512
729
Since BroadcastView is meant to enable SPMD-style tasks, where every engine gets the same code and each one performs a different task based on its state or available data, another, even more efficient way to run this kind of workload is to compute the partitions entirely on the engines, e.g. using MPI-style rank and size.
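The rank/size partition arithmetic can be checked locally before putting it in a task. `local_chunk` is a hypothetical helper mirroring the scheme used below: every rank takes input_size // size items, and the last rank also absorbs the remainder:

```python
def local_chunk(rank, size, input_size):
    """Contiguous chunk of range(input_size) owned by this rank.

    Each rank gets input_size // size items;
    the last rank also takes any remainder.
    """
    chunk_size = input_size // size
    start = chunk_size * rank
    end = input_size if rank + 1 == size else start + chunk_size
    return range(start, end)


# with 4 ranks and 10 items: ranks 0-2 get 2 items each, rank 3 gets 4
chunks = [list(local_chunk(r, 4, 10)) for r in range(4)]
# together, the chunks cover the input exactly once
covered = [n for c in chunks for n in c]
```

No scatter is needed for the input at all; each engine only has to know its own rank and the total size.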
# scatter simple rank & size
# in practice, these likely come from MPI, etc. and don't need any setup.
dview = rc.direct_view()
size = len(rc)
ranks = list(range(size))
ranks
dview.scatter("rank", ranks, flatten=True)
dview["size"] = size
dview["rank"]
[0, 1, 2, 3]
In this example, computing the chunk is part of the task itself.
def spmd_task():
    # compute chunks as part of the task, using rank & size
    input_size = 10
    chunk_size = input_size // size
    start = chunk_size * rank
    if rank + 1 == size:
        end = input_size
    else:
        end = start + chunk_size
    chunk = range(start, end)
    return [n * n for n in chunk]
ar = bview.apply(spmd_task)
ar.get()
[[0, 1], [4, 9], [16, 25], [36, 49, 64, 81]]
And again, we can flatten results with itertools.chain
:
for item in itertools.chain(*ar.get()):
    print(item)
0
1
4
9
16
25
36
49
64
81
cluster.stop_cluster_sync()
Stopping controller Controller stopped: {'exit_code': 0, 'pid': 83730, 'identifier': 'ipcontroller-1707138048-qg3h-83722'} Stopping engine(s): 1707138049 engine set stopped 1707138049: {'engines': {'0': {'exit_code': 0, 'pid': 83742, 'identifier': '0'}, '1': {'exit_code': 0, 'pid': 83743, 'identifier': '1'}, '2': {'exit_code': 0, 'pid': 83744, 'identifier': '2'}, '3': {'exit_code': 0, 'pid': 83745, 'identifier': '3'}}, 'exit_code': 0}