"""Demonstrate functional task dependencies with IPython.parallel.

Queries every engine for its hostname, total memory and CPU count, then
submits tasks through a load-balanced view that are decorated with
``parallel.depend`` so they only run on engines that satisfy a minimum
memory or CPU requirement.
"""
import multiprocessing
import socket

from IPython import parallel


def minimum_mem(limit):
    """Return True if total system memory (in bytes) is at least `limit`.

    Used below as a ``parallel.depend`` predicate, so it is evaluated on
    the engines.  Imports are kept inside the function and the memory
    probe is not shared with ``get_mem`` because the function must be
    self-contained when it runs remotely (client-side module globals are
    presumably not available there -- see the IPython.parallel docs).

    Raises:
        RuntimeError: if /proc/meminfo contains no ``MemTotal`` line.
    """
    import sys
    if sys.platform == 'darwin':  # or BSD in general?
        from subprocess import check_output
        n = int(check_output(['sysctl', '-n', 'hw.memsize']))
    else:
        # linux
        with open("/proc/meminfo") as f:
            for line in f:
                if line.startswith("MemTotal"):
                    # second field is multiplied by 1024 to get bytes
                    n = 1024 * int(line.split()[1])
                    break
            else:
                # previously fell through to an UnboundLocalError on `n`
                raise RuntimeError("MemTotal not found in /proc/meminfo")
    return n >= limit


# Size units as floats, so that dividing a byte count by them yields a
# fractional value for display.
kB = 1024.
MB = 1024 * kB
GB = 1024 * MB


def get_mem():
    """Return total system memory in bytes.

    Self-contained (function-local imports) for the same reason as
    ``minimum_mem``: it is applied on the engines.

    Raises:
        RuntimeError: if /proc/meminfo contains no ``MemTotal`` line.
    """
    import sys
    if sys.platform == 'darwin':  # or BSD in general?
        from subprocess import check_output
        n = int(check_output(['sysctl', '-n', 'hw.memsize']))
    else:
        # linux
        with open("/proc/meminfo") as f:
            for line in f:
                if line.startswith("MemTotal"):
                    n = 1024 * int(line.split()[1])
                    break
            else:
                raise RuntimeError("MemTotal not found in /proc/meminfo")
    return n


def minimum_cpu(limit):
    """Return True if the machine's CPU count is at least `limit`."""
    import multiprocessing
    return multiprocessing.cpu_count() >= limit


def _print_results(async_results):
    """Print each result tuple on its own line, items space-separated.

    A single-argument ``print(...)`` is used so the output is the same
    on Python 2 and Python 3.
    """
    for result in async_results:
        print(" ".join(str(item) for item in result))


rc = parallel.Client(profile='sync')
dv = rc[:]
view = rc.load_balanced_view()

# Gather per-engine facts, keyed by engine id.  The bare names after each
# assignment only display the value in an interactive session; they have
# no effect when this file is run as a script.
hostnames = dv.apply_async(socket.gethostname).get_dict()
hostnames

meminfo = dv.apply_async(get_mem).get_dict()
meminfo

ncpus = dv.apply_async(multiprocessing.cpu_count).get_dict()
ncpus

for eid in rc.ids:
    print("Engine %i [%6s]: %4.1fGB RAM, %2i CPUs" % (
        eid, hostnames[eid], meminfo[eid] / GB, ncpus[eid]))


@parallel.depend(minimum_mem, 8 * GB)
def big_mem_task(n):
    """Report where the task ran; depends on minimum_mem(8 * GB)."""
    import os, socket
    return "big", socket.gethostname(), os.getpid(), n


amr = view.map_async(big_mem_task, range(10))
_print_results(amr)


@parallel.depend(minimum_mem, 3 * GB)
def medium_mem_task(n):
    """Report where the task ran; depends on minimum_mem(3 * GB)."""
    import os, socket
    return "medium", socket.gethostname(), os.getpid(), n


amr = view.map_async(medium_mem_task, range(10))
_print_results(amr)


@parallel.depend(minimum_cpu, 4)
def big_cpu_task(n):
    """Report where the task ran; depends on minimum_cpu(4)."""
    import os, socket
    return socket.gethostname(), os.getpid(), n


amr = view.map_async(big_cpu_task, range(10))
_print_results(amr)