Some tools for restricting IPython.parallel tasks to particular engines based on available resources

In [1]:
import multiprocessing
import socket
In [2]:
def minimum_mem(limit):
    """Return whether this machine's total system memory is at least ``limit``.

    Parameters
    ----------
    limit : int or float
        Required amount of memory, in bytes (e.g. ``8 * GB``).

    Returns
    -------
    bool
        ``True`` if the total memory, in bytes, is ``>= limit``.

    Raises
    ------
    RuntimeError
        If ``/proc/meminfo`` has no ``MemTotal`` entry, so the total
        cannot be determined.
    """
    # Imports are kept inside the function so it has no module-level
    # dependencies (presumably required because it is handed to
    # ``parallel.depend`` and evaluated on the engines -- confirm).
    import sys
    total = None
    if sys.platform == 'darwin': # or BSD in general?
        from subprocess import check_output
        # hw.memsize is used unscaled, i.e. treated as a byte count.
        total = int(check_output(['sysctl', '-n', 'hw.memsize']))
    else: # linux
        with open("/proc/meminfo") as f:
            for line in f:
                if line.startswith("MemTotal"):
                    # The MemTotal value is in kB; convert to bytes.
                    total = 1024 * int(line.split()[1])
                    break
    if total is None:
        # Previously this fell through to an UnboundLocalError on ``n``.
        raise RuntimeError(
            "could not determine total memory: no MemTotal entry in /proc/meminfo")
    return total >= limit

# Size units in bytes. Kept as floats so that dividing a byte count by
# one of them gives a fractional result (e.g. for "%4.1fGB" formatting).
kB = 1024.0
MB = kB * 1024
GB = MB * 1024
In [3]:
def get_mem():
    """Return this machine's total system memory in bytes.

    On macOS (``sys.platform == 'darwin'``) the value comes from
    ``sysctl -n hw.memsize``; otherwise it is read from the ``MemTotal``
    line of ``/proc/meminfo``.

    Returns
    -------
    int
        Total memory in bytes.

    Raises
    ------
    RuntimeError
        If ``/proc/meminfo`` has no ``MemTotal`` entry, so the total
        cannot be determined.
    """
    # Imports are kept inside the function so it has no module-level
    # dependencies (presumably required because it is run on the engines
    # via ``apply_async`` -- confirm).
    import sys
    total = None
    if sys.platform == 'darwin': # or BSD in general?
        from subprocess import check_output
        # hw.memsize is used unscaled, i.e. treated as a byte count.
        total = int(check_output(['sysctl', '-n', 'hw.memsize']))
    else: # linux
        with open("/proc/meminfo") as f:
            for line in f:
                if line.startswith("MemTotal"):
                    # The MemTotal value is in kB; convert to bytes.
                    total = 1024 * int(line.split()[1])
                    break
    if total is None:
        # Previously this fell through to an UnboundLocalError on ``n``.
        raise RuntimeError(
            "could not determine total memory: no MemTotal entry in /proc/meminfo")
    return total
In [4]:
def minimum_cpu(limit):
    """Return whether this machine has at least ``limit`` CPUs.

    The CPU count is taken from ``multiprocessing.cpu_count()``.
    """
    # Imported locally so the function carries its own dependency.
    from multiprocessing import cpu_count
    available = cpu_count()
    return available >= limit
