Reconstructing IPython.parallel requests

In [2]:
import time
import numpy as np

from IPython.zmq.serialize import unpack_apply_message
from IPython import parallel
In [3]:
# Short alias: this is the callable submitted to the engines and later matched by name.
norm = np.linalg.norm

Preamble: load the hub with some data

In [18]:
# rc0 is the client used to submit the sample requests; they are sent
# through its load-balanced view.
rc0 = parallel.Client()
loader = rc0.load_balanced_view()
In [19]:
# Submit five time.sleep requests, with arguments 0 through 4.
for delay in range(5):
    loader.apply_async(time.sleep, delay)
In [20]:
# One random 2x2 matrix, submitted once for each norm order: 1, 2 and infinity.
A = np.random.random((2, 2))
for order in (1, 2, np.inf):
    loader.apply_async(norm, A, ord=order)
In [22]:
# rc0.history is used below as the list of msg_ids to look up in the Hub's database.
msg_ids = rc0.history

Now we can start to reconstruct the requests

In [25]:
# A second client, distinct from the rc0 that submitted the requests;
# the database query below goes through this one.
rc = parallel.Client()

Load the serialized requests from the Hub:

In [26]:
# Select the records whose msg_id is one of ours, and fetch only the
# msg_id and buffers fields of each.
msg_filter = {'msg_id': {'$in': msg_ids}}
query = rc.db_query(msg_filter, keys=['msg_id', 'buffers'])

Each entry is a dict with msg_id and buffers. The buffers are the serialized function, args, and kwargs.

In [28]:
# Show the first record to see what a raw entry looks like.
query[0]
Out[28]:
{'buffers': ['\x80\x02ctime\nsleep\nq\x01.',
  '\x80\x02}q\x01(U\x07kw_keysq\x02]q\x03U\x05nargsq\x04K\x01U\tnarg_bufsq\x05K\x01u.',
  '\x80\x02K\x03.'],
 u'msg_id': u'b3473d39-1a64-43d3-869c-af44c42e11c1'}

Using unpack_apply_message, we can reconstruct the function, args, and kwargs of each request.

For each function, we build a mapping from msg_id to the relevant argument (the order for the norms, and the sleep time for the sleeps).

In [52]:
# msg_id -> sleep time, for the time.sleep requests
sleeps = {}
# msg_id -> norm order (the `ord` keyword), for the norm requests
norms = {}


def func_name(f):
    """Return 'module.name' for f: the key for comparing an unpacked function with the original."""
    return f.__module__ + '.' + f.__name__


# The reference keys do not change between requests, so compute them once.
sleep_key = func_name(time.sleep)
norm_key = func_name(norm)

for q in query:
    msg_id = q['msg_id']
    f, args, kwargs = unpack_apply_message(q['buffers'])
    key = func_name(f)
    if key == sleep_key:
        # we know sleeps only get one arg
        sleeps[msg_id] = args[0]
    elif key == norm_key:
        # we don't care about the array, only the order
        norms[msg_id] = kwargs['ord']
    # print(...) with one parenthesised argument prints the same thing under
    # the Python 2 print statement.
    print(msg_id)
    print('       f=%r' % (f,))
    # `'%s' % args` would hand the whole args tuple to the % operator as its
    # format values, which raises TypeError unless there is exactly one
    # positional arg.  Formatting each arg separately works for any number of
    # args and prints a single arg exactly as before.
    print('    args=%s' % ', '.join('%s' % (a,) for a in args))
    print('  kwargs=%s' % (kwargs,))
b3473d39-1a64-43d3-869c-af44c42e11c1
       f=<built-in function sleep>
    args=3
  kwargs={}
25de473a-5bd2-4571-89c5-45145bd15a40
       f=<built-in function sleep>
    args=4
  kwargs={}
4620c394-7b77-470b-bd5a-3bbf92327a31
       f=<built-in function sleep>
    args=1
  kwargs={}
21aec8d5-458f-43df-b864-d98fa6f2def8
       f=<function norm at 0x10d4905f0>
    args=[[ 0.7648452   0.93428146]
 [ 0.82959873  0.92163395]]
  kwargs={'ord': 1}
d2ca049f-465f-4b87-ba7e-d56d391273a2
       f=<built-in function sleep>
    args=0
  kwargs={}
287b0351-21d7-4443-a349-fc38a27baf89
       f=<function norm at 0x10d4905f0>
    args=[[ 0.7648452   0.93428146]
 [ 0.82959873  0.92163395]]
  kwargs={'ord': 2}
7e6b3b43-2f17-449a-be12-e925c10e07d3
       f=<function norm at 0x10d351c80>
    args=[[ 0.7648452   0.93428146]
 [ 0.82959873  0.92163395]]
  kwargs={'ord': inf}
27e27a67-889e-4045-a3ad-3d2940228040
       f=<built-in function sleep>
    args=2
  kwargs={}

In [53]:
# msg_id -> first positional arg of each time.sleep request
sleeps
Out[53]:
{u'25de473a-5bd2-4571-89c5-45145bd15a40': 4,
 u'27e27a67-889e-4045-a3ad-3d2940228040': 2,
 u'4620c394-7b77-470b-bd5a-3bbf92327a31': 1,
 u'b3473d39-1a64-43d3-869c-af44c42e11c1': 3,
 u'd2ca049f-465f-4b87-ba7e-d56d391273a2': 0}
In [54]:
# msg_id -> `ord` keyword of each norm request
norms
Out[54]:
{u'21aec8d5-458f-43df-b864-d98fa6f2def8': 1,
 u'287b0351-21d7-4443-a349-fc38a27baf89': 2,
 u'7e6b3b43-2f17-449a-be12-e925c10e07d3': inf}