"""Block-wise evaluation of NumPy expressions, plus a small benchmark.

``blocked`` evaluates an expression string over its array operands one
block at a time, and ``blocked_thread`` spreads contiguous chunks of the
operands over a thread pool.  Running the file as a script prints the demo
result and times both against plain NumPy and (when installed) numexpr.

The expression strings are executed with ``eval``; never pass untrusted
input to these functions.
"""
import ast
import sys
import timeit
from multiprocessing.pool import ThreadPool

import numpy as np

try:
    import numexpr
except ImportError:
    # Only the benchmark in main() uses numexpr; the evaluators do not.
    numexpr = None

__all__ = ["GetVars", "blocked", "small", "blocked_thread"]

# Byte budget of one output block (value kept from the original code;
# presumably chosen so a block's working set stays cache-resident).
_BLOCK_BYTES = 2 * (64 * 1024)


class GetVars(ast.NodeTransformer):
    """AST visitor that records the identifier of every ``Name`` node.

    After ``visit(tree)`` the ``vars`` attribute holds the set of all names
    referenced by the expression (operands as well as called functions).
    The tree itself is returned unmodified.
    """

    def __init__(self):
        self.vars = set()

    def visit_Name(self, node):
        self.vars.add(node.id)
        return node


def _sliceable(value):
    """Return True for operands that are cut into blocks along axis 0."""
    if isinstance(value, np.ndarray):
        return value.ndim > 0
    return isinstance(value, (list, tuple))


def _window(operands, start, stop):
    """Return a namespace whose sliceable operands are limited to [start, stop)."""
    return {name: (value[start:stop] if _sliceable(value) else value)
            for name, value in operands.items()}


