import numpy as np
import numexpr
import sys
import ast
class GetVars(ast.NodeTransformer):
    """Collect the identifiers referenced by an AST.

    After ``visit(tree)`` has run, ``vars`` holds the ``id`` of every
    ``ast.Name`` node that was visited.  Nodes are returned unmodified, so
    the tree itself is left as it was.
    """

    def __init__(self):
        # Distinct identifier strings seen so far.
        self.vars = set()

    def visit_Name(self, node):
        """Record the identifier of *node* and hand the node back unchanged."""
        self.vars.add(node.id)
        return node
def blocked(op, local_dict=None):
    """Evaluate the NumPy expression string *op* one block of rows at a time.

    Every name in *op* that is a key of *local_dict* is treated as an array
    operand: it is sliced along its first axis and the expression is
    evaluated on matching slices, so the temporaries created by each
    evaluation stay small.  Names that are not in *local_dict* are left for
    ``eval`` to resolve through the caller's globals and the builtins.

    Parameters
    ----------
    op : str
        A Python expression.  It is compiled and executed with ``eval``, so
        it must never come from an untrusted source.
    local_dict : dict, optional
        Maps operand names to arrays.  Defaults to the local variables of
        the calling frame.  Operands are expected to share the length of
        their first axis; the longest one determines the output length.

    Returns
    -------
    numpy.ndarray
        The result, with the dtype and trailing dimensions produced by the
        expression itself.

    Raises
    ------
    ValueError
        If *op* references no name that is present in *local_dict*.
    """
    call_frame = sys._getframe(1)
    if local_dict is None:
        local_dict = call_frame.f_locals
    global_dict = call_frame.f_globals

    tree = ast.parse(op, mode='eval')
    code = compile(tree, '<string>', 'eval')

    # Sorted so that the operand used to size the blocks is chosen
    # deterministically rather than by set iteration order.
    names = sorted(set(node.id for node in ast.walk(tree)
                       if isinstance(node, ast.Name)))
    ordered = [name for name in names if name in local_dict]
    if not ordered:
        raise ValueError(
            "expression %r references no name from local_dict" % (op,))
    operands = dict((name, local_dict[name]) for name in ordered)

    template = max((np.asarray(operands[name]) for name in ordered), key=len)
    n_rows = len(template)
    row_items = template.size // n_rows if n_rows else 1
    row_bytes = max(1, template.itemsize * row_items)
    # Aim for 128 KiB per operand block, but always advance by at least one
    # row.  Floor division keeps the step an integer on Python 3 as well.
    block_rows = max(1, (2 * 64 * 1024) // row_bytes)

    # An empty input still gets one (empty) evaluation so that the result
    # dtype and trailing shape come from the expression.
    starts = list(range(0, n_rows, block_rows)) or [0]

    out = None
    for start in starts:
        stop = min(n_rows, start + block_rows)
        scope = {name: value[start:stop] for name, value in operands.items()}
        chunk = eval(code, global_dict, scope)
        if out is None:
            # Allocate from the first result instead of from an operand, so
            # comparisons, int/float promotion etc. keep their real dtype.
            chunk = np.asarray(chunk)
            out = np.empty((n_rows,) + chunk.shape[1:], dtype=chunk.dtype)
        out[start:stop] = chunk
    return out
def small(operation, r, full):
    """Evaluate a compiled expression into *r*, one block of rows at a time.

    Parameters
    ----------
    operation : code object or str
        The expression handed to ``eval``.
    r : numpy.ndarray
        Output array; it is filled in place, block by block.
    full : dict
        Maps each operand name to an array.  Every value is sliced along its
        first axis with the same bounds as *r*, and the slices form the only
        namespace the expression sees (apart from the builtins).

    Returns
    -------
    None
    """
    n_rows = len(r)
    if n_rows == 0:
        return
    row_bytes = max(1, r.itemsize * (r.size // n_rows))
    # Aim for 128 KiB of output per block, but always advance by at least
    # one row.  Floor division keeps the step an integer on Python 3 too.
    block_rows = max(1, (2 * 64 * 1024) // row_bytes)
    for start in range(0, n_rows, block_rows):
        stop = min(n_rows, start + block_rows)
        scope = {name: value[start:stop] for name, value in full.items()}
        r[start:stop] = eval(operation, scope)
def blocked_thread(op, local_dict=None, pool=None):
    """Evaluate the NumPy expression string *op* in parallel chunks.

    The operands are split along their first axis into roughly one chunk per
    pool worker, and each chunk is handed to ``small``, which fills the
    matching view of the output array in place.

    Parameters
    ----------
    op : str
        A Python expression.  It is compiled and executed with ``eval``, so
        it must never come from an untrusted source.
    local_dict : dict, optional
        Maps operand names to arrays.  Defaults to the local variables of
        the calling frame.  Names in *op* that are missing from it are not
        sliced; inside the workers they can only resolve to builtins,
        because each chunk is evaluated with the operand slices as its sole
        namespace.
    pool : multiprocessing.pool.ThreadPool, optional
        Pool that runs the chunks.  Workers write straight into views of the
        output array, so they have to share memory with the caller.  When
        omitted, a temporary ``ThreadPool`` is created and shut down again.

    Returns
    -------
    numpy.ndarray
        The result, with the dtype and trailing dimensions produced by the
        expression itself.

    Raises
    ------
    ValueError
        If *op* references no name that is present in *local_dict*.
    """
    call_frame = sys._getframe(1)
    if local_dict is None:
        local_dict = call_frame.f_locals

    tree = ast.parse(op, mode='eval')
    code = compile(tree, '<string>', 'eval')

    # Sorted so that the operand used to size the output is chosen
    # deterministically rather than by set iteration order.
    names = sorted(set(node.id for node in ast.walk(tree)
                       if isinstance(node, ast.Name)))
    ordered = [name for name in names if name in local_dict]
    if not ordered:
        raise ValueError(
            "expression %r references no name from local_dict" % (op,))
    operands = dict((name, local_dict[name]) for name in ordered)

    template = max((np.asarray(operands[name]) for name in ordered), key=len)
    n_rows = len(template)

    # The output has to exist before the workers start, so evaluate a probe
    # of at most one row to learn the dtype and trailing shape the
    # expression really produces.
    probe_scope = {name: value[0:1] for name, value in operands.items()}
    probe = np.asarray(eval(code, probe_scope))
    out = np.empty((n_rows,) + probe.shape[1:], dtype=probe.dtype)

    own_pool = pool is None
    if own_pool:
        from multiprocessing.pool import ThreadPool
        pool = ThreadPool()
    try:
        # ``_processes`` is a private attribute of multiprocessing pools;
        # fall back to a single chunk when the pool does not expose it.
        workers = getattr(pool, '_processes', None) or 1
        # Ceiling division, never below one row, so that inputs shorter
        # than the worker count do not produce a zero range step.
        chunk_rows = max(1, -(-n_rows // workers))
        pending = []
        for start in range(0, n_rows, chunk_rows):
            stop = min(n_rows, start + chunk_rows)
            scope = {name: value[start:stop]
                     for name, value in operands.items()}
            pending.append(
                pool.apply_async(small, (code, out[start:stop], scope)))
        # Wait for every chunk; get() re-raises any worker exception.
        for task in pending:
            task.get()
    finally:
        if own_pool:
            pool.close()
            pool.join()
    return out
# Two identical 10-million-element float arrays used as demo/benchmark input.
a = np.arange(1e7)
b = np.arange(1e7)


def test(a, b):
    """Evaluate the demo expression with ``blocked``.

    No ``local_dict`` is passed, so ``blocked`` looks the operands up in
    this function's local variables, i.e. the parameters *a* and *b*.
    """
    return blocked("a**2 + b**2 + a*b * 2")


# Parenthesised form prints the same on Python 2 and is valid on Python 3.
print(test(a, b))
[ 0.00000000e+00 4.00000000e+00 1.60000000e+01 ..., 3.99999760e+14 3.99999840e+14 3.99999920e+14]
# Thread pool passed to blocked_thread in the benchmark cells below; it is
# created with ThreadPool's default worker count.
from multiprocessing.pool import ThreadPool
t = ThreadPool()
# Expression string evaluated by the numexpr / blocked benchmarks below.
s = "a**2 + b**2 + a*b * 2"
%%timeit
# Baseline: plain NumPy evaluation of the whole expression on a and b.
a**2 + b**2 + a*b * 2
1 loops, best of 3: 420 ms per loop
%%timeit
# numexpr with the thread count set to 1; note that the set_num_threads()
# call is part of the timed cell body.
numexpr.set_num_threads(1)
numexpr.evaluate(s, local_dict={'a' : a, 'b' : b})
10 loops, best of 3: 69.9 ms per loop
%%timeit
# Serial block-wise evaluation of s with the blocked() helper.
blocked(s, local_dict={'a' : a, 'b' : b})
10 loops, best of 3: 98.3 ms per loop
%%timeit
# Chunked evaluation of s with blocked_thread(), using the thread pool t.
blocked_thread(s, local_dict={'a' : a, 'b' : b}, pool=t)
1 loops, best of 3: 356 ms per loop
%%timeit
# numexpr with the thread count set to 4; note that the set_num_threads()
# call is part of the timed cell body.
numexpr.set_num_threads(4)
numexpr.evaluate(s, local_dict={'a' : a, 'b' : b})
10 loops, best of 3: 25.1 ms per loop