Python is great because is has so many packages. However, you will quickly discover that installing packages without administrative privileges can become challenging. There is virtualenv that lets you install packages locally but it can be a nightmare when you have C librairies to build. If you want to do scientific computing with Python, I advise you to go with the Anaconda distribution. It will save you a great deal of time at McGill and will make the SOCS sysadmins life easier.
For this demonstration, we will work on Jordan Frank's gait dataset. A complete description of the dataset can be found at http://www.cs.mcgill.ca/~jfrank8/data/gait-dataset.html
%%bash
curl -O http://www.cs.mcgill.ca/~jfrank8/data/gait-dataset.tar.gz
tar xzf gait-dataset.tar.gz
% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 45.0M 100 45.0M 0 0 2008k 0 0:00:22 0:00:22 --:--:-- 2746k
We could also have chosen to load the data through numpy.loadtxt. However, we will use fit an Autoregressive Moving Average ARAMA model using statsmodels which tends to prefer Pandas dataframes.
import statsmodels.api as sm
import pandas as pd
import os
dta_path = 'gait-dataset'
files_byday = lambda dta_path, day: [os.path.join(dta_path, name)
for name in os.listdir(dta_path)
if name.endswith('.csv') and 'day'+str(day) in name]
day1 = files_byday(dta_path, 1)
dta = pd.read_csv(day1[0], header=0, sep="\t", usecols=['timestamp', 'accel_mag'])
dta.index = dta['timestamp'].astype('M8[ms]')
del dta['timestamp']
dta.plot(figsize=(12,8));
We will attempt to differentiate people using only the magniture of the accelerometer values. The idea will be to produce features using a local ARMA model which will then be leveraged for classification using an SVM. We will fist tackle the problem of iterating through sliding windows.
def arma_featurizer(filename, nwin=500, nparam=2):
import numpy as np
import pandas as pd
import statsmodels.api as sm
dta = pd.read_csv(filename, header=0, sep="\t", usecols=['timestamp', 'accel_mag'])
dta.index = pd.to_datetime(dta['timestamp'], unit='ms')
del dta['timestamp']
nwin, nparam = 10, nparam
X = np.zeros((nwin, nparam+1))
for i, subseries in enumerate(np.array_split(dta, nwin)):
model = sm.tsa.ARMA(subseries, (nparam,0)).fit()
X[i] = model.params
return X
We can start a local cluster with two engines using the following shell command:
!ipcluster start -n=2 --daemon
2014-04-15 11:15:23.944 [IPClusterStart] Using existing profile dir: u'/Users/pierrelucbacon/.ipython/profile_default'
from IPython import parallel
rc = parallel.Client()
rc.block = True
all_engines = rc[:]
Similar to the idea to the corresponding idea in database, we access the cluster through either a DirectView or LoadBalancedView.
We can now map an array of filenames through the ARMA featurizer. A different parellization model could have been chosen by mapping windows to engines. However, I would expect it creates more overhead than it saves computing time. In the code below, we will instead map the full time series associated with one person to one single engine. Within one engine, the local ARMA models will be computed sequentially.
Y = all_engines.map(arma_featurizer, day1[:2])
In order to assess the performance of our classifier, we will use stratified kfold cross-validation. Using the ARMA features that we computed above, we will form the matrix of instances $X$ and corresponding label vector $y$. In order to minimize the data exchange over the network, we will upload the dataset to the cluster and open a mmap view in each engine.
def fit_svm(fold):
import numpy as np
from sklearn import svm
from sklearn.metrics import accuracy_score
X = np.load('instances.npy', mmap_mode='r')
y = np.load('labels.npy', mmap_mode='r')
train, test = fold
clf = svm.SVC()
clf.fit(X[train], y[train])
ypred = clf.predict(X[test])
return accuracy_score(y[test], ypred)
from sklearn import cross_validation
np.save('instances', np.vstack(Y))
labels = np.repeat(range(len(Y)), len(Y[0]))
np.save('labels', labels)
accuracy_scores = all_engines.map(fit_svm, cross_validation.StratifiedKFold(labels, n_folds=2))
print accuracy_scores
[0.90000000000000002, 0.69999999999999996]
!ipcluster stop
2014-04-15 11:17:37.881 [IPClusterStop] Using existing profile dir: u'/Users/pierrelucbacon/.ipython/profile_default' 2014-04-15 11:17:37.917 [IPClusterStop] Stopping cluster [pid=4463] with [signal=2]
We will use the Amazon Elastic Compute Cloud (Amazon EC2) infrastructure to deploy our cluster. This will require you to create an account at https://aws.amazon.com/ec2/ and enter payment details. Fortunately, you won't have to pay much more than a few dollars for each experiment. To give you an idea of the prices, you can take a look at https://aws.amazon.com/ec2/pricing/. The pricing model is per hour of compute time and depends on the hardware that you choose.
Depending on the robustness of your code, you can also go with the spot instances model where you bid for hardware resources. The drawback is that you might wait indefinitely long of your precious experiment might get shutdown because another client outbid your price. You help place an educated guess, you can use the spothistory
subcommand:
You can install Starcluster autmatically from conda or PIP using:
A default configuration file can be created by calling starcluster
and choosing "2" at the menu:
In order to use IPython with Starcluster, you will need to add the following section to your .starcluster/.config/
. As a reference, I have put my config file here https://gist.github.com/pierrelux/10695188
and add ipcluster
to the list of plugins to load
You can also set the default type of instances through the NODE_INSTANCE_TYPE
directive. For example:
You will need to set the name of the SSH key that you wish to install on the cluster. Here "rllab" is the name of an existing SSH key created under .ssh/rllab.rsa
You can create an RSA key as follow:
You can then start a 2 nodes cluster on spotinstances with the command:
After a little while, you should have a cluster up and running to which you can SSH using:
You can choose to run IPython notebook completly remotly on the cluster or use connect your local instance to the remote ipcluster engines. In this case, the only modification to our code consists in:
rc = parallel.Client('/Users/pierrelucbacon/.starcluster/ipcluster/SecurityGroup:@sc-rllab-us-east-1.json', sshkey='/Users/pierrelucbacon/.ssh/rllab.rsa')
rc.block = True
all_engines = rc[:]
%%px
!ifconfig eth0 | grep 'inet addr'
[stdout:10] inet addr:172.31.40.36 Bcast:172.31.47.255 Mask:255.255.240.0 [stdout:11] inet addr:172.31.37.163 Bcast:172.31.47.255 Mask:255.255.240.0 [stdout:12] inet addr:172.31.41.111 Bcast:172.31.47.255 Mask:255.255.240.0 [stdout:13] inet addr:172.31.41.229 Bcast:172.31.47.255 Mask:255.255.240.0 [stdout:14] inet addr:172.31.41.85 Bcast:172.31.47.255 Mask:255.255.240.0 [stdout:15] inet addr:172.31.44.187 Bcast:172.31.47.255 Mask:255.255.240.0 [stdout:16] inet addr:172.31.39.207 Bcast:172.31.47.255 Mask:255.255.240.0 [stdout:17] inet addr:172.31.45.236 Bcast:172.31.47.255 Mask:255.255.240.0 [stdout:18] inet addr:172.31.41.187 Bcast:172.31.47.255 Mask:255.255.240.0 [stdout:19] inet addr:172.31.38.164 Bcast:172.31.47.255 Mask:255.255.240.0
When working with a remote cluster, you will probably need to first copy your dataset. The DirectView
interview provides a dict-like mechanism to scatter the data on each engine. I prefer to avoid this solution since not only does it saturates my bandwidth, but it also creates memory duplicates. Since starcluster will setup an NFS share, it suffice to download the dataset from a single engine. Instead of uploading the dataset through SSH, we will fetch it from the cluster by running the following bash commands remotely:
%%px --targets=10
%%bash
curl -O http://www.cs.mcgill.ca/~jfrank8/data/gait-dataset.tar.gz
tar xzf gait-dataset.tar.gz
% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 45.0M 100 45.0M 0 0 977k 0 0:00:47 0:00:47 --:--:-- 1092k
Let's now repeat the above experiment but run it completely on the cluster. As before, we first featurize the dataset and then cross-validate our results.
Y = all_engines.map(arma_featurizer, day1)
np.save('instances', np.vstack(Y))
labels = np.repeat(range(len(Y)), len(Y[0]))
np.save('labels', labels)
%%bash
starcluster put rllab -u sgeadmin instances.npy /home/sgeadmin
starcluster put rllab -u sgeadmin labels.npy /home/sgeadmin
StarCluster - (http://star.mit.edu/cluster) (v. 0.95.4) Software Tools for Academics and Researchers (STAR) Please submit bug reports to starcluster@mit.edu instances.npy 100% |||||||||||||||||||||||||||||||||| Time: 00:00:00 1.95 M/s StarCluster - (http://star.mit.edu/cluster) (v. 0.95.4) Software Tools for Academics and Researchers (STAR) Please submit bug reports to starcluster@mit.edu labels.npy 100% ||||||||||||||||||||||||||||||||||||| Time: 00:00:00 793.25 K/s
accuracy_scores = all_engines.map(fit_svm, cross_validation.StratifiedKFold(labels, n_folds=4))
print accuracy_scores
[0.23999999999999999, 0.29999999999999999, 0.23999999999999999, 0.22]