from IPython.parallel import Client
rc = Client()
rc.ids
[0, 1, 2, 3]
Use all engines through a `DirectView`. A view specifies which engines to use.
dview = rc[:]
Here we define the `is_prime` function as usual.
# Load the candidate numbers to test, one integer per line.
# Blank lines (e.g. a trailing empty line) are skipped so int() does not
# raise ValueError on them.
with open('../builtin-cpuheavy/prime_list.txt') as f:
    PRIMES = [int(line) for line in f if line.strip()]
def is_prime(n):
    """Return True if the integer ``n`` is prime, False otherwise.

    Uses trial division by odd numbers up to the integer square root of
    ``n``. Values below 2 (including negatives) are not prime; 2 is the
    only even prime.
    """
    # Imported inside the function so the import runs wherever the function
    # is called, making sure every engine that executes it has `math`.
    import math

    if n < 2:
        return False
    if n % 2 == 0:
        # 2 is the only even prime.
        return n == 2

    limit = int(math.sqrt(n))
    # math.sqrt works in floating point and may be off by one for very
    # large n; fix the bound up with exact integer arithmetic.
    while limit * limit > n:
        limit -= 1
    while (limit + 1) * (limit + 1) <= n:
        limit += 1

    # Only odd divisors need checking since even n was handled above.
    for i in range(3, limit + 1, 2):
        if n % i == 0:
            return False
    return True
Use `map_async` or `map_sync` to map a function over the inputs and run it in parallel.
ar = dview.map_async(is_prime, PRIMES[:8])
ar.wait_interactive() # wait blocks, and wait_interactive provide working status
4/4 tasks finished after 12 s done
ar.get()
[True, True, True, True, True, False, True, True]
speedup = ar.serial_time / ar.wall_time
speedup
1.7723805092389953
ar.metadata[:1]
[{'follow': [], 'engine_id': 0, 'msg_id': 'c5c2c0dc-d560-4888-9d77-2cb6a565b8cf', 'submitted': datetime.datetime(2014, 4, 21, 14, 40, 12, 468317), 'started': datetime.datetime(2014, 4, 21, 14, 40, 12, 470405), 'stderr': '', 'completed': datetime.datetime(2014, 4, 21, 14, 40, 16, 244956), 'engine_uuid': '923ba113-2829-4f9d-8b67-ffe55c69a7e7', 'pyin': None, 'outputs': [], 'received': datetime.datetime(2014, 4, 21, 14, 40, 16, 248072), 'stdout': '', 'outputs_ready': True, 'pyout': None, 'after': [], 'pyerr': None, 'data': {}, 'status': 'ok'}]
If a function uses imported modules, the engines must import them as well. The `dview.sync_imports()` context manager helps with this. Note that `import numpy as np` will not actually be interpreted as binding the name `np` on the engines; the module remains available there only as `numpy`.
with dview.sync_imports():
import math
import numpy as np
importing math on engine(s) importing numpy on engine(s)
def find_np():
    """Draw a random integer below 10 through the global name ``np``.

    The drawn value is discarded and nothing is returned; the call only
    succeeds where the name ``np`` is defined.
    """
    generator = np.random
    generator.randint(10)
rc[:2].apply_sync(find_np)
[0:apply]: --------------------------------------------------------------------------- NameError Traceback (most recent call last)<string> in <module>() <ipython-input-5-d9e2624d366e> in find_np() NameError: name 'np' is not defined [1:apply]: --------------------------------------------------------------------------- NameError Traceback (most recent call last)<string> in <module>() <ipython-input-5-d9e2624d366e> in find_np() NameError: name 'np' is not defined
To make the `np` alias available on the engines, use the `%%px` IPython cell magic. A `%%px` cell block executes its statements on all engines.
%%px
import numpy as np
np.random.randint(6)
Out[0:2]: 1
Out[1:2]: 2
Out[2:2]: 3
Out[3:2]: 4
%%px
# try running it multiple times: the engines keep using the same processes (like a remote Python interpreter)
import os
os.getpid()
Out[0:2]: 6952
Out[1:2]: 6953
Out[2:2]: 6954
Out[3:2]: 6955
Pushing / pulling a variable to all engines
# push
dview['prog'] = 'val_prime'
# pull
dview['prog']
['val_prime', 'val_prime', 'val_prime', 'val_prime']
# all engines get x, but each with a different value
ar = dview.scatter('x', list(range(13)))
ar.wait()
dview['x']
[[0, 1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
# gather x from all engines and combine the pieces
dview.gather('x', block=True)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
Here is another example
%%px
import numpy as np
rand_n = np.random.randint(0, 10, 6)
dview['rand_n']
[array([5, 8, 7, 7, 2, 9]), array([2, 9, 2, 2, 9, 4]), array([2, 5, 7, 7, 5, 1]), array([6, 4, 8, 5, 2, 5])]
dview.gather('rand_n', block=True)
array([5, 8, 7, 7, 2, 9, 2, 9, 2, 2, 9, 4, 2, 5, 7, 7, 5, 1, 6, 4, 8, 5, 2, 5])
# sum at each engine
def rand_sum():
    """Return the sum of the global array ``rand_n``.

    Both ``np`` and ``rand_n`` are looked up as globals wherever the
    function runs.
    """
    total = np.sum(rand_n)
    return total
# apply_async returns an AsyncResult, which provides the .get() call used
# on the next line; apply_sync would return the plain list of results,
# which has no .get() method.
ar = dview.apply_async(rand_sum)
ar.get()
[38, 28, 27, 30]
# the parallel sum should equal the serial sum
sum(ar.get()) == sum(dview.gather('rand_n', block=True))
True