IPython parallel computing clusters

In [1]:
import pandas as pd
import numpy as np
import os
from functools import partial

Create a fake dataset of SNV

In [2]:
CHROMOSOMES = list(range(1, 23)) + ['X', 'Y']
BASES = list('ACGT')
In [3]:
# Create a dataframe
df = pd.DataFrame({'Chromosome': np.random.choice(CHROMOSOMES, size=5000, replace=True, p=None),
                   'Position': np.random.randint(1000, 10000, size=5000),
                   'Reference': np.random.choice(BASES, size=5000, replace=True, p=None)})

df['Alternate'] = df.apply(lambda x: np.random.choice([i for i in BASES if i != x['Reference']]), axis=1)
df = df[['Chromosome', 'Position', 'Reference', 'Alternate']]
In [4]:
df.head(5)
Out[4]:
Chromosome Position Reference Alternate
0 18 3313 T C
1 22 3062 T C
2 11 2584 C A
3 6 2867 C A
4 20 8704 T C
In [5]:
df.shape
Out[5]:
(5000, 4)

Check if the mutations overlap by considering windows of 100bp

Non parallel version

In [6]:
def overlap(line, positions):
    return len([i for i in positions if line['Position'] - 100 <= i <= line['Position'] + 100])
In [7]:
%%time

grouped = df.groupby(['Chromosome'])
results = []
for count, group in grouped:    
    positions = group['Position'].tolist()
    group['Overlap'] = group.apply(partial(overlap, positions=positions), axis=1)
    results.append(group)
    
results = pd.concat(results)
results.reset_index()
CPU times: user 16.9 s, sys: 82.8 ms, total: 17 s
Wall time: 17.6 s
/Users/loris/.virtualenvs/python3_meetup/lib/python3.4/site-packages/IPython/kernel/__main__.py:6: SettingWithCopyWarning: 
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
In [8]:
results.head(5)
Out[8]:
Chromosome Position Reference Alternate Overlap
13 1 4864 G A 6
15 1 9839 G C 5
23 1 8972 T C 3
58 1 6162 A G 5
68 1 1365 T G 6
In [9]:
results.shape
Out[9]:
(5000, 5)

Parallel version

In [10]:
from IPython.parallel import Client
In [11]:
os.cpu_count()
Out[11]:
4
In [12]:
c = Client()
pool = c[:]
len(c.ids)
Out[12]:
4
In [13]:
%%px --local

from functools import partial


def overlap(line, positions):
    return len([i for i in positions if line['Position'] - 100 <= i <= line['Position'] + 100])


def parse_group(items):
    count, group = items
    positions = group['Position'].tolist()
    group['Overlap'] = group.apply(partial(overlap, positions=positions), axis=1)
    return group
In [14]:
%%time

grouped = df.groupby(['Chromosome'])
results_parallel = []
for result in pool.map(parse_group, grouped):
    results_parallel.append(result)
    
results_parallel = pd.concat(results_parallel)
results_parallel.reset_index()
CPU times: user 3.28 s, sys: 271 ms, total: 3.55 s
Wall time: 10.6 s
In [15]:
results_parallel.head(5)
Out[15]:
Chromosome Position Reference Alternate Overlap
13 1 4864 G A 6
15 1 9839 G C 5
23 1 8972 T C 3
58 1 6162 A G 5
68 1 1365 T G 6
In [16]:
results_parallel.shape
Out[16]:
(5000, 5)