git clone git://github.com/jseabold/zorro.git zorro-talk
# or, using subversion
svn checkout https://github.com/jseabold/zorro zorro-talk
cd zorro-talk
ipython notebook --notebook-dir=.
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Image
Image(filename="./parallel_architecture400.png")
Three Core Parts
Client object connects to the cluster
DirectView class for explicitly running code on a particular engine(s)
LoadBalancedView class for running your code on the 'best' engine(s)

from multiprocessing import cpu_count
print cpu_count()
Start a controller and 4 engines with the ipcluster program.
At the command line, type:
ipcluster start -n 4
from IPython import parallel
rc = parallel.Client(profile='hpc')
rc.block = True
rc.ids
def power(a, b):
return a**b
Get a direct view of kernel 0:

dv = rc[0]
dv
dv.apply(power, 2, 10)
Recall that slice notation allows you to leave out the start, stop, and step values
X = [1, 2, 3, 4]
X
X[:]
Use this to send code to all the engines
rc[:].apply_sync(power, 2, 10)
Python's built-in map function allows you to call a function over a sequence of arguments
map(power, [2]*10, range(10))
In parallel, you use view.map
view = rc.load_balanced_view()
view.map(power, [2]*10, range(10))
"Premature optimization is the root of all evil".
-Donald Knuth
The speed-up from running in parallel is

$$S_p = \frac{T_1}{T_p}$$

where $T_1$ is the time it takes to run the code serially and $T_p$ is the time it takes using $p$ processors.

Amdahl's law puts an upper bound on the achievable speed-up:

$$S = \frac{1}{(1 - P) + P/N}$$

where $P$ is the proportion of the code that can be parallelized and $N$ is the number of processors.
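A quick numeric illustration (this helper is ours, not part of the talk's code): with 90% of a program parallelizable, the speed-up can never exceed 10x, no matter how many processors you add.

def amdahl_speedup(P, N):
    # upper bound on speed-up with fraction P parallelizable on N processors
    return 1. / ((1 - P) + P / float(N))

print amdahl_speedup(.9, 4)    # about 3.08
print amdahl_speedup(.9, 1e6)  # approaches the limit of 10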
Take-aways
from IPython import parallel
rc = parallel.Client(profile='hpc')
Use map and apply in parallel.

rc.block = True
dview = rc.direct_view()
dview.block = False
dview["a"] = 5 # shorthand for push
dview["b"] = 7
dview.apply_sync(lambda x: a + b + x, 27)
You can override the blocking behavior per call with apply_sync and apply_async.
d = {}
d["a"] = 5
d
Views support pushing Python objects to the engines by key, or by using get and update like built-in dicts (see the sketch after the pull example below). push takes a dictionary:

dview.push(dict(msg="Hi, there"), block=True)
dview.block = True
dview.execute("x = msg")
dview["x"] # shorthand for pull
#rc[::2].execute("c = a + b")
# or
dview.execute("c = a + b", targets=[0,2])
#rc[1::2].execute("c = a - b")
# or
dview.execute("c = a - b", targets=[1,3])
dview.pull("c")
Asynchronous calls return an AsyncResult object back immediately.

def wait(t):
    import time
    tic = time.time()
    time.sleep(t)
    return time.time() - tic
ar = dview.apply_async(wait, 2)
type(ar)
ar.get()
You can check whether the result is done with the ready method.

ar = dview.apply_async(wait, 15)
print ar.ready()
ar.get(5)  # raises a TimeoutError if the result isn't ready within 5 seconds
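Since wait(15) can't finish within five seconds, the get above raises a TimeoutError. A minimal sketch of catching it (the exception lives in IPython.parallel.error):

from IPython.parallel import error

try:
    ar.get(5)
except error.TimeoutError:
    print "not ready yet; blocking until the result arrives"
    print ar.get()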
There is also a wait method. wait can take an iterable of AsyncResults.
result_list = [dview.apply_async(wait, 3) for i in range(5)]
result_list
dview.wait(result_list)
result_list[4].get()
Use scatter to partition an iterable across engines; gather pulls the results back.

dview.scatter('x', range(64))
%px y = [i**10 for i in x]
y = dview.gather('y')
print y[:10]

The % indicates that we are using an IPython 'magic' function.
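For reference, the same remote statement can be run without the magic, via the view's execute method:

dview.execute("y = [i**10 for i in x]")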
rc = parallel.Client(profile='hpc')
lview = rc.load_balanced_view()
lview.block = True
parallel_result = lview.map(lambda x:x**10, range(32))
print parallel_result[:10]
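The blocking map above returns the results directly. A non-blocking sketch using map_async, which returns an AsyncMapResult you can poll and then index once it's done:

ar = lview.map_async(lambda x: x**10, range(32))
print ar.ready()     # likely False immediately after submission
print ar.get()[:10]  # get() blocks until all tasks finish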
y = np.array([4.284, 4.149, 3.877, .533, 2.211, 2.389,
2.145, 3.231, 1.998, 1.379, 2.106, 1.428,
1.011, 2.179, 2.858, 1.388, 1.651, 1.593,
1.046, 2.152])
x = np.array([.286, .645, .973, .585, .384, .310,
.276, .058, .973, .455, .543, .779,
.957, .259, .948, .202, .543, .028,
.797, .099, .936, .142, .889, .296,
.006, .175, .828, .180, .399, .842,
.617, .039, .939, .103, .784, .620,
.072, .158, .889, .704]).reshape(20,2)
x = np.column_stack((np.ones(len(x)), x))
print y
print x
def func(params, y, x):
    # residual function; the model is nonlinear because the coefficient
    # on the third column of x is the square of the second parameter
    import numpy as np
    theta = np.r_[params[0], params[1], params[1]**2]
    return y - np.dot(x, theta)
theta1, theta2 = np.mgrid[-3:3:100j,-3:3:100j]
Z = [np.sum(func([i,j], y, x)**2) for i,j in
zip(theta1.flatten(), theta2.flatten())]
Z = np.asarray(Z).reshape(100,100)
fig, ax = plt.subplots(figsize=(6, 6))
V = [16.1, 18, 20, 20.5, 21, 22, 24,
25, 30, 40, 50, 100, 200, 300,
400, 500, 600, 700]
c = ax.contour(theta1, theta2, Z, V)
im = ax.imshow(Z, interpolation='bilinear', origin='lower',
cmap=plt.cm.BrBG, extent=(-3,3,-3,3))
cb = plt.colorbar(c)
ax.set_xlabel(r'$\theta_1$')
ax.set_ylabel(r'$\theta_2$')
#ax.scatter([.864737, 2.35447, 2.49860664], [1.235748, -.319186, -0.98261242],
ax.scatter([.864737, 2.49860664], [1.235748, -0.98261242],
marker="x", s=30, color='black', lw=2)
ax.set_title('Loci of objective function')
ax.set_xlim([-3,3])
ax.set_ylim([-3,3])
ax.grid(False)
plt.show()
x1 = [0,0] # good
x2 = [2.354471, -.319186] # bad
x3 = [1, 1] # good
x4 = [-3.17604581, -0.680944] # bad
# assume we got these in some sane way
xs = np.random.normal(0, 4, size=(20, 2))
starts = np.row_stack((x1, x2, x3, x4, xs))
def optimize_func(start_params):
    # run nonlinear least squares from a given starting value
    return leastsq(func, start_params, args=(y, x))[0]
dview = rc[:]
with dview.sync_imports():
from scipy.optimize import leastsq
import numpy as np
dview.push(dict(func=func, y=y, x=x));
results = dview.map_sync(optimize_func, starts)
opt_func = lambda params: np.sum(func(params, y, x)**2)
i_best = np.argmin(map(opt_func, results))
print results[i_best]
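Because some starting values take far longer to converge than others, a load-balanced view can keep the engines busier than a direct-view map. A sketch, relying on the same pushed func, y, and x as above:

lview = rc.load_balanced_view()
results = lview.map(optimize_func, starts, block=True)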
Read more about this here
install StarCluster
$ pip install starcluster --user
setup your base config file
$ starcluster help
Select option 2.
Setup your config file with your SSH keys, etc.
Add your AWS credentials
Add the few IPython-specific lines to your config file
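Roughly, the lines in question enable StarCluster's bundled IPython plugin; a sketch following the StarCluster docs (adjust mycluster to your own template name):

[plugin ipcluster]
SETUP_CLASS = starcluster.plugins.ipcluster.IPCluster
ENABLE_NOTEBOOK = True

# and in your cluster template, turn the plugin on
[cluster mycluster]
PLUGINS = ipcluster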
Run
$ starcluster start mycluster
You can log in to your master node via ssh by running

$ starcluster sshmaster mycluster -u myuser

replacing the cluster name and user with your information as needed.
Or better yet, follow the instructions above to create a local IPython interpreter or notebook connected to your remote EC2 instance
Run parallel scripts or create views and use IPython just as if you would locally.
Log in to Zorro
Create a profile from the terminal:
ipython profile create --parallel --profile=your-profile-name
You can have as many as you want. For example, you may have a different profile depending on the queue you want to use, or one with different default imports on the engines. I named mine hpc.
Go to
$HOME/.ipython/profile_your-profile-name
You can check the location of your configuration directory by running
ipython locate
In this directory, edit the following lines in ipcluster_config.py to read:
c.IPClusterStart.controller_launcher_class = 'LSF'
c.IPClusterEngines.engine_launcher_class = 'LSF'
Set up the controller
Edit the following lines in ipcontroller_config.py to read:
c.HubFactory.ip = '*'
This is so that the controller listens on all interfaces for the engines.
Set up the engines
Edit the following lines in ipengine_config.py
c.IPEngineApp.work_dir = u'$HOME/scratch/'
c.EngineFactory.timeout = 10
The last step is to edit your ~/.bashrc and add the following line:
export PATH=$PATH:/app/epd/bin
Then type
source ~/.bashrc
This is so that you can run the ipcluster or ipcontroller scripts on the head node. Alternatively, you could create symlinks in your $HOME/bin folder:
ln -s /app/epd/bin/ipcluster ~/bin/
ln -s /app/epd/bin/ipcontroller ~/bin/
Create your batch scripts
Make two files in your working directory that will be your batch files for the engines and the controller. I named mine lsf.engine.template and lsf.controller.template. After the initial set-up these (or similar) files will control our job submission.
Tell the ipcluster_config.py file about your batch scripts by adding the following lines:
c.LSFEngineSetLauncher.batch_template_file = "lsf.engine.template"
c.LSFControllerLauncher.batch_template_file = "lsf.controller.template"
lsf.engine.template
#!/bin/bash
#BSUB-L /bin/bash
#BSUB-J ipython
#BSUB-q interactive
#BSUB-n {n}
#BSUB-u your-email@american.edu
#BSUB-N
#BSUB-c 5
# the ipython code
# enter your working directory
cd $HOME/scratch
export PATH=$HOME/bin:/app/epd/bin/
export PYTHONPATH=/app/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages
ipengine --profile=hpc
lsf.controller.template
#!/bin/bash
#BSUB-L /bin/bash
#BSUB-J ipython
#BSUB-q interactive
#BSUB-n 1
#BSUB-u your-email@american.edu
#BSUB-N
#BSUB-c 5 # timeout in minutes
cd $HOME/scratch
export PATH=$HOME/bin:/app/epd/bin/
export PYTHONPATH=/app/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages
ipcontroller --profile=hpc
You can run the cluster with
ipcluster start --profile=hpc --n=2
I run it with
ipcluster start --profile=hpc --n=2 &
The &
puts the job in the background.
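You can check that the controller and engine jobs were actually submitted with LSF's bjobs command:

$ bjobs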
When your job is done you can run
ipcluster stop --profile=hpc
You can also stop your engines (and the hub) from within your Python scripts by using
rc.shutdown(hub=True)
There are a few other ways to do this. Consult the IPython documentation and the examples/ directory.
scp js2796a@zorro.american.edu:/home/js2796a/.ipython/profile_hpc/security/ipcontroller-client.json ~/school/talks/zorro
<li>Run the following code, it will prompt for your password</li>
<li>If you aren't doing a public demo, you can just provide the password by argument</li>
<li>You might still have to type this password at the terminal as well</li>
NOTE: You will need to be running the same version of IPython locally as you are running on the server. This is currently 1.1 on the AU HPC and 0.13.1 on the StarCluster images.
from IPython import parallel
rc = parallel.Client("./ipcontroller-client.json", sshserver="your-login@hpcserver", timeout=60)
rc = parallel.Client("./ipcontroller-client.json", sshserver="js2796a@zorro.american.edu", timeout=60)
Alternatively, you can connect to a StarCluster just as easily.
$ starcluster start mycluster
$ starcluster sshmaster mycluster -u myuser
skipper@master:~$ ipython
In [1]: from IPython.parallel import Client
In [2]: rc = Client()
In [3]: view = rc[:]
In [4]: view.block = True
In [5]: rc.ids
Out[5]: [0, 1]
In [6]: view.execute("import socket; x = socket.gethostname()")
Out[6]: <AsyncResult: finished>
In [7]: view["x"]
Out[7]: ['master', 'node001']
view = rc[:]
view.block = True
rc.ids
view.execute("import socket; x = socket.gethostname()")
view["x"]
Two files, pidigits.py and parallelpi.py, are included in this repository.
They are from the IPython examples/parallel folder.
There are several interesting examples there that you might want to go through.
Copy them to your remote working directory:
scp ~/school/talks/zorro/parallelpi.py js2796a@american.edu:/home/js2796a/scratch/
scp ~/school/talks/zorro/pidigits.py js2796a@american.edu:/home/js2796a/scratch/
from pidigits import (plot_one_digit_freqs, plot_two_digit_freqs,
                      one_digit_freqs, two_digit_freqs,
                      txt_file_to_digits, fetch_pi_file)
view.execute("from pidigits import *")  # the engines need these functions too
import sympy
pi = sympy.pi.evalf(40)
print pi
Create 10,000 digits of $\pi$ using SymPy
pi = sympy.pi.evalf(10000)
# make a sequence of strings
digits = (d for d in str(pi)[2:])
freqs = one_digit_freqs(digits)
ax = plot_one_digit_freqs(freqs)
def compute_two_digit_freqs(filename):
    """
    Read digits of pi from a file and compute the 2 digit frequencies.
    """
    d = txt_file_to_digits(filename)
    freqs = two_digit_freqs(d)
    return freqs
def reduce_freqs(freqlist):
    """
    Add up a list of freq counts to get the total counts.
    """
    allfreqs = np.zeros_like(freqlist[0])
    for f in freqlist:
        allfreqs += f
    return allfreqs
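For reference, the loop in reduce_freqs is a plain elementwise sum, so (assuming freqlist holds same-shape arrays) it can also be written as a single NumPy call:

allfreqs = np.sum(freqlist, axis=0)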
Get the number of engines available
n = len(rc)
print n
Create the list of files to process.
filestring = 'pi200m.ascii.%(i)02dof20'
files = [filestring % {'i':i} for i in range(1,n+1)]
files
Download the data files on the engines if they don't already exist:
view.map(fetch_pi_file, files)
Run 10 million digits on 1 engine
from timeit import default_timer as clock
t1 = clock()
id0 = rc.ids[0]
freqs10m = rc[id0].apply_sync(compute_two_digit_freqs, files[0])
t2 = clock()
digits_per_second1 = 10.0e6/(t2-t1)
print "Digits per second (1 core, 10m digits): ", digits_per_second1
Now do the same on each engine in parallel
t1 = clock()
# Compute the digits
freqs_all = view.map(compute_two_digit_freqs, files[:n])
# Add up the frequencies from each engine.
freqsn10m = reduce_freqs(freqs_all)
t2 = clock()
digits_per_secondn = n*10.0e6/(t2-t1)
print "Digits per second (%i engines, %i0m digits): "%(n,n), digits_per_secondn
print "Speedup: ", digits_per_secondn/digits_per_second1
plot_two_digit_freqs(freqsn10m, figsize=(10,10))
plt.title("2 digit sequences in %i0m digits of pi" % n);