#!/usr/bin/env python # coding: utf-8 # # [PyBroMo](http://tritemio.github.io/PyBroMo/) 5. Two-state dynamics - Dynamic smFRET simulation # # This notebook is part of PyBroMo a # python-based single-molecule Brownian motion diffusion simulator # that simulates confocal smFRET # experiments. # # ## *Overview* # # *In this notebook we simulate a freely-diffusing smFRET experiment with dynamics between two states.* # *The input are two smFRET files, one for each static state.* # *These input files need to be simulations from the same particles trajectories # *but with different E*. # # Load static FRET data # In[ ]: from pathlib import Path from textwrap import dedent, indent import numpy as np import tables from scipy.stats import expon import phconvert as phc print('phconvert version:', phc.__version__) # In[ ]: SIM_PATH = 'data/' # In[ ]: filelist = list(Path(SIM_PATH).glob('smFRET_*_600s.hdf5')) [f.name for f in filelist] # In[ ]: filename_a = str([f for f in filelist if '11_E_40_Em' in f.name][0]) filename_b = str([f for f in filelist if '11_E_75_Em' in f.name][0]) filename_a, filename_b # In[ ]: da = tables.open_file(filename_a) db = tables.open_file(filename_b) # In[ ]: da.filename # In[ ]: db.filename # In[ ]: print(da.root.description.read().decode()) # In[ ]: print(db.root.description.read().decode()) # In[ ]: # Make sure files a re using the same trajectories assert da.root.description.read().decode().split('\n')[5] == db.root.description.read().decode().split('\n')[5] # ## Timestamps, detectors and particles for the two states # In[ ]: # Timestamps times_a = da.root.photon_data.timestamps.read() times_b = db.root.photon_data.timestamps.read() # Detectors det_a = da.root.photon_data.detectors.read() det_b = db.root.photon_data.detectors.read() # Particle number for each timestamp par_a = da.root.photon_data.particles.read() par_b = db.root.photon_data.particles.read() # In[ ]: par_a # In[ ]: acquisition_duration = da.root.acquisition_duration.read() assert acquisition_duration == db.root.acquisition_duration.read() print('Acquisition duration: %d s' % acquisition_duration) # In[ ]: times_unit = da.root.photon_data.timestamps_specs.timestamps_unit.read() times_unit # # Simulation # ## Mean residence times for the two states a and b # In[ ]: tau_a = 1e-3 / 5 tau_b = 0.5e-3 / 5 tau_s = [tau_a, tau_b] # ## Exponential distributions of residence times for the two states # In[ ]: expon_s = tuple(expon(scale=tau / times_unit) for tau in tau_s) # In[ ]: n = int(1.1 * acquisition_duration / (tau_a + tau_b)) # Number of transitions (upper limit) # ## Define functions # In[ ]: def sim_two_states_single_particle(times_s, taus_s): """Simulate 2-state transitions for a single particle. Arguments: times_s (tuple or arrays): 2-tuple of timestamps arrays for the two states a (times_s[0]) and b (times_s[1]). taus_s (tuple or arrays): 2-tuple of residence times arrays for the two states a (taus_s[0]) and b (taus_s[1]). Returns: List of index pairs. Each pair is a start/stop index for for the timestamps of current state for a specific residence time. The states are strictly alternating starting from 0 (i.e. a). - first pair: (state = 0) start/stop index for array `times_s[0]` (where 0 = state) corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[0][0]` - second pair: (state = 1) start/stop index for array `times_s[1]` (where 1 = state) corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[1][0]` - third pair: (state = 0) start/stop index for array `times_s[0]` (where 0 = state) corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[0][1]` and so on. """ slices_list = [] index_s = [0, 0] # indexes for looping thorugh the timestamps arrays index_start_s = [0, 0] # indexes of current state start in each timestamps array index_tau_s = [0, 0] # index of current time window duration t_start = 0 # time of current state start state, otherstate = 0, 1 while ((index_s[0] < len(times_s[0]) - 1) and (index_s[1] < len(times_s[1]) - 1)): # Duration of current time window (i.e. duration of current state) tau = taus_s[state][index_tau_s[state]] # Find timestamps in current time window # for both timestamps arrays for state_i in (0, 1): times = times_s[state_i] delta_t = times[index_s[state_i]] - t_start while delta_t < tau and index_s[state_i] < len(times) - 1: index_s[state_i] += 1 delta_t = times[index_s[state_i]] - t_start #print(state, index_s[state]) # Save the timestamps only for current state slices_list.append((index_start_s[state], index_s[state])) # Save index of first timestamp in next time window index_start_s = index_s.copy() # Discard current "used" tau index_tau_s[state] += 1 # Switch to a new state t_start += tau state, otherstate = otherstate, state return slices_list # In[ ]: def sim_two_states(num_particles, times_states, det_states, par_states, times_unit, expon_s, seed=1): """Simulate 2-state transitions for a set of particles. Arguments: num_particles (int): number of simulated particles. times_states (tuple of arrays): 2-tuple of timestamps arrays, one for each state det_states (tuple of arrays): 2-tuple of detectors arrays, one for each state par_states (tuple of arrays): 2-tuple of particles arrays, one for each state times_unit (float): timestamps unit in seconds. expon_s (tuple of scipy.stats distributions): 2-tuple of exponential distributions used to simulate residency times for each state. Each element is a frozen `scipy.stats.expon` distribution with scale parameter set according to the residency time for the corresponding state. Returns: Tuple of 2 lists: - List of timestamps arrays, one for each particle, after 2-states dynamics simulation. - List of detectors arrays, one for each particle, after 2-states dynamics simulation. """ np.random.seed(seed) times_p = [] det_p = [] for particle in range(num_particles): print('\n- Simulating particle %d: ' % particle, end='', flush=True) # Get timestamps and detectors for current particle in each state print('timestamps..', end='', flush=True) masks_states = [par == particle for par in par_states] times_s = [memoryview(t_par[mask_par]) for t_par, mask_par in zip(times_states, masks_states)] det_s = [memoryview(det_par[mask_par]) for det_par, mask_par in zip(det_states, masks_states)] print('[done] ', end='', flush=True) # Simulate residence times print('residence..', end='', flush=True) taus_s = [memoryview(exp_dist.rvs(n)) for exp_dist in expon_s] sim_duration = np.sum(np.sum(taus) for taus in taus_s) * times_unit assert sim_duration > acquisition_duration print('[done] ', end='', flush=True) # Compute start/stop indexes for the timestamps for each residence time print('transition-index..', end='', flush=True) slices_list = sim_two_states_single_particle(times_s, taus_s) print('[done] ', end='', flush=True) # Create new timestamps and detectors to store dynamics simulation results print('merge..', end='', flush=True) times_size = sum([s[1] - s[0] for s in slices_list]) times = np.zeros(times_size, dtype='int64') det = np.zeros(times_size, dtype='uint8') par = np.ones(times_size, dtype='uint8') * particle times_m = memoryview(times) det_m = memoryview(det) # istart, istop are indexes of times_m/det_m while the # start, stop indexes in slices_list refer to `times_s[state]` # where state = 0 for odd elements and state = 1 for even elements. # See `sim_two_states_single_particle()` for more info on `slice_list`. istart = 0 state, otherstate = 0, 1 for start, stop in slices_list: istop = istart + stop - start times_m[istart:istop] = times_s[state][start:stop] det_m[istart:istop] = det_s[state][start:stop] istart = istop state, otherstate = otherstate, state print('[done]', flush=True) assert (times != 0).all() times_p.append(times) det_p.append(det) return times_p, det_p # ## Simulate dynamics # In[ ]: seed = 987123654 # random number generator seed times_p, det_p = sim_two_states(35, (times_a, times_b), (det_a, det_b), (par_a, par_b), times_unit=times_unit, expon_s=expon_s, seed=seed) # In[ ]: assert all(all(np.diff(t) >= 0) for t in times_p) assert len(times_p) == len(det_p) == 35 # In[ ]: det_p[0][:10] # In[ ]: det_p[1][:10] # In[ ]: times_p[0][:10] # In[ ]: times_p[1][:10] # ### Add background # # Background is the same Poisson process in the two static files # and it is saved as a virtual particle (index = 35, the "36th" virtual particle). # Let's take the background from file **a** for simplicity. # In[ ]: times_a[par_a == 35] # In[ ]: det_a[par_a == 35] # In[ ]: times_p.append(times_a[par_a == 35]) det_p.append(det_a[par_a == 35]) # ### Merge arrays from individual particles # In[ ]: times_dyn = np.hstack(times_p) det_dyn = np.hstack(det_p) # In[ ]: argsort = times_dyn.argsort(kind='mergesort') times_dyn = times_dyn[argsort] det_dyn = det_dyn[argsort] det_dyn # In[ ]: par_dyn = np.hstack([det_p_i.size * [idx] for idx, det_p_i in enumerate(det_p)]) assert par_dyn.shape[0] == sum(d.size for d in det_p) par_dyn = par_dyn[argsort] # # Save data to Photon-HDF5 # In[ ]: def make_photon_hdf5(times, det, par, times_unit, description, identity=None): photon_data = dict( timestamps = times, timestamps_specs = dict(timestamps_unit=times_unit), detectors = det, particles = par, measurement_specs = dict( measurement_type = 'smFRET', detectors_specs = dict(spectral_ch1 = np.atleast_1d(0), spectral_ch2 = np.atleast_1d(1)))) setup = dict( num_pixels = 2, num_spots = 1, num_spectral_ch = 2, num_polarization_ch = 1, num_split_ch = 1, modulated_excitation = False, lifetime = False, excitation_alternated=(False,), excitation_cw=(True,)) provenance = dict(software='', software_version='', filename='') if identity is None: identity = dict() description = description acquisition_duration = np.round((times[-1] - times[0]) * times_unit) data = dict( acquisition_duration = round(acquisition_duration), description = description, photon_data = photon_data, setup=setup, provenance=provenance, identity=identity) return data # ## Create description string # In[ ]: da.filename # In[ ]: db.filename # In[ ]: traj_descr = dedent('\n'.join(da.root.description.read().decode().split('\n')[4:7])) print(traj_descr) # In[ ]: part_D_descr = indent(dedent('\n'.join(da.root.description.read().decode().split('\n')[9:11])), ' ') print(part_D_descr) # In[ ]: state0_descr = indent(dedent('\n'.join(da.root.description.read().decode().split('\n')[11:13])), ' ') print(state0_descr) # In[ ]: state1_descr = indent(dedent('\n'.join(db.root.description.read().decode().split('\n')[11:13])), ' ') print(state1_descr) # In[ ]: bg_descr = dedent('\n'.join(da.root.description.read().decode().split('\n')[-4:])) print(bg_descr) # In[ ]: tau_a_ms = tau_a * 1e3 tau_b_ms = tau_b * 1e3 tau_a_us = tau_a * 1e6 tau_b_us = tau_b * 1e6 # In[ ]: filename_a_name = Path(filename_a).name filename_b_name = Path(filename_b).name # In[ ]: description = f"""\ PyBroMo simulation of 2-states dynamics ---------------------------------------- {traj_descr} {part_D_descr} State 0: Residency time: {tau_a_ms} ms {state0_descr} filename: {filename_a_name} State 1: Residency time: {tau_b_ms} ms {state0_descr} filename: {filename_b_name} {bg_descr} """ print(description) # ## Save file # In[ ]: identity=dict(author='Antonino Ingargiola', author_affiliation='UCLA') # In[ ]: data = make_photon_hdf5(times_dyn, det_dyn, par_dyn, times_unit, description, identity=identity) data # In[ ]: h5_fname = f'smFRET_0eb9b3_P_35_s0_D_2.5e-11_dynamics_E_40-75_Tau_{tau_a_us:.0f}-{tau_b_us:.0f}us_EmTot_226k-200k_BgD900_BgA600_t_max_600s.hdf5' h5_fname # In[ ]: phc.hdf5.save_photon_hdf5(data, h5_fname=h5_fname, overwrite=True)