In [5]:
from IPython import parallel
# Client created from the 'sync' profile.
rc = parallel.Client(profile='sync')
# View obtained by slicing the client with [:]
# (presumably targets every engine -- confirm).
dv = rc[:]
# Load-balanced view; the map_async calls further down go through it.
view = rc.load_balanced_view()
In [6]:
# Run socket.gethostname through the `dv` view; get_dict() keys the results by engine id.
hostnames = dv.apply_async(socket.gethostname).get_dict()
hostnames
Out[6]:
{0: 'tom', 1: 'tom', 2: 'edison', 3: 'zino'}
In [7]:
# Run get_mem through the `dv` view; result is total memory in bytes keyed by engine id.
meminfo = dv.apply_async(get_mem).get_dict()
meminfo
Out[7]:
{0: 4157755392, 1: 4157755392, 2: 12592361472, 3: 1835565056}
In [8]:
# Run multiprocessing.cpu_count through the `dv` view; result is the CPU count keyed by engine id.
ncpus = dv.apply_async(multiprocessing.cpu_count).get_dict()
ncpus
Out[8]:
{0: 4, 1: 4, 2: 8, 3: 1}
In [9]:
# One summary line per engine: hostname, total RAM in GB, CPU count.
for eid in rc.ids:
    print("Engine %i [%6s]: %4.1fGB RAM, %2i CPUs" % (
        eid, hostnames[eid], meminfo[eid] / GB, ncpus[eid]))
Engine 0 [   tom]:  3.9GB RAM,  4 CPUs
Engine 1 [   tom]:  3.9GB RAM,  4 CPUs
Engine 2 [edison]: 11.7GB RAM,  8 CPUs
Engine 3 [  zino]:  1.7GB RAM,  1 CPUs
In [10]:
@parallel.depend(minimum_mem, 8 * GB)
def big_mem_task(n):
    """Return ``("big", hostname, pid, n)`` for the process that ran the task.

    The ``depend`` decorator is given ``minimum_mem`` with an 8 GB threshold.
    """
    # Local imports: the function brings its own dependencies with it.
    import os
    import socket
    host = socket.gethostname()
    pid = os.getpid()
    return "big", host, pid, n

Only one engine has at least 8 GB of RAM, so all of these tasks will be assigned there.

In [11]:
amr = view.map_async(big_mem_task, range(10))
# Each result is the tuple returned by one task; print its fields space-separated.
for r in amr:
    print(" ".join(str(item) for item in r))
big edison 11479 0
big edison 11479 1
big edison 11479 2
big edison 11479 3
big edison 11479 4
big edison 11479 5
big edison 11479 6
big edison 11479 7
big edison 11479 8
big edison 11479 9

Both tom and edison have at least 3 GB of RAM, so these tasks will be assigned to the engines on those two hosts, but not to zino.

In [12]:
@parallel.depend(minimum_mem, 3 * GB)
def medium_mem_task(n):
    """Return ``("medium", hostname, pid, n)`` for the process that ran the task.

    The ``depend`` decorator is given ``minimum_mem`` with a 3 GB threshold.
    """
    # Local imports: the function brings its own dependencies with it.
    import os
    import socket
    host = socket.gethostname()
    pid = os.getpid()
    return "medium", host, pid, n
In [13]:
amr = view.map_async(medium_mem_task, range(10))
# Each result is the tuple returned by one task; print its fields space-separated.
for r in amr:
    print(" ".join(str(item) for item in r))
        
medium tom 6609 0
medium tom 6609 1
medium tom 6610 2
medium edison 11479 3
medium tom 6609 4
medium tom 6610 5
medium edison 11479 6
medium tom 6609 7
medium tom 6610 8
medium edison 11479 9

Again, both tom and edison meet the 4-CPU minimum required to run our 'big cpu task'.

In [14]:
@parallel.depend(minimum_cpu, 4)
def big_cpu_task(n):
    """Return ``(hostname, pid, n)`` for the process that ran the task.

    The ``depend`` decorator is given ``minimum_cpu`` with a threshold of 4.
    """
    # Local imports: the function brings its own dependencies with it.
    import os
    import socket
    host = socket.gethostname()
    pid = os.getpid()
    return host, pid, n
In [15]:
amr = view.map_async(big_cpu_task, range(10))
# Each result is the tuple returned by one task; print its fields space-separated.
for r in amr:
    print(" ".join(str(item) for item in r))
        
tom 6610 0
tom 6610 1
edison 11479 2
tom 6609 3
tom 6610 4
edison 11479 5
tom 6609 6
tom 6610 7
edison 11479 8
tom 6609 9

Poor zino, never gets to do anything because he's too small.