To use BroadcastView efficiently, tasks must be expressible as a single apply (or execute) call.
This is best done with SPMD-style tasks.
If you've written code for MPI, this pattern ought to be familiar.
What distinguishes SPMD tasks is that each engine gets the same code to execute, but the result of that execution depends on the state of the process: typically the engine's rank in the cluster and the cluster size, though it can get more complex.
So rather than calling map(func, inputs), you call apply(map_func), where map_func computes its partition of the inputs and then calls the map. Computing the partition becomes part of the task itself.
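Before building the real task, here is a minimal local sketch of that shape; func, inputs, and the strided partition are illustrative stand-ins (the real example below uses contiguous chunks instead):

def func(x):
    return x * x  # stand-in for the real per-item work

inputs = list(range(10))

def map_func(rank, size):
    # each "engine" selects its own share of the inputs...
    my_inputs = inputs[rank::size]
    # ...and maps the work function over just that share
    return [func(x) for x in my_inputs]

# what rank 1 of 4 would compute locally:
map_func(1, 4)  # [1, 25, 81]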
So the first thing we need is a function to compute the partition, given the number of items, the engine's rank, and the cluster size. Here is a very simple partitioning function for one-dimensional sequences (e.g. lists):
def get_partition(n_items: int, rank: int, size: int) -> tuple[int, int]:
    """
    Compute the partition for one engine

    Returns (start, end) of this rank's partition
    """
    # evenly sized chunks, rounding up so every item is assigned
    chunk_size = n_items // size
    if n_items % size:
        chunk_size += 1
    start = rank * chunk_size
    if rank + 1 == size:
        # the last rank takes whatever remains
        end = n_items
    else:
        end = start + chunk_size
    return (start, end)
n = 10
for size in (3, 4, 5):
    for rank in range(size):
        print(size, rank, get_partition(n, rank, size))
3 0 (0, 4)
3 1 (4, 8)
3 2 (8, 10)
4 0 (0, 3)
4 1 (3, 6)
4 2 (6, 9)
4 3 (9, 10)
5 0 (0, 2)
5 1 (2, 4)
5 2 (4, 6)
5 3 (6, 8)
5 4 (8, 10)
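Since these (start, end) pairs will be used as slice bounds, a quick local sanity check (not part of the original workflow) confirms that slicing with them covers every item exactly once, even when size does not evenly divide the number of items:

# sanity check: concatenating each rank's slice reproduces the full list
items = list(range(10))
for size in range(1, 12):
    chunks = [items[slice(*get_partition(len(items), rank, size))] for rank in range(size)]
    assert sum(chunks, []) == items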
Now we set up our fake workload: a bunch of files with random contents, 5 per engine.
import os
import tempfile
from pathlib import Path

tmp_dir = tempfile.TemporaryDirectory()
tmp_path = Path(tmp_dir.name)

n_engines = 100
tasks_per_engine = 5
n_items = n_engines * tasks_per_engine

for i in range(n_items):
    # write 1 kB of random bytes to each file
    with (tmp_path / f"file-{i:03}.txt").open("wb") as f:
        f.write(os.urandom(1024))
input_files = list(tmp_path.glob("*.txt"))
len(input_files)
500
Here's our task: compute the md5 checksum of the contents of one file.
from hashlib import md5

def compute_one(fname):
    hasher = md5()
    with open(fname, "rb") as f:
        hasher.update(f.read())
    return hasher.hexdigest()
compute_one(input_files[0])
'73dd81c88fe62de4e7ab710aa04861a9'
%time local_result = list(map(compute_one, input_files))
CPU times: user 4.62 ms, sys: 7.45 ms, total: 12.1 ms
Wall time: 11.6 ms
Now we need to define a task that takes as input the directory containing the input files, the engine's rank, and the cluster size, and which computes the same thing as a chunk of map(compute_one, input_files):
def spmd_task(tmp_path, rank, size):
    # identify all inputs
    all_input_files = list(Path(tmp_path).glob("*.txt"))
    # partition inputs
    n_items = len(all_input_files)
    start, end = get_partition(n_items, rank, size)
    my_input_files = all_input_files[start:end]
    # compute result
    return list(map(compute_one, my_input_files))
for rank in range(5):
    print(spmd_task(tmp_path, rank, n_items // 2))
['73dd81c88fe62de4e7ab710aa04861a9', '162bcd77dff3ff2fefebdb951d6b87cf']
['588950f656a840423c936281eca794b5', '2fc1db292ee57bc003f5abf311a487f8']
['4a0e249263a6695ec780881634dccfd7', '3f1594cfb346502818c98bd22e14f783']
['54c61fc576a7e8ca7d0af4d14257d7a5', '1e8a9deda60a1a7218469fb67ba041fd']
['6b4a782e39c61ee9125dbf831718fea8', '3ff2c6e4c276cc8f9927e5a457d4872a']
local_result[:10]
['73dd81c88fe62de4e7ab710aa04861a9', '162bcd77dff3ff2fefebdb951d6b87cf', '588950f656a840423c936281eca794b5', '2fc1db292ee57bc003f5abf311a487f8', '4a0e249263a6695ec780881634dccfd7', '3f1594cfb346502818c98bd22e14f783', '54c61fc576a7e8ca7d0af4d14257d7a5', '1e8a9deda60a1a7218469fb67ba041fd', '6b4a782e39c61ee9125dbf831718fea8', '3ff2c6e4c276cc8f9927e5a457d4872a']
Now it's time to do it in parallel.
import logging
import ipyparallel as ipp
try:
    # stop previous cluster, if we're re-running cells
    cluster.stop_cluster_sync()
except NameError:
    pass
cluster = ipp.Cluster(n=n_engines, log_level=logging.WARNING)
rc = cluster.start_and_connect_sync()
broadcast_view = rc.broadcast_view()
Distribute rank and size. This is unnecessary if engines are created with MPI.
broadcast_view.scatter("rank", rc.ids, flatten=True)
broadcast_view["size"] = size = len(rc)
Enable cloudpickle, which handles sending the functions and their imports; we could also explicitly push everything we are going to use.
broadcast_view.use_cloudpickle().get();
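For reference, the explicit alternative (a sketch, not what this example runs) would be to ship the task's dependencies by hand: perform the needed imports on the engines and push the helper functions themselves.

# rough equivalent without cloudpickle: ship spmd_task's dependencies by hand
broadcast_view.execute("from pathlib import Path; from hashlib import md5", block=True)
broadcast_view.push({"get_partition": get_partition, "compute_one": compute_one}, block=True)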
We can now send this SPMD task as a single task on all engines, each of which will compute its own partition as part of the task and do its work:
ar = broadcast_view.apply(spmd_task, tmp_path, ipp.Reference("rank"), size)
Finally, we can reconstruct the result. Because we called apply, the result is a list of lists (one list per engine), whereas we want a single flat sequence. itertools.chain takes care of that!
ar.get()[:2]
[['73dd81c88fe62de4e7ab710aa04861a9', '162bcd77dff3ff2fefebdb951d6b87cf', '588950f656a840423c936281eca794b5', '2fc1db292ee57bc003f5abf311a487f8', '4a0e249263a6695ec780881634dccfd7'], ['3f1594cfb346502818c98bd22e14f783', '54c61fc576a7e8ca7d0af4d14257d7a5', '1e8a9deda60a1a7218469fb67ba041fd', '6b4a782e39c61ee9125dbf831718fea8', '3ff2c6e4c276cc8f9927e5a457d4872a']]
from itertools import chain
parallel_result = list(chain(*ar))
parallel_result[:6], local_result[:6]
(['73dd81c88fe62de4e7ab710aa04861a9', '162bcd77dff3ff2fefebdb951d6b87cf', '588950f656a840423c936281eca794b5', '2fc1db292ee57bc003f5abf311a487f8', '4a0e249263a6695ec780881634dccfd7', '3f1594cfb346502818c98bd22e14f783'], ['73dd81c88fe62de4e7ab710aa04861a9', '162bcd77dff3ff2fefebdb951d6b87cf', '588950f656a840423c936281eca794b5', '2fc1db292ee57bc003f5abf311a487f8', '4a0e249263a6695ec780881634dccfd7', '3f1594cfb346502818c98bd22e14f783'])
assert parallel_result == local_result
cluster.stop_cluster_sync()