The purpose of this script is to create a generic wrapper that approximates the behavior of multiprocessing.Pool.imap_unordered using the standard methods included with IPython.parallel.
One nice feature of imap_unordered is that it does not load all of the input data into memory at once. As a result, it's possible to grind over a large dataset by passing along a generator.
Unfortunately, multiprocessing doesn't play well with IPython Notebook, and none of the methods in IPython.parallel accepts a generator without first loading all of its data into memory.
This problem was raised by Vincent on Stack Overflow last year, and minrk offered a really nice example.
I think we can take it one step further and make an abstract wrapper that can be used anywhere you might have otherwise used multiprocessing.imap_unordered. This example is about half-way there.
# Import the submodule explicitly: a bare `import IPython` binds the top-level
# package only and does not guarantee that `IPython.parallel` has been loaded,
# so `IPython.parallel.Client` could fail with AttributeError outside a
# session that happened to import it already.  This form still binds the
# name `IPython`, so later code that refers to it is unaffected.
import IPython.parallel

# Create the client used to reach the parallel engines.
# NOTE(review): presumably connects to an already-running cluster using the
# default profile -- confirm against the IPython.parallel documentation.
client = IPython.parallel.Client()

# Show the ids reported by the client (the captured output lists four).
# The parenthesised form prints the same text on Python 2 and Python 3.
print(client.ids)
[0, 1, 2, 3]
# Load-balanced view obtained from the client; its `map` method is what the
# cells below use to run work in parallel.
lbview = client.load_balanced_view()
def iter_lines(datafile):
    """Lazily yield each item of ``datafile`` with surrounding whitespace removed.

    ``datafile`` is iterated one item at a time (for an open file, one line
    at a time), so the whole input is never held in memory at once.
    """
    for raw_line in datafile:
        stripped = raw_line.strip()
        yield stripped
def tokenize(s):
    """Return the distinct whitespace-separated tokens of ``s``, lowercased and sorted."""
    lowered = s.lower()
    # A set drops duplicate tokens before sorting.
    unique_tokens = set(lowered.split())
    ordered = list(unique_tokens)
    ordered.sort()
    return ordered
# Fetch a plain-text file from gutenberg.org to use as sample input.
!wget http://www.gutenberg.org/cache/epub/25439/pg25439.txt
# Give the download a descriptive file name.
!mv pg25439.txt bellamy-looking_backward-1887.txt
# Report how many lines the file contains.
!wc -l bellamy-looking_backward-1887.txt
# Path of the sample file, reused by the timing cells below.
fn = "bellamy-looking_backward-1887.txt"
--2014-11-11 16:21:57-- http://www.gutenberg.org/cache/epub/25439/pg25439.txt Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47 Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:80... connected. HTTP request sent, awaiting response... 200 OK Length: 496005 (484K) [text/plain] Saving to: `pg25439.txt' 100%[======================================>] 496,005 --.-K/s in 0.1s 2014-11-11 16:21:58 (3.34 MB/s) - `pg25439.txt' saved [496005/496005] 8841 bellamy-looking_backward-1887.txt
%%time
# Serial baseline: tokenize every stripped line with the built-in map.
with open(fn, "rb") as f:
tokens = map(tokenize, iter_lines(f))
CPU times: user 24 ms, sys: 24 ms, total: 48 ms Wall time: 53.2 ms
%%time
# Same tokenization, dispatched through the load-balanced view's map.
with open(fn, "rb") as f:
results = lbview.map(tokenize, iter_lines(f))
# NOTE(review): .get() presumably blocks until every result is available and
# returns them as a list -- confirm against the IPython.parallel documentation.
parallel_tokens = results.get()
CPU times: user 8.7 s, sys: 4.19 s, total: 12.9 s Wall time: 30.6 s
# Sanity check: the parallel run must reproduce the serial baseline exactly.
tokens == parallel_tokens
True
def imap(function, generator, view,
         preprocessor=iter, chunksize=256):
    """Lazily map ``function`` over ``generator`` in fixed-size batches.

    Items are pulled from ``preprocessor(generator)`` and buffered until
    ``chunksize * len(view.client.ids)`` of them are waiting.  That batch is
    passed to ``view.map`` and its results are yielded one by one before any
    further input is read, so at most one batch of input is buffered here at
    a time.

    Args:
        function: Callable applied to every item (passed to ``view.map``).
        generator: Source of input; handed to ``preprocessor``.
        view: Object exposing ``client.ids`` (its length scales the batch
            size) and ``map(function, sequence)`` returning an iterable of
            results.
        preprocessor: Callable turning ``generator`` into the iterable of
            items to process.  Defaults to ``iter``.
        chunksize: Number of items per entry in ``view.client.ids`` that make
            up one batch.

    Yields:
        The items obtained by iterating each ``view.map(function, batch)``
        result, batch after batch.
    """
    num_cores = len(view.client.ids)
    # A zero or negative product would never fill a batch sensibly (the
    # original modulo test raised ZeroDivisionError); degrade to one item
    # per batch instead.
    batch_size = max(1, chunksize * num_cores)

    queue = []
    for item in preprocessor(generator):
        queue.append(item)
        # Flush on the number of buffered items rather than on the running
        # index: an index test of the form ``i % n == 0`` fires at i == 0 and
        # sends out a first batch containing a single item.
        if len(queue) >= batch_size:
            for result in view.map(function, queue):
                yield result
            queue = []

    # Send whatever is left over, but never dispatch an empty batch.
    if queue:
        for result in view.map(function, queue):
            yield result
