Basic Usage

In [1]:
from IPython.parallel import Client
rc = Client()
rc.ids
Out[1]:
[0, 1, 2, 3]

Create a DirectView over all engines. The view determines which engines subsequent commands run on; rc[:] selects all of them.

In [2]:
dview = rc[:]

Here we define the is_prime function as usual

In [3]:
with open('../builtin-cpuheavy/prime_list.txt') as f:
    PRIMES = [int(l) for l in f]

def is_prime(n):
    # import inside the function, so that math is imported
    # on each engine when the function runs there
    import math
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

Run in parallel

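Use map_async (non-blocking, returns an AsyncResult) or map_sync (blocking, returns the results) to apply a function to every element of a sequence in parallel.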

In [4]:
ar = dview.map_async(is_prime, PRIMES[:8])
In [5]:
ar.wait_interactive()  # wait() blocks until done; wait_interactive() also prints progress
   4/4 tasks finished after   12 s
done
In [6]:
ar.get()
Out[6]:
[True, True, True, True, True, False, True, True]
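For readers without a running cluster, the difference between the blocking and non-blocking calls can be mimicked locally with the standard library's concurrent.futures. This is only an analogy, not IPython.parallel: a thread pool stands in for the engines, so under the GIL it shows the calling pattern rather than a real CPU speedup. The sample numbers are arbitrary.

```python
import math
from concurrent.futures import ThreadPoolExecutor


def is_prime(n):
    """Trial division by 2 and by odd numbers up to sqrt(n)."""
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True


numbers = [2, 3, 4, 97, 100, 7919]

with ThreadPoolExecutor(max_workers=4) as pool:
    # Analogue of map_sync: list() blocks until every result is in,
    # and the results come back in input order.
    sync_results = list(pool.map(is_prime, numbers))

    # Analogue of map_async: submit() returns futures immediately ...
    futures = [pool.submit(is_prime, n) for n in numbers]
    # ... and .result() waits for each one, much like ar.get().
    async_results = [f.result() for f in futures]

print(sync_results)                   # [True, True, False, True, False, True]
print(async_results == sync_results)  # True
```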
In [9]:
speedup = ar.serial_time / ar.wall_time
speedup
Out[9]:
1.7723805092389953
In [10]:
ar.metadata[:1]
Out[10]:
[{'follow': [],
  'engine_id': 0,
  'msg_id': 'c5c2c0dc-d560-4888-9d77-2cb6a565b8cf',
  'submitted': datetime.datetime(2014, 4, 21, 14, 40, 12, 468317),
  'started': datetime.datetime(2014, 4, 21, 14, 40, 12, 470405),
  'stderr': '',
  'completed': datetime.datetime(2014, 4, 21, 14, 40, 16, 244956),
  'engine_uuid': '923ba113-2829-4f9d-8b67-ffe55c69a7e7',
  'pyin': None,
  'outputs': [],
  'received': datetime.datetime(2014, 4, 21, 14, 40, 16, 248072),
  'stdout': '',
  'outputs_ready': True,
  'pyout': None,
  'after': [],
  'pyerr': None,
  'data': {},
  'status': 'ok'}]
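As we understand AsyncResult, serial_time adds up each task's compute time (completed minus started), while wall_time spans from the earliest submission to the latest receipt. The sketch below recomputes both from plain dicts reusing the timestamps of the single metadata record shown in Out[10]; the helper names are hypothetical. With only one task the ratio is just under 1, since a speedup appears only when several tasks overlap in time.

```python
from datetime import datetime


def serial_time(metadata):
    """Sum of per-task compute times in seconds (completed - started)."""
    return sum((md["completed"] - md["started"]).total_seconds() for md in metadata)


def wall_time(metadata):
    """Seconds from the earliest submission to the latest receipt."""
    first = min(md["submitted"] for md in metadata)
    last = max(md["received"] for md in metadata)
    return (last - first).total_seconds()


# Timestamps copied from the engine 0 metadata record above
metadata = [{
    "submitted": datetime(2014, 4, 21, 14, 40, 12, 468317),
    "started": datetime(2014, 4, 21, 14, 40, 12, 470405),
    "completed": datetime(2014, 4, 21, 14, 40, 16, 244956),
    "received": datetime(2014, 4, 21, 14, 40, 16, 248072),
}]

print(serial_time(metadata))  # 3.774551
print(wall_time(metadata))    # 3.779755
print(serial_time(metadata) / wall_time(metadata))
```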

Import modules remotely

Any module that the remote code uses must also be imported on the engines. The dview.sync_imports() context manager handles this: imports inside the with block run both locally and on every engine. Note that import numpy as np is not bound as np on the engines; there the module is still named numpy, which is why the call below fails with a NameError.

In [3]:
with dview.sync_imports():
    import math
    import numpy as np
importing math on engine(s)
importing numpy on engine(s)
In [5]:
def find_np():
    np.random.randint(10)

rc[:2].apply_sync(find_np)
[0:apply]: 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<string> in <module>()
<ipython-input-5-d9e2624d366e> in find_np()
NameError: name 'np' is not defined

[1:apply]: 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<string> in <module>()
<ipython-input-5-d9e2624d366e> in find_np()
NameError: name 'np' is not defined
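The NameError follows from how names are bound: each engine has its own namespace, and the import made there bound numpy, not np. The stdlib-only model below (not IPython.parallel itself) uses one dict per engine and a hypothetical sync_import_model helper that, like sync_imports here, binds a module only under its full name; math stands in for numpy so the sketch runs anywhere. Referring to the full name, or importing inside the function, avoids the error.

```python
import importlib


def sync_import_model(namespaces, module_name):
    """Hypothetical helper: bind the module under its full name in every namespace."""
    module = importlib.import_module(module_name)
    for ns in namespaces:
        ns[module_name] = module


def run_on_engine(ns, source):
    """Execute source in one engine namespace; return 'result' or the NameError text."""
    try:
        exec(source, ns)
        return ns["result"]
    except NameError as err:
        return f"NameError: {err}"


engines = [{} for _ in range(2)]    # two simulated engines
sync_import_model(engines, "math")  # like `import math as m`: only `math` is bound

alias_results = [run_on_engine(ns, "result = m.sqrt(16)") for ns in engines]
full_results = [run_on_engine(ns, "result = math.sqrt(16)") for ns in engines]
print(alias_results)  # each entry reports that name 'm' is not defined
print(full_results)   # [4.0, 4.0]
```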

IPython Parallel Magic

To run an aliased import (or any other statement) directly on the engines, use the %%px IPython cell magic. A %%px cell executes its statements on all engines.

In [7]:
%%px
import numpy as np
np.random.randint(6)
Out[0:2]: 1
Out[1:2]: 2
Out[2:2]: 3
Out[3:2]: 4
In [12]:
%%px
# run this cell several times: each engine keeps the same process
# (like a persistent remote Python interpreter), so the PIDs do not change
import os
os.getpid()
Out[0:2]: 6952
Out[1:2]: 6953
Out[2:2]: 6954
Out[3:2]: 6955

Passing/Collecting Data

Pushing a variable to all engines and pulling it back

In [13]:
# push
dview['prog'] = 'val_prime'

# pull
dview['prog']
Out[13]:
['val_prime', 'val_prime', 'val_prime', 'val_prime']
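Conceptually, a push copies the same value into every engine's namespace and a pull reads it back, returning one entry per engine in engine order. A stdlib-only model with one dict per engine (the push and pull helpers here are hypothetical, not the DirectView methods) makes the shape of Out[13] explicit:

```python
def push(namespaces, name, value):
    """Store the same value under `name` in every engine namespace."""
    for ns in namespaces:
        ns[name] = value


def pull(namespaces, name):
    """Read `name` back from every engine namespace, in engine order."""
    return [ns[name] for ns in namespaces]


engines = [{} for _ in range(4)]  # four simulated engines, as in rc.ids
push(engines, "prog", "val_prime")
print(pull(engines, "prog"))  # ['val_prime', 'val_prime', 'val_prime', 'val_prime']
```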

Splitting a variable across engines

In [14]:
# every engine gets its own chunk of x
ar = dview.scatter('x', list(range(13)))
ar.wait()
In [15]:
dview['x']
Out[15]:
[[0, 1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
In [16]:
# collect x from all engines and concatenate the chunks
dview.gather('x', block=True)
Out[16]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
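The chunks in Out[15] suggest how scatter partitions a sequence: each engine gets len(seq) // n items, and the first len(seq) % n engines get one extra. The sketch below reproduces that split and the matching gather with hypothetical scatter_chunks and gather_chunks helpers; it models the observed behavior and is not the library's source.

```python
def scatter_chunks(seq, n_engines):
    """Split seq into n_engines contiguous chunks; earlier chunks absorb the remainder."""
    base, extra = divmod(len(seq), n_engines)
    chunks, start = [], 0
    for i in range(n_engines):
        size = base + (1 if i < extra else 0)
        chunks.append(list(seq[start:start + size]))
        start += size
    return chunks


def gather_chunks(chunks):
    """Concatenate per-engine chunks back into one list, in engine order."""
    return [item for chunk in chunks for item in chunk]


chunks = scatter_chunks(list(range(13)), 4)
print(chunks)                                  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
print(gather_chunks(chunks) == list(range(13)))  # True
```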

Here is another example, using NumPy arrays created on the engines:

In [17]:
%%px 
import numpy as np
rand_n = np.random.randint(0, 10, 6)
In [19]:
dview['rand_n']
Out[19]:
[array([5, 8, 7, 7, 2, 9]),
 array([2, 9, 2, 2, 9, 4]),
 array([2, 5, 7, 7, 5, 1]),
 array([6, 4, 8, 5, 2, 5])]
In [20]:
dview.gather('rand_n', block=True)
Out[20]:
array([5, 8, 7, 7, 2, 9, 2, 9, 2, 2, 9, 4, 2, 5, 7, 7, 5, 1, 6, 4, 8, 5, 2,
       5])
In [21]:
# sum rand_n locally on each engine
def rand_sum():
    return np.sum(rand_n)

ar = dview.apply_async(rand_sum)  # non-blocking; ar.get() returns one sum per engine
In [22]:
ar.get()
Out[22]:
[38, 28, 27, 30]
In [25]:
# the sum of the per-engine sums should equal the sum of the gathered array
sum(ar.get()) == sum(dview.gather('rand_n', block=True))
Out[25]:
True
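This check works because addition is associative: summing each engine's chunk and then adding the partial sums gives the same total as summing the gathered array. Using the four arrays printed in Out[19] as plain lists, the partial sums match Out[22]:

```python
# Values copied from the per-engine rand_n arrays shown above
chunks = [
    [5, 8, 7, 7, 2, 9],
    [2, 9, 2, 2, 9, 4],
    [2, 5, 7, 7, 5, 1],
    [6, 4, 8, 5, 2, 5],
]

partial_sums = [sum(chunk) for chunk in chunks]    # one sum per engine
gathered = [x for chunk in chunks for x in chunk]  # like gather()
print(partial_sums)                        # [38, 28, 27, 30]
print(sum(partial_sums) == sum(gathered))  # True
```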