#!/usr/bin/env python
# coding: utf-8

# # IPython parallel computing clusters
#
# Notebook export: the ``get_ipython()`` calls below require an IPython
# session, and the parallel section requires a running engine cluster.

# In[1]:


import pandas as pd
import numpy as np
import os
from functools import partial


# ## Create a fake dataset of SNV

# In[2]:


CHROMOSOMES = list(range(1, 23)) + ['X', 'Y']
BASES = list('ACGT')


# In[3]:


# Create a dataframe of 5000 random single-nucleotide variants.
df = pd.DataFrame({'Chromosome': np.random.choice(CHROMOSOMES, size=5000, replace=True, p=None),
                   'Position': np.random.randint(1000, 10000, size=5000),
                   'Reference': np.random.choice(BASES, size=5000, replace=True, p=None)})
# The alternate allele is drawn from the bases that differ from the reference.
df['Alternate'] = df.apply(lambda x: np.random.choice([i for i in BASES if i != x['Reference']]), axis=1)
df = df[['Chromosome', 'Position', 'Reference', 'Alternate']]


# In[4]:


df.head(5)


# In[5]:


df.shape


# ## Check if the mutations overlap by considering windows of 100bp

# ### Non parallel version

# In[6]:


def overlap(line, positions):
    """Count the positions that fall within 100 bp of the mutation in *line*.

    Args:
        line: Row-like object giving the mutation coordinate under the
            ``'Position'`` key.
        positions: Iterable of coordinates to test against the window.

    Returns:
        The number of entries of *positions* inside the closed interval
        ``[Position - 100, Position + 100]``. When *positions* contains the
        row's own coordinate (as it does for the callers in this notebook,
        which pass every position of the row's group), that entry is counted.
    """
    return len([i for i in positions if line['Position'] - 100 <= i <= line['Position'] + 100])


# In[7]:


# Serial baseline: annotate each chromosome group in turn, then concatenate.
# NOTE: the trailing ``results.reset_index()`` in the cell is not assigned, so
# ``results`` keeps the index produced by ``pd.concat``.
get_ipython().run_cell_magic(
    'time',
    '',
    "\n"
    "grouped = df.groupby(['Chromosome'])\n"
    "results = []\n"
    "for count, group in grouped: \n"
    " positions = group['Position'].tolist()\n"
    " group['Overlap'] = group.apply(partial(overlap, positions=positions), axis=1)\n"
    " results.append(group)\n"
    " \n"
    "results = pd.concat(results)\n"
    "results.reset_index()\n"
)


# In[8]:


results.head(5)


# In[9]:


results.shape


# ### Parallel version

# In[10]:


# ``IPython.parallel`` was split out of IPython into the standalone
# ``ipyparallel`` package; prefer it and fall back to the legacy location so
# older installations keep working.
try:
    from ipyparallel import Client
except ImportError:
    from IPython.parallel import Client


# In[11]:


os.cpu_count()


# In[12]:


c = Client()
pool = c[:]  # view over every engine currently registered with the client
len(c.ids)


# In[13]:


# ``%%px --local`` runs the cell on the engines and in this session, so
# ``overlap`` and ``parse_group`` are defined on both sides.
get_ipython().run_cell_magic(
    'px',
    '--local',
    "\n"
    "from functools import partial\n"
    "\n"
    "\n"
    "def overlap(line, positions):\n"
    " return len([i for i in positions if line['Position'] - 100 <= i <= line['Position'] + 100])\n"
    "\n"
    "\n"
    "def parse_group(items):\n"
    " count, group = items\n"
    " positions = group['Position'].tolist()\n"
    " group['Overlap'] = group.apply(partial(overlap, positions=positions), axis=1)\n"
    " return group\n"
)


# In[14]:


# Parallel run: each (key, group) pair is handed to ``parse_group`` via the
# engine view. As in the serial cell, the final ``reset_index()`` result is
# not assigned.
get_ipython().run_cell_magic(
    'time',
    '',
    "\n"
    "grouped = df.groupby(['Chromosome'])\n"
    "results_parallel = []\n"
    "for result in pool.map(parse_group, grouped):\n"
    " results_parallel.append(result)\n"
    " \n"
    "results_parallel = pd.concat(results_parallel)\n"
    "results_parallel.reset_index()\n"
)


# In[15]:


results_parallel.head(5)


# In[16]:


results_parallel.shape