This example was written for a Stack Overflow (SO) question.
import time
import numpy as np
from IPython.zmq.serialize import unpack_apply_message
from IPython import parallel
# Short alias so the same callable can be submitted and later compared against.
norm = np.linalg.norm

# First client: submits the work through a load-balanced view.
rc0 = parallel.Client()
loader = rc0.load_balanced_view()

# Five sleep requests, with sleep times 0..4.
for i in range(5):
    loader.apply_async(time.sleep, i)

# Norm of one random 2x2 array, requested once per order.
A = np.random.random((2, 2))
for n in [1, 2, np.inf]:
    loader.apply_async(norm, A, ord=n)

# The first client's history; used below as the list of msg_ids to look up.
msg_ids = rc0.history

# Second client: used below to query the Hub for the requests made via rc0.
rc = parallel.Client()
Load the serialized requests from the Hub:
# Fetch only the msg_id and buffers fields of the records whose msg_id
# is one of msg_ids.
query = rc.db_query(
    {'msg_id': {'$in': msg_ids}},
    keys=['msg_id', 'buffers'],
)
Each entry is a dict with the keys `msg_id` and `buffers`. The buffers hold the serialized function, args, and kwargs.
# Look at the first record; as a bare expression its value is only shown
# in an interactive session.
query[0]
Using `unpack_apply_message`, we can reconstruct the function, args, and kwargs of each request.
For each function, we build a mapping from msg_id to the relevant argument (the order for the norms, and the sleep time for the sleeps).
# msg_id -> sleep time, for the time.sleep requests
sleeps = {}
# msg_id -> norm order (the `ord` keyword), for the norm requests
norms = {}


def func_name(f):
    """Return the ``module.name`` key for comparing an unpacked function with the original."""
    # %-formatting instead of `+` so a callable whose __module__ is None
    # yields a key rather than raising TypeError.
    return '%s.%s' % (f.__module__, f.__name__)


# Keys of the two functions we submitted; computed once, outside the loop.
sleep_key = func_name(time.sleep)
norm_key = func_name(norm)

for q in query:
    msg_id = q['msg_id']
    # Rebuild the function, positional args, and keyword args of the request.
    f, args, kwargs = unpack_apply_message(q['buffers'])
    key = func_name(f)
    if key == sleep_key:
        # we know sleeps only get one arg
        sleeps[msg_id] = args[0]
    elif key == norm_key:
        # we don't care about the array, only the order
        norms[msg_id] = kwargs['ord']
    # Single-argument print(...) prints the same text under Python 2 and 3.
    print(msg_id)
    print(' f=%r' % f)
    # args is a tuple: wrap it so `%` formats the tuple itself instead of
    # unpacking it (which fails unless it has exactly one element).
    print(' args=%s' % (args,))
    print(' kwargs=%s' % kwargs)

# Bare expressions: their values are only shown in an interactive session.
sleeps
norms