#!/usr/bin/env python
# coding: utf-8

# ## Demonstrating map with IPython Parallel BroadcastView
#
# BroadcastView does not natively support map, because a map fundamentally means each engine doing something different, while BroadcastView is all about optimizing the case where all engines do the _same_ thing.
#
# There are ways to use BroadcastView to run maps, however.
#
# A map consists of 3 steps:
#
# 1. partition (`scatter`) input sequences into chunks
# 2. run `result_chunk = list(map(func, chunk))` everywhere
# 3. reconstruct (`gather`) `result_chunks` into a flat array/list/iterable
#
# It's only the partitioning in step 1 that BroadcastView can't do. So if we do the partitioning ahead of time, BroadcastView can actually be used to schedule a map (a purely local sketch of these three steps follows the broadcast example below).
#
# First, create a cluster and two views:

# In[1]:

import ipyparallel as ipp

cluster = ipp.Cluster(n=4)
rc = cluster.start_and_connect_sync()

# In[2]:

dview = rc[:]
bview = rc.broadcast_view()

# Next, use the DirectView to `scatter` our input sequence,
# so the chunks are available as `a` on each engine.

# In[3]:

dview.scatter('a', list(range(10)))

# In[4]:

bview['a']

# IPython Parallel has a tool for passing arguments to a function that should resolve in the engine's namespace: `ipp.Reference`.
#
# References are resolved to the object of the same name in each engine's namespace.
# So we can use BroadcastView to execute the same _code_ on every engine, while each engine gets its own distributed chunk as input:

# In[5]:

bview.apply(str, ipp.Reference('a')).get_dict()

# So we can compute the chunks of a map with a broadcast `apply()` on a _Reference_ to `a`:

# In[6]:

def map_squares(chunk):
    """compute square of every item in chunk as a list"""
    return [n * n for n in chunk]

ar = bview.apply(map_squares, ipp.Reference("a"))
ar.get()
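# To make the three-step decomposition above concrete, here is the same map written as a purely local sketch, with no engines involved. Everything in this cell (including the `partition` helper) is illustrative only and not part of the IPython Parallel API; its chunk boundaries also need not match exactly how `DirectView.scatter` splits a sequence.

# In[ ]:

def partition(seq, n):
    """split seq into n roughly equal chunks (step 1)"""
    chunk_size, remainder = divmod(len(seq), n)
    chunks = []
    start = 0
    for i in range(n):
        end = start + chunk_size + (1 if i < remainder else 0)
        chunks.append(seq[start:end])
        start = end
    return chunks

local_chunks = partition(list(range(10)), 4)                          # step 1: partition
local_result_chunks = [map_squares(chunk) for chunk in local_chunks]  # step 2: map each chunk
[item for chunk in local_result_chunks for item in chunk]             # step 3: flatten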
# Now the only step left is flattening the sequences.
# One simple version of this is to use `itertools.chain`:

# In[7]:

import itertools

for item in itertools.chain(*ar):
    print(item)

# Or, to get the full, identical behavior of `DirectView.map`, we can reconstruct the AsyncResult into an AsyncMapResult:

# In[8]:

amr = ipp.AsyncMapResult(bview.client, ar._children, ipp.client.map.Map())
for item in amr:
    print(item)

# We can tie it all together in an actual `broadcast_map` function that has the same behavior and signature as `DirectView.map`:

# In[9]:

def _list_map(*args):
    """evaluate map to a list rather than returning the lazy map object, which isn't portable

    This is the function we pass to `BroadcastView.apply` every time.
    """
    return list(map(*args))

def broadcast_map(bview, f, *sequences):
    """Run a map on a BroadcastView"""
    seq_names = []
    refs = []
    # scatter sequences using a DirectView
    dview = bview.client.direct_view(bview.targets)
    scatters = []
    for i, seq in enumerate(sequences):
        seq_name = f"_seq_{i}"
        seq_names.append(seq_name)
        refs.append(ipp.Reference(seq_name))
        scatters.append(dview.scatter(seq_name, seq))

    # need to wait for scatters to complete before we submit the map,
    # since they happen on a different channel and could race
    [scatter.get() for scatter in scatters]

    ar = bview.apply(_list_map, f, *refs)

    # re-wrap messages in an AsyncMapResult to get the map API
    # could also skip this and use itertools.chain
    amr = ipp.AsyncMapResult(bview.client, ar._children, ipp.client.map.Map())

    # register cleanup after execution, since we used the global namespace;
    # pass the *names* so cleanup can pop them from the engines' globals
    @ipp.interactive
    def cleanup(names):
        g = globals()
        for name in names:
            g.pop(name)

    bview.apply(cleanup, seq_names)

    return amr

for item in broadcast_map(bview, lambda x: x**3, range(10)):
    print(item)

# ## SPMD-style map
#
# Since BroadcastView is meant to enable SPMD-style tasks, where every engine gets the same _code_ and each one performs a different task based on its _state_ or available data, another, even more efficient way to run this kind of workload is to compute the partitions entirely on the engines, e.g. using MPI-style rank and size.

# In[10]:

# scatter simple rank & size values
# in practice, these likely come from MPI, etc. and don't need any setup.
dview = rc.direct_view()
size = len(rc)
ranks = list(range(size))
dview.scatter("rank", ranks, flatten=True)
dview["size"] = size
dview["rank"]

# In this example, computing the chunk is part of the task itself.

# In[11]:

def spmd_task():
    # compute this engine's chunk as part of the task,
    # using its rank and the total size
    input_size = 10
    chunk_size = input_size // size
    start = chunk_size * rank
    if rank + 1 == size:
        # the last rank picks up any remainder
        end = input_size
    else:
        end = start + chunk_size
    chunk = range(start, end)
    return [n * n for n in chunk]

ar = bview.apply(spmd_task)
ar.get()

# And again, we can flatten the results with `itertools.chain`:

# In[12]:

for item in itertools.chain(*ar.get()):
    print(item)

# In[13]:

cluster.stop_cluster_sync()
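# As a purely local footnote (not part of the original example), we can sanity-check the rank-based chunking arithmetic used in `spmd_task` without any engines: the per-rank `[start, end)` ranges should cover the input exactly once, with the last rank absorbing the remainder. The names below (`n_ranks`, `local_chunks`) are illustrative only.

# In[ ]:

input_size = 10
n_ranks = 4  # matches the n=4 engines used above

local_chunks = []
chunk_size = input_size // n_ranks
for r in range(n_ranks):
    start = chunk_size * r
    # the last rank picks up any remainder, exactly as in spmd_task
    end = input_size if r + 1 == n_ranks else start + chunk_size
    local_chunks.append(list(range(start, end)))

print(local_chunks)
# together, the chunks cover every input index exactly once
assert sorted(i for chunk in local_chunks for i in chunk) == list(range(input_size))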