%%bash # Miniconda allows to download binary instead of compiling like pip # get Miniconda from http://conda.pydata.org/miniconda.html wget http://repo.continuum.io/miniconda/Miniconda-3.0.5-Linux-x86_64.sh bash Miniconda-3.0.5-Linux-x86_64.sh conda create -n pysc --yes ipython pyzmq tornado jinja2 pandas pygments pip pycrypto source activate pysc pip install --upgrade starcluster from ConfigParser import ConfigParser config = ConfigParser() import pandas as pd # extract first row of csv credentials = pd.read_csv("credentials.csv").ix[0] config.add_section("aws info") config.set("aws info", "aws_access_key_id", credentials["Access Key Id"]) config.set("aws info", "aws_secret_access_key", credentials["Secret Access Key"]) # key pairs are region-specific config.set("aws info", "aws_region_name", "us-west-2") config.set("aws info", "aws_region_host", "ec2.us-west-2.amazonaws.com") config.add_section("keypair starcluster") config.set("keypair starcluster", "key_location", "starcluster.pem") import os import os.path def write_sc_conf(sc_conf): """Write starcluster configuration to ~/.starcluster/config""" folder = os.path.join(os.path.expanduser("~"), ".starcluster") try: os.makedirs(folder) except: pass with open(os.path.join(folder, "config"), "w") as f: config.write(f) write_sc_conf(config) %%bash starcluster listpublic | grep 64 sec = "cluster pyec2" config.add_section(sec) config.set(sec, "keyname", "starcluster") config.set(sec, "cluster_size", 1) config.set(sec, "cluster_user", "ipuser") config.set(sec, "disable_queue", True) ami = "ami-706afe40" instance = "t1.micro" for name in ["master", "node"]: config.set(sec, name + "_image_id", ami) config.set(sec, name + "_instance_type", instance) config.add_section("global") config.set("global", "default_template", "pyec2") write_sc_conf(config) %%bash starcluster createvolume -n ebs1gbwest2a -i ami-fa9cf1ca --detach-volume 1 us-west-2a config.add_section("volume data") # this is the Amazon EBS volume id 
config.set("volume data", "volume_id", "vol-53829851") # the path to mount this EBS volume on # (this path will also be nfs shared to all nodes in the cluster) config.set("volume data", "mount_path", "/data") config.set("cluster pyec2", "volumes", "data") write_sc_conf(config) sec = "plugin ipcluster" config.add_section(sec) config.set(sec, "setup_class", "starcluster.plugins.ipcluster.IPCluster") config.set(sec, "enable_notebook", True) # set a password for the notebook for increased security config.set(sec, "notebook_passwd", "mysupersecretpassword") # store notebooks on EBS! config.set(sec, "notebook_directory", "/data") # pickle is faster for communication than the default JSON config.set(sec, "packer", "pickle") config.add_section("plugin pypackages") config.set("plugin pypackages", "setup_class", "starcluster.plugins.pypkginstaller.PyPkgInstaller") config.set("plugin pypackages", "packages", "scikit-learn, psutil") config.set("cluster pyec2", "plugins", "pypackages, ipcluster") write_sc_conf(config) %%bash starcluster start -s 1 pyec2 %%bash starcluster sshmaster -A pyec2 # -A use local keys remotely %%bash starcluster terminate -c pyec2 # -c does not prompt for confirm %%bash starcluster spothistory c3.2xlarge %%bash starcluster start -s 1 --force-spot-master -b 0.5 -I c3.2xlarge singlenode %%bash starcluster start -c pyec2 -s 5 -b 0.5 -I c3.2xlarge fivenodescluster def compute_evaluation(filename, model, params): """Function executed by a worker to evaluate a model""" # All module imports should be executed in the worker namespace from sklearn.externals import joblib X_train, y_train, X_validation, y_validation = joblib.load( filename, mmap_mode='c') model.set_params(**params) model.fit(X_train, y_train) validation_score = model.score(X_validation, y_validation) return validation_score import numpy as np svc_params = { 'C': np.logspace(-1, 2, 4), 'gamma': np.logspace(-4, 0, 5), } from sklearn.grid_search import ParameterGrid list(ParameterGrid(svc_params)) 
len(list(ParameterGrid(svc_params)))


def compute_evaluation(filename, model, params):
    """Function executed by a worker to evaluate a model.

    Parameters
    ----------
    filename : str
        Path to a joblib dump holding the tuple
        (X_train, y_train, X_validation, y_validation).
    model : estimator
        scikit-learn estimator to configure and fit.
    params : dict
        Parameter settings forwarded to ``model.set_params``.

    Returns
    -------
    float
        Validation score of the fitted model.
    """
    # All module imports should be executed in the worker namespace
    from sklearn.externals import joblib
    X_train, y_train, X_validation, y_validation = joblib.load(
        filename, mmap_mode='c')
    model.set_params(**params)
    model.fit(X_train, y_train)
    validation_score = model.score(X_validation, y_validation)
    return validation_score


