Sometimes you have a generator of tasks but don't want to pull every task out of it at once. For example, each task might use a lot of memory or take a long time to load.
In some of these cases, you can also stop submitting tasks as soon as you see a particular result. For demonstration purposes, we will use a dumb (deliberately naive) implementation of checking whether a number is prime.
```python
import IPython
print IPython.sys_info()
```
```
{'codename': 'An Afternoon Hack', 'commit_hash': 'a886d6e', 'commit_source': 'repository', 'default_encoding': 'UTF-8', 'ipython_path': '/Users/minrk/dev/ip/mine/IPython', 'ipython_version': '2.0.0-dev', 'os_name': 'posix', 'platform': 'Darwin-12.4.0-x86_64-i386-64bit', 'sys_executable': '/usr/bin/python', 'sys_platform': 'darwin', 'sys_version': '2.7.2 (default, Oct 11 2012, 20:14:37) \n[GCC 4.2.1 Compatible Apple Clang 4.0 (tags/Apple/clang-418.0.60)]'}
```
Our example 'data generator' produces the possible factors of a given number $N$. It yields 2, followed by every odd integer from 3 up to $\sqrt{N}$, and yields nothing at all when $N \le 3$.
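These candidates are enough because of the standard trial-division argument, written out here for clarity. If $N$ is composite, write $N = ab$ with $2 \le a \le b$. Then

$$
a^2 \le ab = N \quad\Longrightarrow\quad a \le \sqrt{N},
$$

so every composite $N$ has a factor $a$ with $2 \le a \le \sqrt{N}$. If that factor is even, then 2 divides $N$. Otherwise it is one of the odd candidates $3, 5, 7, \ldots \le \sqrt{N}$. Either way, the generator yields a candidate that divides $N$.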
```python
from math import sqrt

def generate_possible_factors(N):
    """generator for iterating through possible factors for N

    yields 2, every odd integer <= sqrt(N)
    """
    if N <= 3:
        return
    yield 2
    f = 3
    last = int(sqrt(N))
    while f <= last:
        yield f
        f += 2

for f in generate_possible_factors(300):
    print f
```
```
2 3 5 7 9 11 13 15 17
```
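The loop above bounds the candidates with `int(sqrt(N))`. That value goes through floating point, so it can be inaccurate for very large `N`. Below is a minimal Python 3 sketch of the same generator that uses exact integer arithmetic instead. It assumes Python 3.8+ for `math.isqrt`, the exact integer square root. The name `generate_possible_factors_isqrt` is hypothetical and not part of the original notebook.

```python
from math import isqrt  # exact integer square root, Python 3.8+


def generate_possible_factors_isqrt(N):
    """Hypothetical variant of generate_possible_factors.

    Yields 2, then every odd integer from 3 to isqrt(N), for N > 3.
    """
    if N <= 3:
        return
    yield 2
    # odd candidates 3, 5, ..., up to and including isqrt(N)
    yield from range(3, isqrt(N) + 1, 2)


print(list(generate_possible_factors_isqrt(300)))
# prints [2, 3, 5, 7, 9, 11, 13, 15, 17], the same candidates as above
```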
Next, the trivial function that we will use as a task with `IPython.parallel`:
```python
def is_factor(f, N):
    """is f a factor of N?"""
    return (N % f) == 0
```
And a complete implementation of prime check using the generator and our factor function:
```python
def dumb_prime(N):
    """dumb implementation of is N prime?"""
    for f in generate_possible_factors(N):
        if is_factor(f, N):
            return False
    return True

for N in range(900, 1000):
    if dumb_prime(N):
        print N
```
```
907 911 919 929 937 941 947 953 967 971 977 983 991 997
```
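As an independent sanity check on the list above, a sieve of Eratosthenes should report the same primes between 900 and 1000. The sieve is a different technique from the trial division used in this notebook and is included only for cross-checking. The helper name `primes_below` is hypothetical.

```python
def primes_below(limit):
    """Return all primes < limit using a sieve of Eratosthenes."""
    if limit < 2:
        return []
    is_prime = [True] * limit
    is_prime[0] = is_prime[1] = False
    for p in range(2, int(limit ** 0.5) + 1):
        if is_prime[p]:
            # cross out multiples of p, starting at p*p
            for multiple in range(p * p, limit, p):
                is_prime[multiple] = False
    return [n for n, prime in enumerate(is_prime) if prime]


print([n for n in primes_below(1000) if n >= 900])
# prints [907, 911, 919, 929, 937, 941, 947, 953, 967, 971, 977, 983, 991, 997]
```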
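```python
from IPython import parallel
# connect to a running IPython cluster
rc = parallel.Client()
dv = rc[:]  # DirectView on all engines
view = rc.load_balanced_view()  # each task goes to whichever engine is free
dv
```
```
<DirectView [0, 1, 2, 3]>
```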
Now for a function that checks primality in parallel, while loading only a limited number of tasks from the generator at any given time.
The logic of the function:

1. Pull factors from the generator and submit them until `max_outstanding` tasks are pending.
2. Wait until at least one pending task has finished.
3. If any finished task found a factor, return `False` immediately, so no further factors are submitted.
4. Drop the finished tasks from the pending set and go back to step 1.
5. Once the generator is exhausted, check the remaining pending tasks. If none of them found a factor, `N` is prime.

```python
def parallel_dumb_prime(N, v, max_outstanding=10, dt=0.1):
    """dumb_prime where each factor is checked remotely

    Up to `max_outstanding` factors will be checked in parallel.
    Submission will halt as soon as we know that N is not prime.
    """
    tasks = set()
    # factors is a generator
    factors = generate_possible_factors(N)
    while True:
        try:
            # submit a batch of tasks, with a maximum of `max_outstanding`
            for i in range(max_outstanding - len(tasks)):
                f = next(factors)
                tasks.add(v.apply_async(is_factor, f, N))
        except StopIteration:
            # no more factors to test, stop submitting
            break
        # get the tasks that are done
        ready = set(task for task in tasks if task.ready())
        while not ready:
            # wait a little bit for some tasks to finish
            v.wait(tasks, timeout=dt)
            ready = set(task for task in tasks if task.ready())
        for t in ready:
            # get the result - if True, N is not prime, we are done
            if t.get():
                return False
        # update tasks to only those that are still pending,
        # and submit the next batch
        tasks.difference_update(ready)
    # check the last few outstanding tasks
    for task in tasks:
        if task.get():
            return False
    # checked all candidates, none are factors, so N is prime
    return True
```

```python
for N in range(900, 1000):
    if parallel_dumb_prime(N, view, 10):
        print N
```
```
907 911 919 929 937 941 947 953 967 971 977 983 991 997
```
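The same bounded-submission pattern can be tried without an IPython cluster. The sketch below is a local analogue, not the `IPython.parallel` API, and it assumes Python 3.8+. It swaps in the standard library's `concurrent.futures` thread pool:

- `Future.done()` and `Future.result()` play the roles of the `ready()` and `get()` calls on the tasks returned by `apply_async`.
- `wait(..., return_when=FIRST_COMPLETED)` replaces polling with `v.wait(tasks, timeout=dt)`.

The names `possible_factors` and `bounded_parallel_prime` are hypothetical.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from math import isqrt


def possible_factors(N):
    """Yield 2, then odd integers 3, 5, ... <= isqrt(N), for N > 3."""
    if N <= 3:
        return
    yield 2
    yield from range(3, isqrt(N) + 1, 2)


def is_factor(f, N):
    """Is f a factor of N?"""
    return N % f == 0


def bounded_parallel_prime(N, executor, max_outstanding=10):
    """Hypothetical local analogue of parallel_dumb_prime.

    At most `max_outstanding` candidates are pulled from the generator and
    pending at any time; submission stops as soon as a factor is found.
    """
    if max_outstanding < 1:
        raise ValueError("max_outstanding must be at least 1")
    factors = possible_factors(N)
    pending = set()
    exhausted = False
    while True:
        # top up the pending set from the generator, never exceeding the limit
        while not exhausted and len(pending) < max_outstanding:
            try:
                f = next(factors)
            except StopIteration:
                exhausted = True
                break
            pending.add(executor.submit(is_factor, f, N))
        if not pending:
            # generator exhausted and every submitted check returned False
            return True
        # block until at least one outstanding check finishes
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        if any(fut.result() for fut in done):
            # found a factor: cancel checks that have not started yet
            for fut in pending:
                fut.cancel()
            return False


with ThreadPoolExecutor(max_workers=4) as pool:
    print([N for N in range(900, 1000) if bounded_parallel_prime(N, pool)])
# prints the same 14 primes as dumb_prime above
```

A thread pool keeps the sketch self-contained. Because of the GIL it gives no real speedup for this CPU-bound check. The point is the bounded, early-stopping consumption of the generator, not performance.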