def _rows_per_block(out):
    """Return how many leading-axis rows of ``out`` fit in ``_BLOCK_BYTES``."""
    elements_per_row = max(1, out.size // max(1, len(out)))
    row_bytes = max(1, out.itemsize * elements_per_row)
    return max(1, _BLOCK_BYTES // row_bytes)


def _prepare(op, local_dict, global_dict):
    """Compile ``op`` and collect the operands its names refer to.

    Names found in ``local_dict`` are always collected.  Names found only in
    ``global_dict`` are collected when they are sliceable, so they get cut
    into blocks as well; anything else (modules, functions, builtins) is
    left for ``eval`` to resolve through the globals.
    """
    tree = ast.parse(op, mode='eval')
    finder = GetVars()
    finder.visit(tree)
    operands = {}
    # Sorted so the result never depends on set iteration order.
    for name in sorted(finder.vars):
        if name in local_dict:
            operands[name] = local_dict[name]
        elif name in global_dict and _sliceable(global_dict[name]):
            operands[name] = global_dict[name]
    return compile(tree, '<blocked>', 'eval'), operands


def _allocate(code, operands, global_dict):
    """Allocate the output array for ``code`` evaluated over ``operands``.

    The dtype and trailing shape come from evaluating a one-row probe, so
    the output matches what the expression really produces instead of
    copying the dtype of an arbitrary operand.
    """
    lengths = [len(value) for value in operands.values() if _sliceable(value)]
    if not lengths:
        raise ValueError("expression references no array operands")
    probe = np.asarray(eval(code, global_dict, _window(operands, 0, 1)))
    return np.empty((max(lengths),) + probe.shape[1:], dtype=probe.dtype)


def blocked(op, local_dict=None):
    """Evaluate the expression string ``op`` block by block.

    Parameters
    ----------
    op : str
        A Python expression over NumPy arrays, e.g. ``"a**2 + b**2"``.
        It is executed with ``eval`` -- do not pass untrusted strings.
    local_dict : mapping, optional
        Where operand names are looked up.  Defaults to the local variables
        of the calling frame.  The caller's globals are always used as the
        global namespace.

    Returns
    -------
    numpy.ndarray
        The result, with the dtype the expression produces.

    Raises
    ------
    ValueError
        If the expression references no array (or sequence) operand.
    """
    call_frame = sys._getframe(1)
    if local_dict is None:
        local_dict = call_frame.f_locals
    global_dict = call_frame.f_globals

    code, operands = _prepare(op, local_dict, global_dict)
    out = _allocate(code, operands, global_dict)
    small(code, out, operands, global_dict)
    return out


def small(operation, r, full, global_dict=None):
    """Evaluate ``operation`` over ``full`` in blocks, writing into ``r``.

    Parameters
    ----------
    operation : code object or str
        The expression to evaluate (passed to ``eval``).
    r : numpy.ndarray
        Output buffer; filled in place along its first axis.
    full : dict
        Maps operand names to values.  Arrays and sequences are sliced to
        the current block; any other value is passed through unchanged.
    global_dict : dict, optional
        Global namespace for ``eval``.  When omitted, the block namespace
        itself serves as the globals.
    """
    step = _rows_per_block(r)
    total = len(r)
    for i in range(0, total, step):
        u = min(total, i + step)
        scope = _window(full, i, u)
        r[i:u] = eval(operation,
                      scope if global_dict is None else global_dict,
                      scope)


def blocked_thread(op, local_dict=None, pool=None):
    """Evaluate ``op`` like ``blocked``, spreading chunks over ``pool``.

    Parameters
    ----------
    op : str
        Expression string; executed with ``eval`` -- do not pass untrusted
        strings.
    local_dict : mapping, optional
        Operand lookup; defaults to the calling frame's locals.
    pool : multiprocessing.pool.ThreadPool, optional
        Pool whose workers share memory with the caller (the workers write
        straight into the output array).  Without a pool the evaluation
        runs serially in the calling thread.

    Returns
    -------
    numpy.ndarray
        The result, with the dtype the expression produces.

    Raises
    ------
    ValueError
        If the expression references no array (or sequence) operand.
    """
    call_frame = sys._getframe(1)
    if local_dict is None:
        local_dict = call_frame.f_locals
    global_dict = call_frame.f_globals

    code, operands = _prepare(op, local_dict, global_dict)
    out = _allocate(code, operands, global_dict)
    if pool is None:
        small(code, out, operands, global_dict)
        return out

    workers = max(1, getattr(pool, '_processes', 1) or 1)
    # Ceil division: at most one chunk per worker, and never a zero step
    # when there are fewer rows than workers.
    step = max(1, -(-len(out) // workers))
    pending = [
        pool.apply_async(
            small,
            (code, out[i:i + step], _window(operands, i, i + step),
             global_dict))
        for i in range(0, len(out), step)
    ]
    # get() waits for each chunk and re-raises any worker exception.
    for task in pending:
        task.get()
    return out


def test(a, b):
    """Demo: evaluate an expression that finds ``a`` and ``b`` in this frame."""
    return blocked("a**2 + b**2 + a*b * 2")


def _report_timing(label, func, repeat=3):
    """Print the best wall-clock time of ``func`` over ``repeat`` runs."""
    best = min(timeit.repeat(func, number=1, repeat=repeat))
    print("%-20s %9.2f ms" % (label, best * 1e3))


def _numexpr_case(expression, operands, threads):
    """Return a callable that evaluates ``expression`` with numexpr."""
    def run():
        numexpr.set_num_threads(threads)
        numexpr.evaluate(expression, local_dict=operands)
    return run


def main():
    """Print the demo result and time the evaluation strategies."""
    a = np.arange(1e7)
    b = np.arange(1e7)
    print(test(a, b))

    s = "a**2 + b**2 + a*b * 2"
    operands = {'a': a, 'b': b}
    t = ThreadPool()
    try:
        _report_timing("numpy", lambda: a**2 + b**2 + a*b * 2)
        if numexpr is not None:
            _report_timing("numexpr (1 thread)",
                           _numexpr_case(s, operands, 1))
        _report_timing("blocked",
                       lambda: blocked(s, local_dict=operands))
        _report_timing("blocked_thread",
                       lambda: blocked_thread(s, local_dict=operands, pool=t))
        if numexpr is not None:
            _report_timing("numexpr (4 threads)",
                           _numexpr_case(s, operands, 4))
    finally:
        t.close()
        t.join()


if __name__ == "__main__":
    main()