IPython.parallel and generators

Sometimes you have a generator of tasks but don't want to load every task from it at once, for example because each task uses a lot of memory or takes a long time to load.
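As a small illustration of why a generator helps here, the sketch below pulls tasks in bounded batches with `itertools.islice`, so only the requested tasks are ever built. The names `slow_tasks`, `take`, and `produced` are hypothetical and exist only for this example; it uses nothing beyond the standard library and is written to run on Python 3 as well.

```python
from itertools import islice

produced = []  # records which tasks have actually been generated

def slow_tasks(n):
    """hypothetical task generator; imagine each task is expensive to build"""
    for i in range(n):
        produced.append(i)
        yield i

def take(gen, k):
    """pull at most k items from gen, leaving the rest ungenerated"""
    return list(islice(gen, k))

tasks = slow_tasks(1000)
first_batch = take(tasks, 3)   # only three tasks are built here
second_batch = take(tasks, 2)  # the generator resumes where it stopped
```

After these two calls, `produced` holds only the five tasks that were requested; the remaining 995 have not been created.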

In some of these cases, you can stop submitting tasks once you see a certain result. For demonstration purposes, we will use a dumb implementation of determining whether a number is prime.

In [1]:
import IPython
print IPython.sys_info()
{'codename': 'An Afternoon Hack',
 'commit_hash': 'a886d6e',
 'commit_source': 'repository',
 'default_encoding': 'UTF-8',
 'ipython_path': '/Users/minrk/dev/ip/mine/IPython',
 'ipython_version': '2.0.0-dev',
 'os_name': 'posix',
 'platform': 'Darwin-12.4.0-x86_64-i386-64bit',
 'sys_executable': '/usr/bin/python',
 'sys_platform': 'darwin',
 'sys_version': '2.7.2 (default, Oct 11 2012, 20:14:37) \n[GCC 4.2.1 Compatible Apple Clang 4.0 (tags/Apple/clang-418.0.60)]'}

Our example 'data generator' produces the possible factors of a given number N. It yields 2, then every odd integer from 3 up to $\sqrt{N}$.

In [2]:
from math import sqrt

def generate_possible_factors(N):
    """generator for iterating through possible factors for N
    
    yields 2, every odd integer <= sqrt(N)
    """
    if N <= 3:
        return
    yield 2
    f = 3
    last = int(sqrt(N))
    while f <= last:
        yield f
        f += 2

for f in generate_possible_factors(300):
    print f
2
3
5
7
9
11
13
15
17

Next, the trivial function that we will use as a task with IPython.parallel:

In [3]:
def is_factor(f, N):
    """is f a factor of N?"""
    return (N % f) == 0

And a complete implementation of prime check using the generator and our factor function:

In [4]:
def dumb_prime(N):
    """dumb implementation of is N prime?"""
    for f in generate_possible_factors(N):
        if is_factor(f, N):
            return False
    return True

for N in range(900,1000):
    if dumb_prime(N):
        print N
907
911
919
929
937
941
947
953
967
971
977
983
991
997

Now in Parallel

In [6]:
from IPython import parallel

rc = parallel.Client()
dv = rc[:]
view = rc.load_balanced_view()
dv
Out[6]:
<DirectView [0, 1, 2, 3]>

Now we have a function that does prime checking in parallel, with a limited number of tasks loaded from the generator at any given point in time.

The logic of the function:

  1. submit up to max_outstanding tasks
  2. if no more tasks to submit, stop submitting, skip to 7
  3. wait a little bit, and check if any tasks are done.
  4. if any are done, check if one found a factor for N
  5. if we found a factor, we are done. Stop submitting and return False.
  6. if we haven't found a factor yet, go back to 1. and submit more tasks to replace the ones that have finished.
  7. if we have submitted every task, wait for the last few to finish, and see if any found a factor
  8. if no task found a factor, N is prime: return True

In [7]:
def parallel_dumb_prime(N, v, max_outstanding=10, dt=0.1):
    """dumb_prime where each factor is checked remotely
    
    Up to `max_outstanding` factors will be checked in parallel.
    
    Submission will halt as soon as we know that N is not prime.
    """
    tasks = set()
    # factors is a generator
    factors = generate_possible_factors(N)
    while True:
        try:
            # submit a batch of tasks, with a maximum of `max_outstanding`
            for i in range(max_outstanding-len(tasks)):
                f = factors.next()
                tasks.add(v.apply_async(is_factor, f, N))
        except StopIteration:
            # no more factors to test, stop submitting
            break
        # get the tasks that are done
        ready = set(task for task in tasks if task.ready())
        while not ready:
            # wait a little bit for some tasks to finish
            v.wait(tasks, timeout=dt)
            ready = set(task for task in tasks if task.ready())
        
        for t in ready:
            # get the result - if True, N is not prime, we are done
            if t.get():
                return False
        # update tasks to only those that are still pending,
        # and submit the next batch
        tasks.difference_update(ready)
    # check the last few outstanding tasks
    for task in tasks:
        # get() blocks until this task's result has arrived
        if task.get():
            return False
    # checked all candidates, none are factors, so N is prime
    return True

In [8]:
for N in range(900,1000):
    if parallel_dumb_prime(N, view, 10):
        print N
907
911
919
929
937
941
947
953
967
971
977
983
991
997
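The same bounded-submission pattern can be tried without a running cluster. The sketch below is not IPython.parallel: it assumes Python 3 and swaps in the standard library's `concurrent.futures` as the executor, and `local_dumb_prime` is a hypothetical name. It repeats the generator and factor function so that it is self-contained. One loop handles both topping up the outstanding tasks and draining the last few.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from itertools import islice
from math import sqrt

def generate_possible_factors(N):
    """same generator as above: 2, then every odd integer <= sqrt(N)"""
    if N <= 3:
        return
    yield 2
    f = 3
    last = int(sqrt(N))
    while f <= last:
        yield f
        f += 2

def is_factor(f, N):
    """is f a factor of N?"""
    return (N % f) == 0

def local_dumb_prime(N, executor, max_outstanding=10):
    """bounded-submission prime check on a local executor (assumption: not IPython.parallel)"""
    factors = generate_possible_factors(N)
    pending = set()
    while True:
        # top up to max_outstanding; islice simply stops when factors runs out
        for f in islice(factors, max_outstanding - len(pending)):
            pending.add(executor.submit(is_factor, f, N))
        if not pending:
            # nothing left to submit and nothing left to check: N is prime
            return True
        # block until at least one outstanding task has finished
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        if any(t.result() for t in done):
            # found a factor: stop submitting, cancel what has not started yet
            for t in pending:
                t.cancel()
            return False
```

A possible way to run it, using a thread pool purely for demonstration (the work per task is far too small for threads to pay off):

```python
with ThreadPoolExecutor(max_workers=4) as ex:
    primes = [N for N in range(900, 1000) if local_dumb_prime(N, ex, 10)]
```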
