In this notebook, we use the concurrent.futures module to parallelize the evaluation of a function with multithreading.
We also cover a common case: the executable code we want to wrap reads and writes input/output files, which affects how it can be parallelized.
If you are using Python >= 3.2, this module is part of the standard library;
otherwise, install the `futures` backport with one of the following commands:
$ pip install futures --user
$ sudo apt-get install python-concurrent.futures
$ conda install futures
from __future__ import print_function
import concurrent.futures
import functools
import math as m
import os
import shutil
import tempfile

import openturns as ot
import openturns.coupling_tools as otct
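The following decorator turns a regular Python function that evaluates a single point into a function that evaluates a whole sample, with the points evaluated concurrently in a thread pool managed by concurrent.futures.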
def multithread(f, max_workers=8):
    """Return a version of ``f`` that evaluates a whole sample concurrently.

    ``f`` evaluates one point. The returned function takes a sequence of
    points ``X`` (anything supporting ``len`` and integer indexing) and
    submits ``f(X[i])`` for every ``i`` to a thread pool. It returns the list
    of outputs in the same order as ``X``, whatever order the threads
    finish in.

    Parameters
    ----------
    f : callable
        Function evaluating a single point.
    max_workers : int, optional
        Number of worker threads (default 8). Usable as a plain decorator
        ``@multithread`` exactly as before.

    Raises
    ------
    Exception
        Whatever ``f`` raised. The failing point and the exception are
        printed first, then the exception is re-raised to the caller.
    """
    @functools.wraps(f)
    def inner(X):
        size = len(X)
        # Placeholder only; every slot is overwritten before Y is returned.
        Y = [None] * size
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_index = {executor.submit(f, X[i]): i for i in range(size)}
            # Futures complete in arbitrary order; the index map keeps Y aligned with X.
            for future in concurrent.futures.as_completed(future_to_index):
                i = future_to_index[future]
                exc = future.exception()
                if exc is not None:
                    print('%s generated an exception: %s' % (str(X[i]), exc))
                # result() re-raises the worker's exception, if any.
                Y[i] = future.result()
        return Y
    return inner
The decorated function is replaced by its multithreaded counterpart, which takes a whole sample and returns the list of outputs in the same order as the input points.
@multithread
def my_func(X):
    """Evaluate y = sin(x0) * cos(x1) * exp(x2) at a single 3-component point.

    Returns a one-element list. Because of the ``multithread`` decorator,
    the name ``my_func`` is bound to the sample-wise wrapper of this function.
    """
    # Tuple unpacking (rather than indexing) rejects points that do not have exactly 3 components.
    first, second, third = X
    output = m.sin(first) * m.cos(second) * m.exp(third)
    return [output]
# Draw 10 points from a 3-dimensional normal distribution and evaluate my_func
# on all of them through the thread pool. Y is a list of one-element lists,
# ordered like the points of X.
X = ot.Normal(3).getSample(10)
Y = my_func(X)
print(Y)
[[0.11056650323300793], [-0.7599636278619369], [-0.10410872208238846], [0.8245660033257063], [-0.05753438937971085], [-0.0428861679170567], [-0.6622663701610637], [0.09296920661207787], [0.6430689107595504], [0.44647079261717804]]
Sometimes you need to wrap an executable that communicates through input/output files.
A standard way to make such a wrapper safe for multithreading is to run each evaluation in its own temporary directory, so that concurrent runs do not overwrite each other's files.
def isolate(files):
    """Decorator factory that runs the wrapped function in a fresh temporary directory.

    On every call, a new temporary directory is created and each path in
    ``files`` is copied into it. The wrapped function is then called with the
    keyword argument ``cwd`` set to that directory, replacing any ``cwd`` the
    caller passed. Because every call gets its own directory, concurrent calls
    never share working files. The directory is removed afterwards, even when
    copying or the wrapped call raises.

    Parameters
    ----------
    files : iterable of str
        Paths of the files to copy into each temporary directory.
    """
    def wrap(f):
        @functools.wraps(f)
        def inner(*args, **kwargs):
            tmpdir = tempfile.mkdtemp()
            try:
                for filex in files:
                    shutil.copy(filex, tmpdir)
                kwargs['cwd'] = tmpdir
                return f(*args, **kwargs)
            finally:
                # Always clean up, otherwise failed evaluations leak one directory each.
                shutil.rmtree(tmpdir)
        return inner
    return wrap
Suppose our executable reads its input values from input.txt and writes its results to output.txt.
We create a template file input.txt.in in which our wrapper replaces the @x0@, @x1@ and @x2@ tokens with the values of X.
# Input template: the wrapper later substitutes each @xi@ token with a value.
with open('input.txt.in', 'w') as f:
    f.write('x0=@x0@;x1=@x1@;x2=@x2@')

# A tiny stand-in "executable": it executes input.txt to define x0, x1, x2,
# computes y, and writes "y=<value>" to output.txt.
with open('executable.py', 'w') as f:
    f.writelines([
        'exec(open("./input.txt").read())\n',
        'from math import *\n',
        'y = cos(x0) * sin(x1) * exp(x2)\n',
        'with open("output.txt", "w") as f:\n',
        ' f.write("y="+str(y))\n',
    ])
The wrapper therefore has to copy the executable and the template input file into the temporary directory before each run.
@multithread
@isolate(['executable.py', 'input.txt.in'])
def my_func(X, cwd='.'):
    """Evaluate the file-based executable at a single 3-component point X.

    Works inside the directory ``cwd``, which ``isolate`` sets to a private
    temporary copy of executable.py and input.txt.in:

    1. Fills the input.txt.in template with the values of X into input.txt.
    2. Runs executable.py there.
    3. Reads the value following ``y=`` from output.txt.

    Returns a one-element list. ``multithread`` maps this over a whole sample.
    """
    tokens = ['@x0@', '@x1@', '@x2@']
    otct.replace(os.path.join(cwd, 'input.txt.in'),
                 os.path.join(cwd, 'input.txt'), tokens, X)
    # Run inside cwd so the executable's relative paths hit this call's own files.
    # NOTE(review): relies on a `python` command on PATH, which may not be the
    # interpreter running this notebook.
    otct.execute('python executable.py', cwd=cwd)
    y = otct.get_value(os.path.join(cwd, 'output.txt'), token='y=')
    return [y]
The multithreaded wrapper can then be used in OpenTURNS as the sample function of a PythonFunction:
# Wrap the multithreaded, file-based my_func as a 3-input / 1-output function.
# It is passed as func_sample, i.e. as the function evaluating a whole sample,
# which is the calling convention multithread provides.
model = ot.PythonFunction(3, 1, func_sample=my_func)
# Random input vector and the output random vector obtained through the model.
vect = ot.RandomVector(ot.Normal(3))
composite = ot.CompositeRandomVector(model, vect)
# Event of interest: the model output is less than -3.0.
event = ot.ThresholdEvent(composite, ot.Less(), -3.0)
# Monte Carlo simulation of the event probability.
experiment = ot.MonteCarloExperiment()
algo = ot.ProbabilitySimulationAlgorithm(event, experiment)
algo.setMaximumOuterSampling(100)
# Block size 8 presumably hands 8 points to each model call, matching the
# 8-thread pool used by multithread.
algo.setBlockSize(8)
algo.run()
print(algo.getResult())
probabilityEstimate=1.375000e-02 varianceEstimate=1.904381e-05 standard deviation=4.36e-03 coefficient of variation=3.17e-01 confidenceLength(0.95)=1.71e-02 outerSampling=100 blockSize=8