This notebook is part of PyBroMo a python-based single-molecule Brownian motion diffusion simulator that simulates confocal smFRET experiments.
In this notebook we simulate a freely-diffusing smFRET experiment with dynamics between two states. The input are two smFRET files, one for each static state. These input files need to be simulations from the same particles trajectories but with different E*.
from pathlib import Path
from textwrap import dedent, indent
import numpy as np
import tables
from scipy.stats import expon
import phconvert as phc
print('phconvert version:', phc.__version__)
SIM_PATH = 'data/'
filelist = list(Path(SIM_PATH).glob('smFRET_*_600s.hdf5'))
[f.name for f in filelist]
filename_a = str([f for f in filelist if '11_E_40_Em' in f.name][0])
filename_b = str([f for f in filelist if '11_E_75_Em' in f.name][0])
filename_a, filename_b
da = tables.open_file(filename_a)
db = tables.open_file(filename_b)
da.filename
db.filename
print(da.root.description.read().decode())
print(db.root.description.read().decode())
# Make sure files a re using the same trajectories
assert da.root.description.read().decode().split('\n')[5] == db.root.description.read().decode().split('\n')[5]
# Timestamps
times_a = da.root.photon_data.timestamps.read()
times_b = db.root.photon_data.timestamps.read()
# Detectors
det_a = da.root.photon_data.detectors.read()
det_b = db.root.photon_data.detectors.read()
# Particle number for each timestamp
par_a = da.root.photon_data.particles.read()
par_b = db.root.photon_data.particles.read()
par_a
acquisition_duration = da.root.acquisition_duration.read()
assert acquisition_duration == db.root.acquisition_duration.read()
print('Acquisition duration: %d s' % acquisition_duration)
times_unit = da.root.photon_data.timestamps_specs.timestamps_unit.read()
times_unit
tau_a = 1e-3 / 5
tau_b = 0.5e-3 / 5
tau_s = [tau_a, tau_b]
expon_s = tuple(expon(scale=tau / times_unit) for tau in tau_s)
n = int(1.1 * acquisition_duration / (tau_a + tau_b)) # Number of transitions (upper limit)
def sim_two_states_single_particle(times_s, taus_s):
"""Simulate 2-state transitions for a single particle.
Arguments:
times_s (tuple or arrays): 2-tuple of timestamps arrays
for the two states a (times_s[0]) and b (times_s[1]).
taus_s (tuple or arrays): 2-tuple of residence times arrays
for the two states a (taus_s[0]) and b (taus_s[1]).
Returns:
List of index pairs. Each pair is a start/stop index for
for the timestamps of current state for a specific residence time.
The states are strictly alternating starting from 0 (i.e. a).
- first pair: (state = 0) start/stop index for array `times_s[0]` (where 0 = state)
corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[0][0]`
- second pair: (state = 1) start/stop index for array `times_s[1]` (where 1 = state)
corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[1][0]`
- third pair: (state = 0) start/stop index for array `times_s[0]` (where 0 = state)
corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[0][1]`
and so on.
"""
slices_list = []
index_s = [0, 0] # indexes for looping thorugh the timestamps arrays
index_start_s = [0, 0] # indexes of current state start in each timestamps array
index_tau_s = [0, 0] # index of current time window duration
t_start = 0 # time of current state start
state, otherstate = 0, 1
while ((index_s[0] < len(times_s[0]) - 1) and
(index_s[1] < len(times_s[1]) - 1)):
# Duration of current time window (i.e. duration of current state)
tau = taus_s[state][index_tau_s[state]]
# Find timestamps in current time window
# for both timestamps arrays
for state_i in (0, 1):
times = times_s[state_i]
delta_t = times[index_s[state_i]] - t_start
while delta_t < tau and index_s[state_i] < len(times) - 1:
index_s[state_i] += 1
delta_t = times[index_s[state_i]] - t_start
#print(state, index_s[state])
# Save the timestamps only for current state
slices_list.append((index_start_s[state], index_s[state]))
# Save index of first timestamp in next time window
index_start_s = index_s.copy()
# Discard current "used" tau
index_tau_s[state] += 1
# Switch to a new state
t_start += tau
state, otherstate = otherstate, state
return slices_list
def sim_two_states(num_particles, times_states, det_states, par_states, times_unit, expon_s, seed=1):
"""Simulate 2-state transitions for a set of particles.
Arguments:
num_particles (int): number of simulated particles.
times_states (tuple of arrays): 2-tuple of timestamps arrays, one for each state
det_states (tuple of arrays): 2-tuple of detectors arrays, one for each state
par_states (tuple of arrays): 2-tuple of particles arrays, one for each state
times_unit (float): timestamps unit in seconds.
expon_s (tuple of scipy.stats distributions): 2-tuple of exponential distributions
used to simulate residency times for each state. Each element is a frozen
`scipy.stats.expon` distribution with scale parameter set according to the
residency time for the corresponding state.
Returns:
Tuple of 2 lists:
- List of timestamps arrays, one for each particle, after 2-states dynamics simulation.
- List of detectors arrays, one for each particle, after 2-states dynamics simulation.
"""
np.random.seed(seed)
times_p = []
det_p = []
for particle in range(num_particles):
print('\n- Simulating particle %d: ' % particle, end='', flush=True)
# Get timestamps and detectors for current particle in each state
print('timestamps..', end='', flush=True)
masks_states = [par == particle for par in par_states]
times_s = [memoryview(t_par[mask_par]) for t_par, mask_par in zip(times_states, masks_states)]
det_s = [memoryview(det_par[mask_par]) for det_par, mask_par in zip(det_states, masks_states)]
print('[done] ', end='', flush=True)
# Simulate residence times
print('residence..', end='', flush=True)
taus_s = [memoryview(exp_dist.rvs(n)) for exp_dist in expon_s]
sim_duration = np.sum(np.sum(taus) for taus in taus_s) * times_unit
assert sim_duration > acquisition_duration
print('[done] ', end='', flush=True)
# Compute start/stop indexes for the timestamps for each residence time
print('transition-index..', end='', flush=True)
slices_list = sim_two_states_single_particle(times_s, taus_s)
print('[done] ', end='', flush=True)
# Create new timestamps and detectors to store dynamics simulation results
print('merge..', end='', flush=True)
times_size = sum([s[1] - s[0] for s in slices_list])
times = np.zeros(times_size, dtype='int64')
det = np.zeros(times_size, dtype='uint8')
par = np.ones(times_size, dtype='uint8') * particle
times_m = memoryview(times)
det_m = memoryview(det)
# istart, istop are indexes of times_m/det_m while the
# start, stop indexes in slices_list refer to `times_s[state]`
# where state = 0 for odd elements and state = 1 for even elements.
# See `sim_two_states_single_particle()` for more info on `slice_list`.
istart = 0
state, otherstate = 0, 1
for start, stop in slices_list:
istop = istart + stop - start
times_m[istart:istop] = times_s[state][start:stop]
det_m[istart:istop] = det_s[state][start:stop]
istart = istop
state, otherstate = otherstate, state
print('[done]', flush=True)
assert (times != 0).all()
times_p.append(times)
det_p.append(det)
return times_p, det_p
seed = 987123654 # random number generator seed
times_p, det_p = sim_two_states(35, (times_a, times_b), (det_a, det_b), (par_a, par_b),
times_unit=times_unit, expon_s=expon_s, seed=seed)
assert all(all(np.diff(t) >= 0) for t in times_p)
assert len(times_p) == len(det_p) == 35
det_p[0][:10]
det_p[1][:10]
times_p[0][:10]
times_p[1][:10]
Background is the same Poisson process in the two static files and it is saved as a virtual particle (index = 35, the "36th" virtual particle). Let's take the background from file a for simplicity.
times_a[par_a == 35]
det_a[par_a == 35]
times_p.append(times_a[par_a == 35])
det_p.append(det_a[par_a == 35])
times_dyn = np.hstack(times_p)
det_dyn = np.hstack(det_p)
argsort = times_dyn.argsort(kind='mergesort')
times_dyn = times_dyn[argsort]
det_dyn = det_dyn[argsort]
det_dyn
par_dyn = np.hstack([det_p_i.size * [idx] for idx, det_p_i in enumerate(det_p)])
assert par_dyn.shape[0] == sum(d.size for d in det_p)
par_dyn = par_dyn[argsort]
def make_photon_hdf5(times, det, par, times_unit, description, identity=None):
photon_data = dict(
timestamps = times,
timestamps_specs = dict(timestamps_unit=times_unit),
detectors = det,
particles = par,
measurement_specs = dict(
measurement_type = 'smFRET',
detectors_specs = dict(spectral_ch1 = np.atleast_1d(0),
spectral_ch2 = np.atleast_1d(1))))
setup = dict(
num_pixels = 2,
num_spots = 1,
num_spectral_ch = 2,
num_polarization_ch = 1,
num_split_ch = 1,
modulated_excitation = False,
lifetime = False,
excitation_alternated=(False,),
excitation_cw=(True,))
provenance = dict(software='', software_version='', filename='')
if identity is None:
identity = dict()
description = description
acquisition_duration = np.round((times[-1] - times[0]) * times_unit)
data = dict(
acquisition_duration = round(acquisition_duration),
description = description,
photon_data = photon_data,
setup=setup,
provenance=provenance,
identity=identity)
return data
da.filename
db.filename
traj_descr = dedent('\n'.join(da.root.description.read().decode().split('\n')[4:7]))
print(traj_descr)
part_D_descr = indent(dedent('\n'.join(da.root.description.read().decode().split('\n')[9:11])), ' ')
print(part_D_descr)
state0_descr = indent(dedent('\n'.join(da.root.description.read().decode().split('\n')[11:13])), ' ')
print(state0_descr)
state1_descr = indent(dedent('\n'.join(db.root.description.read().decode().split('\n')[11:13])), ' ')
print(state1_descr)
bg_descr = dedent('\n'.join(da.root.description.read().decode().split('\n')[-4:]))
print(bg_descr)
tau_a_ms = tau_a * 1e3
tau_b_ms = tau_b * 1e3
tau_a_us = tau_a * 1e6
tau_b_us = tau_b * 1e6
filename_a_name = Path(filename_a).name
filename_b_name = Path(filename_b).name
description = f"""\
PyBroMo simulation of 2-states dynamics
----------------------------------------
{traj_descr}
{part_D_descr}
State 0:
Residency time: {tau_a_ms} ms
{state0_descr}
filename: {filename_a_name}
State 1:
Residency time: {tau_b_ms} ms
{state0_descr}
filename: {filename_b_name}
{bg_descr}
"""
print(description)
identity=dict(author='Antonino Ingargiola',
author_affiliation='UCLA')
data = make_photon_hdf5(times_dyn, det_dyn, par_dyn, times_unit, description, identity=identity)
data
h5_fname = f'smFRET_0eb9b3_P_35_s0_D_2.5e-11_dynamics_E_40-75_Tau_{tau_a_us:.0f}-{tau_b_us:.0f}us_EmTot_226k-200k_BgD900_BgA600_t_max_600s.hdf5'
h5_fname
phc.hdf5.save_photon_hdf5(data, h5_fname=h5_fname, overwrite=True)