import os,sys,time
import numpy as np
from IPython.core.display import display
# NOTE(review): IPython.parallel was split out into the separate
# `ipyparallel` package in IPython 4.0 — confirm which version this
# notebook targets before running.
from IPython import parallel
# Connect to the running IPython cluster; requires `ipcluster start` (or
# equivalent) to have been launched beforehand.
rc = parallel.Client()
# DirectView on all engines (multiplexing view).
dview = rc[:]
Create a LoadBalancedView
# LoadBalancedView: the scheduler picks which engine runs each task.
lview = rc.load_balanced_view()
# Bare expression: notebook displays the view's repr.
lview
LoadBalancedViews behave very much like a DirectView on a single engine: each call to apply() results in a single remote computation, and the result (or AsyncResult) of that call is returned directly, rather than in a list, as in the multi-engine DirectView.
# DirectView pinned to engine 0, for comparison with the load-balanced view.
e0 = rc[0]
from numpy.linalg import norm
# 1024 random samples in [0, 1).
A = np.random.random(1024)
# Compute the 2-norm of A remotely: first pinned to engine 0...
e0.apply_sync(norm, A, 2)
# ...then on whichever engine the scheduler chooses.
lview.apply_sync(norm, A, 2)
However, unlike the DirectView of a single engine, you are letting the IPython Scheduler decide which engine should do the work:
# Pinned to engine 0 the pid is always the same; load-balanced tasks may
# land on any engine, so the pid below varies from call to call.
e0.apply_sync(os.getpid)
for i in range(2 * len(rc.ids)):
    pid = lview.apply_sync(os.getpid)
    # print-as-function with a single %-formatted string: identical
    # output under Python 2 and Python 3.
    print("task %i ran on: %i" % (i, pid))
The LoadBalancedView also has a load-balanced version of the builtin map()
# Make calls through lview block until results arrive.
lview.block = True
# Serial baseline (Python 2 map returns a list here).
serial_result = map(lambda x:x**10, range(32))
# Load-balanced version of the same computation.
parallel_result = lview.map(lambda x:x**10, range(32))
# Bare expression: displays True if both results agree.
serial_result==parallel_result
Just like apply(), you can use non-blocking map with block=False or map_async.
# Non-blocking map: returns an AsyncMapResult immediately.
amr = lview.map_async(lambda x:x**10, range(32))
# One message id per task (one task per item by default).
amr.msg_ids
# IPython help syntax (notebook-only): show the source of lview.map.
lview.map??
# chunksize=4 bundles items into tasks of 4, so fewer message ids.
amr = lview.map_async(lambda x:x**10, range(32), chunksize=4)
amr.msg_ids
AsyncResults with multiple results are actually iterable before their results arrive.
This means that you can perform map/reduce operations on elements as they come in:
lview.block = False
# scatter 'id', so id=0,1,2 on engines 0,1,2
dv = rc[:]
dv.scatter('id', rc.ids, flatten=True)
# print-as-function form: identical output on Python 2 and 3.
print(dv['id'])
# create a Reference to `id`. This will be a different value on each engine
ref = parallel.Reference('id')
tic = time.time()
# Each engine sleeps for its own `id` seconds; iterating the AsyncResult
# yields results in submission order as they complete.
ar = dv.apply(time.sleep, ref)
for i, r in enumerate(ar):
    print("%i: %.3f" % (i, time.time() - tic))
# Twelve 1-second sleeps, load-balanced across the engines.
amr = lview.map_async(time.sleep, [1] * 12)
# Interactive progress display while waiting for all tasks.
amr.wait_interactive()
# Timing attributes (bare expressions, displayed by the notebook):
amr.wall_time, amr.elapsed
# serial_time: sum of the individual task durations (as if run serially).
amr.serial_time
amr.wall_time
amr.elapsed
Now we submit a bunch of tasks of increasing magnitude, and watch where they happen, iterating through the results as they come.
def sleep_here(t):
    """Pause for t seconds, then report where the task ran.

    Returns the engine-local ``id`` variable scattered earlier
    (id = 0, 1, 2, ... on engines 0, 1, 2, ...).
    """
    from time import sleep
    sleep(t)
    return id
# 100 tasks sleeping 0.00, 0.01, ..., 0.99 seconds.
amr = lview.map(sleep_here, [.01 * t for t in range(100)])
tic = time.time()
# The AsyncMapResult is iterable before all results arrive, so we can
# process each one as it comes in (in submission order).
for i, r in enumerate(amr):
    # prints converted to function-call form with %-formatting:
    # identical output under Python 2 and Python 3.
    print("%s %s" % (i, r))
    print("task %i on engine %i: %.3f" % (i, r, time.time() - tic))
amr.wall_time
amr.serial_time
# Speedup ratio: > 1 means the parallel run beat serial execution.
amr.serial_time / amr.wall_time
Unlike DirectView.map(), which always results in one task per engine, LoadBalanced map defaults to one task per item in the sequence. This can be changed by specifying the chunksize keyword arg.
# Same workload, but chunksize=4 bundles 4 items per task, reducing
# scheduling overhead at the cost of coarser load balancing.
amr = lview.map(sleep_here, [.01 * t for t in range(100)], chunksize=4)
tic = time.time()
for i, r in enumerate(amr):
    # print-as-function with a single %-formatted string: identical
    # output under Python 2 and Python 3.
    print("task %i on engine %i: %.3f" % (i, r, time.time() - tic))
def area(w, h):
    """Return the area of a w-by-h rectangle."""
    return h * w
widths = range(1, 4)
heights = range(6, 10)
# Area of every (width, height) pair; widths outer, heights inner,
# matching a nested for-loop over widths then heights.
areas = [area(w, h) for w in widths for h in heights]
# Bare expression: notebook displays the list.
areas
# IPython magics (notebook cells only): run the hints helper script,
# then load the reference solution into the next cell.
%run ../hints
nesthint()
%load ../soln/nestedloop.py
# To parallelize every call with map, you just need to get a list for each argument.
# You can use `itertools.product` + `zip` to get this:
import itertools
product = list(itertools.product(widths, heights))
# [(1, 6), (1, 7), (2, 6), (2, 7), (3, 6), (3, 7)]
# So we have a "list of pairs",
# but what we really want is a single list for each argument, i.e. a "pair of lists".
# This is exactly what the slightly weird `zip(*product)` syntax gets us:
allwidths, allheights = zip(*itertools.product(widths, heights))
# print-as-function with %-formatting: same output as the Python 2
# `print " widths", allwidths` statement form, but also valid Python 3.
print(" widths %s" % (allwidths,))
print("heights %s" % (allheights,))
# Now we just map our function onto those two lists, to parallelize nested for loops:
ar = lview.map_async(area, allwidths, allheights)
Validate the result:
# Block until all parallel results arrive.
p_areas = ar.get()
# Bare expression: notebook displays the parallel results.
p_areas
# True if the parallel map matches the serial nested-loop result.
areas == p_areas
Now that we've seen multiplexing and load-balancing, let's see how they are used together.