This example shows how to benefit from existing parallelism (and optimizations like warm restart)
from mempamal.datasets import iris
import numpy as np
# iris dataset as usual but with linear regression (Why not! :p).
X, y = iris.get_data()
# adding i.i.d noise to X -> 4 "real" features + 10,000 noise features
# provide a sufficient workload to see the interest of nested parallelism
X = np.hstack((X, np.random.randn(X.shape[0], 10000)))
from sklearn.linear_model import ElasticNetCV
from sklearn.cross_validation import StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.metrics import r2_score
We use the ElasticNetCV
wich provide warm restart and nested parallism (provided by joblib
). So, we don't need a inner cross-validation loop since the estimator already provide it. in this example, the grid of parameters contains 500 tuples.
l1_ratios = np.linspace(0.1,1,10).tolist()
s = ElasticNetCV(l1_ratio=l1_ratios, n_alphas=50, n_jobs=-1)
est = Pipeline([("enet", s)])
from mempamal.configuration import JSONify_estimator, JSONify_cv, build_dataset
from mempamal.workflow import create_wf, save_wf
We jsonify the estimator and the cross-validation configuration.
We build the dataset in the current directory.
It's create a dataset.joblib
file.
Then we create the workflow in our internal format (create_wf
).
With verbose=True
, it prints the commands on stdout
.
And finally, we output the workflow (save_wf
) in the soma-workflow format
and write it to workflow.json
(need soma-workflow).
method_conf = JSONify_estimator(est, out="./est.json")
cv_conf = JSONify_cv(StratifiedKFold, cv_kwargs={"n_folds": 5},
score_func=r2_score,
stratified=True,
out="./cv.json")
dataset = build_dataset(X, y, method_conf, cv_conf, outputdir=".")
wfi = create_wf(dataset['folds'], cv_conf, method_conf, ".",
verbose=True)
wf = save_wf(wfi, "./workflow.json", mode="soma-workflow")
python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_0.pkl 0 python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_1.pkl 1 python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_2.pkl 2 python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_3.pkl 3 python mempamal/scripts/mapper.py ./cv.json ./est.json ./dataset.joblib ./red_res_4.pkl 4 python mempamal/scripts/outer_reducer.py ./final_res.pkl ./red_res_{outer}.pkl
Now, we create a WorkflowController
and we set the number of processors to 1 in order to let the nested parallelism using the ressources.
We submit the workflow and we wait for workflow completion, then we read the final results.
from soma_workflow.client import WorkflowController
import time
import json
import sklearn.externals.joblib as joblib
controller = WorkflowController()
# limit the scheduler to 1 task (for the nested parallelism)
old_nproc = controller.scheduler_config.get_proc_nb()
controller.scheduler_config.set_proc_nb(1)
wf_id = controller.submit_workflow(workflow=wf, name="fourth example")
while controller.workflow_status(wf_id) != 'workflow_done':
time.sleep(2)
# reset the scheduler policy
controller.scheduler_config.set_proc_nb(old_nproc)
print(joblib.load('./final_res.pkl'))
light mode {'std': 0.01165793880264987, 'raw': array([ 0.88502578, 0.88896717, 0.89306213, 0.88935599, 0.86066079]), 'median': 0.88896717072541787, 'mean': 0.88341437212120044}