import os, sys, time
import numpy as np
from IPython.core.display import display
from IPython import parallel
rc = parallel.Client()
The DirectView can be readily understood as an Engine Multiplexer - it does the same thing on all of its engines.
The only difference between running code on a single remote engine and running code in parallel is how many engines the DirectView is instructed to use.
You can create DirectViews by index-access to the Client. This creates a DirectView using the engines selected by passing the same index (or slice) to the Client's ids list.
e0 = rc[0]
engines = rc[:]
even = rc[::2]
odd = rc[1::2]
# this is the one we are going to use:
dview = engines
dview.block = True
Now, the only difference from single-engine remote execution is that the code we run happens on all of the engines of a given view:
for view in (e0, engines, even, odd):
    print(view, view.apply_sync(os.getpid))
The result of multiplexed execution is always a list with one element per engine in the view.
engines['a'] = 5
engines['a']
Lots of parallel computations involve partitioning data onto processes.
DirectViews have scatter() and gather() methods to help with this. Pass any container or numpy array, and IPython will partition the object onto the engines with scatter(), or reconstruct the full object in the Client with gather().
dview.scatter('a',range(16))
dview['a']
dview.gather('a')
dview.execute("asum = sum(a)")
dview.gather('asum')
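The gathered asum values are per-engine partial sums; adding them locally recovers the total. The whole round-trip is easy to mimic without a cluster. This local sketch (plain Python, no IPython.parallel needed) imitates scatter's roughly even, contiguous partitioning — the exact remainder placement is an assumption here, not a guarantee of scatter():

```python
# Local sketch of the scatter -> execute -> gather round-trip.
# Each "engine" receives one contiguous chunk of the data and
# computes a partial sum; the client combines the parts.

def chunk(seq, n):
    """Split seq into n contiguous chunks of near-equal size."""
    size, rem = divmod(len(seq), n)
    chunks, start = [], 0
    for i in range(n):
        stop = start + size + (1 if i < rem else 0)
        chunks.append(seq[start:stop])
        start = stop
    return chunks

data = list(range(16))
parts = chunk(data, 4)                  # what each of 4 engines would hold in 'a'
partial_sums = [sum(p) for p in parts]  # what execute("asum = sum(a)") computes
total = sum(partial_sums)               # combine the gathered partials locally

print(partial_sums, total)  # → [6, 22, 38, 54] 120
```

The final local sum is the only step that happens in the Client; everything else would run engine-side.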
We can pass a 'flatten' keyword to instruct engines that receive only one item of the list to store the bare item, rather than a one-element sublist:
dview.scatter('id',rc.ids)
dview['id']
dview.scatter('id',rc.ids, flatten=True)
dview['id']
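Locally, the difference is just one level of list nesting. A quick sketch (plain Python) of the two behaviors when a 4-item list is scattered across 4 engines:

```python
ids = [0, 1, 2, 3]   # stand-in for rc.ids with 4 engines
n_engines = 4

# default scatter: each engine gets a (possibly one-element) sublist
default = [ids[i:i + 1] for i in range(n_engines)]  # [[0], [1], [2], [3]]

# flatten=True: each engine gets the bare item instead
flattened = [sub[0] for sub in default]             # [0, 1, 2, 3]

print(default)
print(flattened)
```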
Scatter and gather also work with numpy arrays.
A = np.random.randint(1,10,(16,4))
B = np.random.randint(1,10,(4,16))
display(A)
display(B)
dview.scatter('A', A)
dview.scatter('B', B)
display(e0['A'])
display(e0['B'])
Can you compute the matrix product C = A.dot(B) in parallel? (We're not looking for brilliant, just correct.)
%run ../hints
mmhint()
%load soln/matmul.py
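One correct (not clever) strategy is to scatter row blocks of A, push B whole, compute each partial product remotely, and gather the blocks back. Here is a serial sketch of that partitioning idea in plain numpy — the name pdot_serial and the block layout are illustrative, not necessarily the loaded solution:

```python
import numpy as np

def pdot_serial(A, B, n_engines=4):
    """Serial sketch of a parallel matrix product: split A into
    row blocks (as scatter would), multiply each block by the
    full B (as each engine would), and stack the results back
    together (as gather would)."""
    blocks = np.array_split(A, n_engines, axis=0)  # row blocks of A
    partials = [blk.dot(B) for blk in blocks]      # one partial product per "engine"
    return np.vstack(partials)                     # reassemble C

A = np.random.randint(1, 10, (16, 4))
B = np.random.randint(1, 10, (4, 16))
C = pdot_serial(A, B)
print((C == A.dot(B)).all())  # → True
```

The parallel version replaces the list comprehension with remote execution on each engine; the partitioning logic is the same.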
Let's run this, and validate the result against a local computation.
C_ref = A.dot(B)
C1 = pdot(dview, A, B)
# validation:
(C1==C_ref).all()
DirectViews have a map method, which behaves just like the builtin map, but computed in parallel.
dview.block = True
serial_result = list(map(lambda x: x**10, range(32)))
parallel_result = dview.map(lambda x: x**10, range(32))
serial_result==parallel_result
DirectView.map partitions the sequences onto each engine, and then calls map remotely. The result is always a single IPython task per engine.
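The partitioning step can be illustrated without a cluster. This sketch (assuming an even, contiguous split, as in the scatter examples above) shows a 32-element sequence landing on 4 engines, each of which runs the builtin map over its own chunk:

```python
# Sketch of DirectView.map: partition the sequence, map each
# chunk "remotely" as one task per engine, then flatten the
# per-engine results back into a single list.
import math

def parallel_map_sketch(f, seq, n_engines=4):
    size = math.ceil(len(seq) / n_engines)
    chunks = [seq[i * size:(i + 1) * size] for i in range(n_engines)]
    per_engine = [list(map(f, c)) for c in chunks]  # one task per engine
    return [y for chunk_result in per_engine for y in chunk_result]

result = parallel_map_sketch(lambda x: x**10, list(range(32)))
print(result == [x**10 for x in range(32)])  # → True
```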
amr = dview.map_async(lambda x:x**10, range(32))
amr.msg_ids
amr = dview.map_async(lambda x:x**10, range(3200))
amr.msg_ids
from IPython.display import display, Image
%run ../images_common
pictures = get_pictures(os.path.join('..', 'images', 'castle'))
%px cd {os.getcwd()}
%%px
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from skimage.io import imread
from skimage import measure
engines.push(dict(
plot_contours=plot_contours,
find_contours=find_contours,
))
ar = e0.apply_async(get_contours_image, pictures[0])
ar.wait_interactive()
Image(data=ar.get())
amr = engines.map_async(get_contours_image, pictures[:len(engines)])
amr.wait_interactive()
for pngdata in amr:
display(Image(data=pngdata))
IPython.parallel can also be used for load-balanced execution, when you just want code to run, but don't care where.