# Prefer the standalone ipyparallel package; fall back to the legacy
# IPython.parallel location when it is not installed.
try:
    import ipyparallel as ipp
except ImportError:
    import IPython.parallel as ipp

# Create a client and a load-balanced view to submit tasks through.
rc = ipp.Client()
v = rc.load_balanced_view()

# Submit one task that raises ZeroDivisionError and one that returns normally,
# so the examples below have both a failed and a successful result to use.
fail_ar = v.apply_async(lambda: 1/0)
success_ar = v.apply_async(lambda: 'ok')

# Wait for the outstanding tasks, then report which engine ran each one.
rc.wait()
print("Fail ran on %i" % fail_ar.engine_id)
print("Success ran on %i" % success_ar.engine_id)
Fail ran on 0
Success ran on 2
The simplest way to express a dependency is to pass `after` and/or `follow` to the `view.temp_flags` context manager:
# `after` makes the submitted task depend on success_ar; the flag only
# applies to tasks submitted inside this block.
with v.temp_flags(after=success_ar):
    print(v.apply_sync(lambda: 'ok'))
ok
`Dependency` objects let you encapsulate more fine-grained logic about dependencies,
such as whether success or failure should be considered,
or whether any or all dependencies need to be met.
The default behavior is to consider the dependency met only if all dependees succeed.
Let's make a few dependencies for various combinations of failure and success, and see how they behave:
# Every Dependency below watches the same two results; only the flags differ.
# Flags are written out in full (defaults: all=True, success=True, failure=False).

# Met only when both tasks succeeded.
all_success = ipp.Dependency([fail_ar, success_ar], all=True, success=True, failure=False)
# Met as soon as either task succeeded.
any_success = ipp.Dependency([fail_ar, success_ar], all=False, success=True, failure=False)
# Met as soon as either task finished, whatever the outcome.
anything = ipp.Dependency([fail_ar, success_ar], all=False, success=True, failure=True)
# Met only when both tasks finished, whatever the outcome.
all_success_or_fail = ipp.Dependency([fail_ar, success_ar], all=True, success=True, failure=True)
# Met as soon as either task failed.
any_fail = ipp.Dependency([fail_ar, success_ar], all=False, success=False, failure=True)
# Met only when both tasks failed.
all_fail = ipp.Dependency([fail_ar, success_ar], all=True, success=False, failure=True)
A task can't `follow` all of these tasks at once, because they didn't run on the same engine:
# Expected to fail with ImpossibleDependency: the dependency requires both
# tasks, and they did not run on the same engine, so there is no single
# engine the new task could follow.
with v.temp_flags(follow=all_success_or_fail):
    v.apply_sync(lambda: "impossible")
Traceback (most recent call last):
  File "/Users/minrk/dev/ip/parallel/ipyparallel/controller/scheduler.py", line 490, in fail_unreachable
    raise why()
ipyparallel.error.ImpossibleDependency
Similarly, a task can't run after they all succeed, or after they all fail, because one succeeded and the other failed:
# Expected to fail with ImpossibleDependency: all_fail requires every
# dependency to have failed, but success_ar succeeded.
with v.temp_flags(after=all_fail):
    v.apply_sync(lambda: "impossible")
Traceback (most recent call last):
  File "/Users/minrk/dev/ip/parallel/ipyparallel/controller/scheduler.py", line 490, in fail_unreachable
    raise why()
ipyparallel.error.ImpossibleDependency
def test_run(**flags):
    """Submit a trivial task three times under *flags* and print where each ran.

    ``flags`` is forwarded unchanged to ``v.temp_flags`` (for example
    ``after=...`` or ``follow=...``).  Each submission is waited on before
    the next one is made, and the engine id of the finished task is printed.
    """
    attempts = 3
    for _ in range(attempts):
        # Only the submission itself happens under the temporary flags.
        with v.temp_flags(**flags):
            task = v.apply_async(lambda: 'ok')
        # Block until the task is done so its engine id is available.
        task.get()
        print("Ran on %i" % task.engine_id)
follow=any success: will run on an engine that succeeded
# Each run should land on an engine where one of the dependencies succeeded.
test_run(follow=any_success)
Ran on 2
Ran on 2
Ran on 2
after=any success: same condition as above, but doesn't affect where the task runs, only when.
# Same condition, but `after` constrains only when the task runs, not where.
test_run(after=any_success)
Ran on 1
Ran on 0
Ran on 3
follow=any fail: will only follow the failure, not the success.
# Only failed dependencies count, so each run follows the failed task's engine.
test_run(follow=any_fail)
Ran on 0
Ran on 0
Ran on 0
follow=any success or failure confines the task to where a task ran, but doesn't care about success.
# Either outcome counts, so runs are confined to engines where a dependency ran.
test_run(follow=anything)
Ran on 2
Ran on 0
Ran on 2