Recall there are three components of IPython parallel:
from IPython.parallel import Client
rc = Client()
rc.ids
Here we use all the engines. A DirectView
is returned.
dview = rc[:]
Then define the is_prime
function as usual.
with open('../builtin-cpuheavy/prime_list.txt') as f:
    PRIMES = [int(l) for l in f]
def is_prime(n):
    # the import runs when the function is called,
    # so every engine imports math on first use
    # (not a good usage pattern though, more on this later)
    import math
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
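Before dispatching to the engines, it can help to sanity-check the function locally. A standalone copy of is_prime (re-declared here so the snippet runs on its own; note it deliberately treats even numbers, including 2, as non-prime, which is fine for a list of large odd candidates):

```python
import math

# Standalone copy of is_prime for a quick local check.
def is_prime(n):
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

print(is_prime(7), is_prime(9), is_prime(15485863))  # True False True
```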
Use map_async
or map_sync
to map the function across the engines in parallel.
ar = dview.map_async(is_prime, PRIMES[:8])
ar.wait_interactive()
blocks the notebook server and reports the current task status.
Note that interrupting it here only interrupts the notebook itself; the IPython cluster keeps running the tasks.
ar.wait_interactive()
ar.get()
speedup = ar.serial_time / ar.wall_time
speedup
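To make the ratio concrete, here is a back-of-the-envelope sketch with made-up timings (the numbers are hypothetical, not measured): serial_time is the summed compute time of all tasks, while wall_time is the elapsed time the client observed.

```python
# Hypothetical figures for 8 tasks of ~2 s each on 4 engines.
serial_time = 8 * 2.0   # total CPU seconds across all tasks
wall_time = 4.2         # elapsed seconds observed at the client
speedup = serial_time / wall_time
print(round(speedup, 2))  # 3.81: close to, but below, the 4 engines due to overhead
```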
The metadata for each task's execution can be accessed via ar.metadata
ar.metadata[:1]
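Each metadata record carries, among other fields, started and completed timestamps, from which per-task durations can be derived. A mocked-up sketch (the records below are fabricated for illustration, not taken from a real run):

```python
from datetime import datetime, timedelta

t0 = datetime(2014, 1, 1, 12, 0, 0)
# Fabricated records mimicking ar.metadata entries (other keys omitted).
metadata = [
    {'engine_id': 0, 'started': t0, 'completed': t0 + timedelta(seconds=1.5)},
    {'engine_id': 1, 'started': t0, 'completed': t0 + timedelta(seconds=2.0)},
]
durations = [(md['completed'] - md['started']).total_seconds() for md in metadata]
print(durations)  # [1.5, 2.0]
```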
If the function relies on any imported modules, the engines must import them as well, so here we use the dview.sync_imports()
context manager to handle this. Note that import numpy as np
will not actually be bound to the name np
on the engines; it remains numpy
.
with dview.sync_imports():
    import math
    import numpy as np  # this won't work

def find_np():
    return np.random.randint(10)
rc[:2].apply_sync(find_np)
In the IPython shell, the %%px
cell magic helps with trivial parallel setup. A %%px
cell executes its statements on all engines.
%%px --local
executes the statements in the notebook as well.
%%px
import numpy as np
np.random.randint(6)
Try running the following multiple times. Since the engines reuse the same processes (like a remote Python interpreter), the return values stay the same.
%%px
import os
os.getpid()
Pushing / pulling a variable to all engines
# push
dview['prog'] = 'val_prime'
# pull
dview['prog']
# all engines get a portion of x's elements
ar = dview.scatter('x', list(range(15)))
ar.wait()
dview['x']
# get x from all engines and combine
dview.gather('x', block=True)
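Under the hood, scatter just partitions the sequence into one chunk per engine. A local sketch of one plausible partitioning scheme (the exact chunk boundaries IPython uses may differ slightly):

```python
def partition(seq, n):
    # Split seq into n chunks whose sizes differ by at most one,
    # handing the larger chunks out first.
    k, r = divmod(len(seq), n)
    chunks, start = [], 0
    for i in range(n):
        size = k + (1 if i < r else 0)
        chunks.append(seq[start:start + size])
        start += size
    return chunks

print(partition(list(range(15)), 4))
# [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14]]
```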
Here is another example.
%%px
import numpy as np
rand_n = np.random.randint(0, 10, 6)
dview['rand_n']
dview.gather('rand_n', block=True)
# sum at each engine
def rand_sum():
    return np.sum(rand_n)
ar = dview.apply_async(rand_sum)
ar
ar.get()
The parallel sum should equal the serial sum.
sum(ar.get()) == sum(dview.gather('rand_n', block=True))
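The same invariant can be checked locally without a cluster; np.array_split here stands in for scatter, and the per-chunk sums stand in for rand_sum on each engine (the seed and sizes are arbitrary):

```python
import numpy as np

rng = np.random.RandomState(0)               # fixed seed, arbitrary choice
rand_n = rng.randint(0, 10, 24)
chunks = np.array_split(rand_n, 4)           # stand-in for scatter over 4 engines
partial = [int(np.sum(c)) for c in chunks]   # stand-in for rand_sum per engine
print(sum(partial) == int(np.sum(rand_n)))   # True
```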