%%time
# Same tokenization through the batching imap wrapper: iter_lines is passed as
# the preprocessor and 128 as the chunksize.
with open(fn, "rb") as f:
imap_tokens = []
# imap is a generator that reads from f as it goes, so it has to be consumed
# while the file is still open.
for result in imap(tokenize, f, lbview, iter_lines, 128):
imap_tokens.append(result)
CPU times: user 10.4 s, sys: 3.15 s, total: 13.6 s Wall time: 21.4 s
# Sanity check: the batched imap run must match the plain lbview.map run.
parallel_tokens == imap_tokens
True
from itertools import product
def stringcount(pair):
    """Count how often each entry of ``substrings`` occurs in ``longstring``.

    The function takes one argument so that it can be used directly with
    ``map``-style callers that pass each input item as a single value.

    Args:
        pair: A two-item sequence ``(longstring, substrings)``.
            ``longstring`` must provide ``.count()`` and ``substrings`` must
            be iterable.

    Returns:
        A tuple ``(longstring, substrings, scount)`` where ``scount`` is the
        list of ``longstring.count(s)`` for each ``s`` in ``substrings``.
    """
    # Unpack in the body: tuple parameters in the signature were removed in
    # Python 3 (PEP 3113).  Callers still pass one positional tuple.
    longstring, substrings = pair
    scount = [longstring.count(s) for s in substrings]
    return (longstring, substrings, scount)
def gen_pairs(long_string, sub_strings):
    """Lazily pair items of ``long_string`` with items of ``sub_strings``.

    Pairs are produced in lockstep and generation stops as soon as either
    input runs out.  Neither input is materialised.

    Args:
        long_string: Iterable supplying the first element of each pair.
        sub_strings: Iterable or iterator supplying the second element.

    Yields:
        ``(l, s)`` tuples, one per position shared by both inputs.
    """
    # Accept any iterable, not only objects that already expose a next method.
    sub_iter = iter(sub_strings)
    exhausted = object()
    for long_item in long_string:
        # next() with a sentinel works on Python 2.6+ and 3.  It also avoids
        # letting StopIteration escape the generator body, which Python 3.7+
        # turns into RuntimeError (PEP 479).
        sub_item = next(sub_iter, exhausted)
        if sub_item is exhausted:
            return
        yield (long_item, sub_item)
# Two lazy inputs of different lengths (3-tuples and 2-tuples over 'abc').
longstring = product('abc', repeat=3)
substrings = product('abc', repeat=2)
%%time
# Serial reference run: pair the two inputs and count with the built-in map.
for x in map(stringcount, gen_pairs(longstring, substrings)):
print x
(('a', 'a', 'a'), ('a', 'a'), [3, 3]) (('a', 'a', 'b'), ('a', 'b'), [2, 1]) (('a', 'a', 'c'), ('a', 'c'), [2, 1]) (('a', 'b', 'a'), ('b', 'a'), [1, 2]) (('a', 'b', 'b'), ('b', 'b'), [2, 2]) (('a', 'b', 'c'), ('b', 'c'), [1, 1]) (('a', 'c', 'a'), ('c', 'a'), [1, 2]) (('a', 'c', 'b'), ('c', 'b'), [1, 1]) (('a', 'c', 'c'), ('c', 'c'), [2, 2]) CPU times: user 0 ns, sys: 0 ns, total: 0 ns Wall time: 549 µs
# Rebuild the inputs: product() returns one-shot iterators and the previous
# loop has already consumed them.
longstring = product('abc', repeat=3)
substrings = product('abc', repeat=2)
%%time
# Same computation through the imap wrapper, with its default preprocessor
# and chunksize.
for x in imap(stringcount, gen_pairs(longstring, substrings), lbview):
print x
(('a', 'a', 'a'), ('a', 'a'), [3, 3]) (('a', 'a', 'b'), ('a', 'b'), [2, 1]) (('a', 'a', 'c'), ('a', 'c'), [2, 1]) (('a', 'b', 'a'), ('b', 'a'), [1, 2]) (('a', 'b', 'b'), ('b', 'b'), [2, 2]) (('a', 'b', 'c'), ('b', 'c'), [1, 1]) (('a', 'c', 'a'), ('c', 'a'), [1, 2]) (('a', 'c', 'b'), ('c', 'b'), [1, 1]) (('a', 'c', 'c'), ('c', 'c'), [2, 2]) CPU times: user 16 ms, sys: 4 ms, total: 20 ms Wall time: 32.8 ms