For this SO question
import time
import numpy as np
from IPython.zmq.serialize import unpack_apply_message
from IPython import parallel
# Short alias for the matrix norm; this is also the callable submitted below.
norm = np.linalg.norm

# First client: the one that submits the work, so its `history` holds the
# msg_ids of everything sent through it.
rc0 = parallel.Client()
loader = rc0.load_balanced_view()

# Five sleep requests, each carrying a single positional argument.
for i in range(5):
    loader.apply_async(time.sleep, i)

# Three norm requests on the same 2x2 matrix; the order travels in the
# `ord` keyword argument.
A = np.random.random((2, 2))
for n in [1, 2, np.inf]:
    loader.apply_async(norm, A, ord=n)

# msg_ids of the requests submitted through rc0.
msg_ids = rc0.history

# A second client, used for the Hub query on those msg_ids.
rc = parallel.Client()
Load the serialized requests from the Hub:
# Fetch only the msg_id and the serialized buffers of the requests whose
# msg_id is in msg_ids.
query = rc.db_query(
    {'msg_id': {'$in': msg_ids}},
    keys=['msg_id', 'buffers'],
)
Each entry is a dict with msg_id and buffers. The buffers are the serialized function, args, and kwargs.
query[0]
{'buffers': ['\x80\x02ctime\nsleep\nq\x01.', '\x80\x02}q\x01(U\x07kw_keysq\x02]q\x03U\x05nargsq\x04K\x01U\tnarg_bufsq\x05K\x01u.', '\x80\x02K\x03.'], u'msg_id': u'b3473d39-1a64-43d3-869c-af44c42e11c1'}
Using `unpack_apply_message`, we can reconstruct the function, args, and kwargs of each request.
Keyed by the function, we create mappings of msg_id to the relevant args (order for the norms, and sleep time for sleeps).
# msg_id -> sleep time, for the time.sleep requests
sleeps = {}
# msg_id -> order (the `ord` keyword), for the norm requests
norms = {}


def func_name(f):
    """Return ``module.name`` for *f*.

    Used as the key for comparing an unpacked function with the original.
    """
    return f.__module__ + '.' + f.__name__


# The reference keys do not change between iterations, so compute them once.
sleep_key = func_name(time.sleep)
norm_key = func_name(norm)

for q in query:
    msg_id = q['msg_id']
    # Rebuild the function, positional args and keyword args of the request
    # from its serialized buffers.
    f, args, kwargs = unpack_apply_message(q['buffers'])
    key = func_name(f)
    if key == sleep_key:
        # we know sleeps only get one arg
        sleeps[msg_id] = args[0]
    elif key == norm_key:
        # we don't care about the array, only the order
        norms[msg_id] = kwargs['ord']

    # Single-argument parenthesized prints behave the same as the Python 2
    # print statement and also parse on Python 3.
    print(msg_id)
    print(' f=%r' % f)
    # NOTE(review): `%` is applied to `args` directly; judging by the recorded
    # output ("args=3") it is a one-element tuple here. This would raise
    # TypeError for a request with zero or several positional args.
    print(' args=%s' % args)
    print(' kwargs=%s' % kwargs)
b3473d39-1a64-43d3-869c-af44c42e11c1
 f=<built-in function sleep>
 args=3
 kwargs={}
25de473a-5bd2-4571-89c5-45145bd15a40
 f=<built-in function sleep>
 args=4
 kwargs={}
4620c394-7b77-470b-bd5a-3bbf92327a31
 f=<built-in function sleep>
 args=1
 kwargs={}
21aec8d5-458f-43df-b864-d98fa6f2def8
 f=<function norm at 0x10d4905f0>
 args=[[ 0.7648452 0.93428146]
 [ 0.82959873 0.92163395]]
 kwargs={'ord': 1}
d2ca049f-465f-4b87-ba7e-d56d391273a2
 f=<built-in function sleep>
 args=0
 kwargs={}
287b0351-21d7-4443-a349-fc38a27baf89
 f=<function norm at 0x10d4905f0>
 args=[[ 0.7648452 0.93428146]
 [ 0.82959873 0.92163395]]
 kwargs={'ord': 2}
7e6b3b43-2f17-449a-be12-e925c10e07d3
 f=<function norm at 0x10d351c80>
 args=[[ 0.7648452 0.93428146]
 [ 0.82959873 0.92163395]]
 kwargs={'ord': inf}
27e27a67-889e-4045-a3ad-3d2940228040
 f=<built-in function sleep>
 args=2
 kwargs={}
sleeps
{u'25de473a-5bd2-4571-89c5-45145bd15a40': 4, u'27e27a67-889e-4045-a3ad-3d2940228040': 2, u'4620c394-7b77-470b-bd5a-3bbf92327a31': 1, u'b3473d39-1a64-43d3-869c-af44c42e11c1': 3, u'd2ca049f-465f-4b87-ba7e-d56d391273a2': 0}
norms
{u'21aec8d5-458f-43df-b864-d98fa6f2def8': 1, u'287b0351-21d7-4443-a349-fc38a27baf89': 2, u'7e6b3b43-2f17-449a-be12-e925c10e07d3': inf}