from IPython.parallel import Client

rc = Client()
# create the balanced view object
lview = rc.load_balanced_view()

# BUG FIX: the original iterated over `svc_params` itself, which yields only
# the dict *keys* ('C', 'gamma'); set_params(**params) would then fail on the
# worker.  Iterate the expanded grid: one task per parameter combination.
# NOTE(review): `model_1` is assumed to be defined in an earlier cell -- confirm.
tasks = []
for params in ParameterGrid(svc_params):
    tasks.append(lview.apply_async(
        compute_evaluation, "data/input.pkl", model_1, params))


def progress(tasks):
    """Return the fraction (0.0 to 1.0) of tasks that have completed."""
    return np.mean([task.ready() for task in tasks])


print("Tasks completed: {0}%".format(100 * progress(tasks)))


def find_best(tasks, n_top=5):
    """Return the `n_top` highest validation scores among completed tasks."""
    scores = [t.get() for t in tasks if t.ready()]
    return sorted(scores, reverse=True)[:n_top]


print("Tasks completed: {0}%".format(100 * progress(tasks)))
find_best(tasks)

svc_params = {
    'C': np.logspace(-1, 2, 4),
    'gamma': np.logspace(-4, 0, 5),
}

from sklearn.externals import joblib
from sklearn.cross_validation import ShuffleSplit
import os


def persist_cv_splits(X, y, n_cv_iter=5, name='data', suffix="_cv_%03d.pkl",
                      test_size=0.25, random_state=None):
    """Materialize randomized train test splits of a dataset.

    Each fold (X_train, y_train, X_test, y_test) is dumped with joblib to an
    absolute filename built from `name` + `suffix` % fold_index, so workers
    can memory-map the data locally.

    Returns
    -------
    list of str
        Absolute filenames of the persisted splits.
    """
    cv = ShuffleSplit(X.shape[0], n_iter=n_cv_iter,
                      test_size=test_size, random_state=random_state)
    cv_split_filenames = []
    for i, (train, test) in enumerate(cv):
        cv_fold = (X[train], y[train], X[test], y[test])
        cv_split_filename = os.path.abspath(name + suffix % i)
        joblib.dump(cv_fold, cv_split_filename)
        cv_split_filenames.append(cv_split_filename)
    return cv_split_filenames


from sklearn.datasets import load_digits

digits = load_digits()
digits_split_filenames = persist_cv_splits(
    digits.data, digits.target, name='digits', random_state=42)


def compute_evaluation(cv_split_filename, model, params):
    """Function executed by a worker to evaluate a model on a CV split.

    Same contract as the single-split version above, but the filename points
    to one persisted cross-validation fold.
    """
    # All module imports should be executed in the worker namespace
    from sklearn.externals import joblib
    X_train, y_train, X_validation, y_validation = joblib.load(
        cv_split_filename, mmap_mode='c')
    model.set_params(**params)
    model.fit(X_train, y_train)
    validation_score = model.score(X_validation, y_validation)
    return validation_score


def grid_search(lb_view, model, cv_split_filenames, param_grid):
    """Launch all grid search evaluation tasks.

    Returns
    -------
    (all_parameters, all_tasks)
        all_tasks[i] is the list of async results for parameter setting
        all_parameters[i], one task per CV split.
    """
    all_parameters = list(ParameterGrid(param_grid))
    all_tasks = []
    for params in all_parameters:
        # one task per (parameter setting, CV split) pair;
        # the unused enumerate indices of the original were dropped
        task_for_params = [
            lb_view.apply(compute_evaluation, cv_split_filename, model, params)
            for cv_split_filename in cv_split_filenames]
        all_tasks.append(task_for_params)
    return all_parameters, all_tasks


from sklearn.svm import SVC
from IPython.parallel import Client

client = Client()
lb_view = client.load_balanced_view()
model = SVC()
svc_params = {
    'C': np.logspace(-1, 2, 4),
    'gamma': np.logspace(-4, 0, 5),
}
all_parameters, all_tasks = grid_search(
    lb_view, model, digits_split_filenames, svc_params)


def find_best(all_parameters, all_tasks, n_top=5):
    """Rank parameter settings by mean score over their completed tasks.

    Returns
    -------
    list of (mean_score, params) pairs, best first, at most `n_top` long.
    """
    mean_scores = []
    for param, task_group in zip(all_parameters, all_tasks):
        scores = [t.get() for t in task_group if t.ready()]
        if not scores:
            # no split has finished for this setting yet
            continue
        mean_scores.append((np.mean(scores), param))
    # sort on the score only: comparing the param dicts on score ties is
    # meaningless (and a TypeError on Python 3)
    return sorted(mean_scores, key=lambda ms: ms[0], reverse=True)[:n_top]


find_best(all_parameters, all_tasks)