import numa from IPython.parallel import Client client = Client() len(client) !echo 3 | sudo tee /proc/sys/vm/drop_caches for node_id in range(numa.get_max_node() + 1): print(numa.get_node_size(node_id)) import numpy as np from os.path import exists, join filename = 'a.mmap' mode = 'r+' if exists(filename) else 'w+' a = np.memmap(filename, shape=(int(8e6), 784), dtype=np.float32, mode=mode) n_jobs = 60 a.nbytes def do_stuff(params): seed, filename, n_iter, numa_aware = params import numpy as np import os n_samples, n_features = int(8e6), 784 rng = np.random.RandomState(seed) if numa_aware: # Create hard link for the current process numa node import numa effective_filename = "%s_%d" % (filename, numa.get_preferred()) if not os.path.exists(effective_filename): os.link(filename, effective_filename) else: effective_filename = filename data = np.memmap(effective_filename, shape=(n_samples, n_features), dtype=np.float32, mode='r') # Trigger one sequential scan of the whole readonly data data.max() # Trigger n_iter random access to chunks of data for i in range(n_iter): idx = rng.random_integers(low=0, high=n_samples - 1, size=1000) np.mean(data[idx]) return effective_filename workers = client.load_balanced_view() %%px import numa print(numa.get_preferred(), numa.get_affinity(0)) %time workers.map(do_stuff, [(i, filename, 50, False) for i in range(n_jobs)]).get() def assign_cpu_numa_node(engine_id, fix_cpu=False): import numa n_nodes = numa.get_max_node() + 1 local_engine_id, node_id = divmod(engine_id, n_nodes) # Assing current process to a fixed numa node numa.set_preferred(node_id) cpu_ids = list(sorted(numa.node_to_cpus(node_id))) if fix_cpu: # Fix engine to one specific CPU that is bound with node_id cpu_id = cpu_ids[local_engine_id % len(cpu_ids)] numa.set_affinity(0, {cpu_id}) else: # Set affinity of current process to any of the CPUs bound # with node_id numa.set_affinity(0, cpu_ids) for engine_id in client.ids: client[engine_id].apply(assign_cpu_numa_node, engine_id, fix_cpu=False) %%px import numa print(numa.get_preferred(), numa.get_affinity(0)) !echo 3 | sudo tee /proc/sys/vm/drop_caches %time workers.map(do_stuff, [(i, filename, 50, True) for i in range(n_jobs)]).get() for engine_id in client.ids: client[engine_id].apply(assign_cpu_numa_node, engine_id, fix_cpu=True) %%px import numa print(numa.get_preferred(), numa.get_affinity(0)) !echo 3 | sudo tee /proc/sys/vm/drop_caches %time workers.map(do_stuff, [(i, filename, 50, True) for i in range(n_jobs)]).get() !echo 3 | sudo tee /proc/sys/vm/drop_caches %time workers.map(do_stuff, [(i, filename, 50, False) for i in range(n_jobs)]).get()