This notebook benchmarks different solutions to this Stack Overflow question.
from itertools import izip
import pandas as pd
import numpy as np
import tables as tb
print 'PyTables version:', tb.__version__
print 'Pandas version:', pd.__version__
PyTables version: 3.0.0 Pandas version: 0.12.0
n_particles = 15
time_chunk_size = 2**18
n_iter = 20
time_size = n_iter*time_chunk_size
print 'time size: %.1f 10^6' % (time_size*1e-6)
print 'emission size: %.1f MB' % (4*n_particles*time_size/1024./1024.)
time size: 5.2 10^6 emission size: 300.0 MB
def generate_emission(shape):
"""Generate fake emission."""
emission = np.random.randn(*shape).astype('float32') - 1
emission.clip(0, 1e6, out=emission)
assert (emission >=0).all()
return emission
def compute_counts(emission):
"""Fake counts computation"""
return np.trunc(emission*1.5).astype('i1')
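As a toy check (not part of the benchmark), compute_counts maps emission values below 2/3 to 0 counts, values in [2/3, 4/3) to 1 count, and so on:
# Toy check of the fake counts model: np.trunc(emission*1.5) gives
# 0 counts for emission < 2/3, 1 count for 2/3 <= emission < 4/3, etc.
print compute_counts(np.array([0.1, 0.7, 1.4, 2.1], dtype='float32'))
# -> [0 1 2 3]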
Generating and storing the "emission"
Time needed to generate the data (in-RAM storage):
%%timeit -r1 -n1
# generate simulated data
test = []
for i in range(n_iter):
    emission_chunk = generate_emission((time_chunk_size, n_particles))
    test.append(emission_chunk)
1 loops, best of 1: 3.4 s per loop
Time needed to save the data to a file with pytables:
#data_file.close()
data_file = tb.open_file('emission_pytables.hdf', mode = "w")
%%timeit -r1 -n1
# 1) create a new emission file, compressing as we go
comp_filter = tb.Filters(complib='blosc', complevel=5)
emission = data_file.create_earray('/', 'emission', atom=tb.Float32Atom(),
                                   shape=(n_particles, 0),
                                   chunkshape=(n_particles, time_chunk_size),
                                   expectedrows=n_iter*time_chunk_size,
                                   filters=comp_filter)
# generate simulated emission data
for i in range(n_iter):
    emission_chunk = generate_emission((n_particles, time_chunk_size))
    emission.append(emission_chunk)
emission.flush()
1 loops, best of 1: 12.3 s per loop
print 'File size: %.1f MB ' % (data_file.get_filesize()/1024./1024.)
File size: 135.1 MB
emission = data_file.root.emission
print 'Compression ratio: %.1f x' % ((1.*emission.size_in_memory/emission.size_on_disk))
Compression ratio: 2.2 x
Time needed to save the data to a file with pandas (Jeff version):
%%timeit -r1 -n1
# 1) create a new emission file, compressing as we go
emission = pd.HDFStore('emission_pandas.hdf', mode='w', complib='blosc', complevel=5)
# generate simulated data
for i in range(n_iter):
    # Generate fake emission
    emission_chunk = np.random.randn(time_chunk_size, n_particles) - 1
    emission_chunk.clip(0, 1e6, out=emission_chunk)
    assert (emission_chunk >= 0).all()
    df = pd.DataFrame(emission_chunk, dtype='float32')
    # create a globally unique index (time)
    # http://stackoverflow.com/questions/16997048/how-does-one-append-large-
    # amounts-of-data-to-a-pandas-hdfstore-and-get-a-natural/16999397#16999397
    try:
        nrows = emission.get_storer('df').nrows
    except:
        nrows = 0
    df.index = pd.Series(df.index) + nrows
    emission.append('df', df)
emission.close()
1 loops, best of 1: 41.7 s per loop
#emission.close()
In-RAM computation of "counts" and "timestamps"
Compute the counts:
counts_ram = compute_counts(emission.read())
%%timeit -r1 -n1
counts_ram = compute_counts(emission.read())
1 loops, best of 1: 31.2 s per loop
When there is more than one count per bin, I add a "fraction of bin" to the timestamp in order to avoid photon bunching. The timestamp resolution is increased 10 times so that integers can represent fractions of the initial time bin.
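As a toy illustration of the scheme (hypothetical values, separate from the benchmark code below): with a scale of 10, a bin at index 42 holding 3 counts yields one timestamp per count, the extra ones offset by the first entries of the fractions list.
# Toy illustration of the fraction-of-bin scheme (not part of the benchmark).
scale, fractions = 10, [5, 2, 8]
bin_index, bin_counts = 42, 3
ts = [bin_index*scale] + [bin_index*scale + f for f in fractions[:bin_counts - 1]]
print sorted(ts)   # -> [420, 422, 425]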
Lists are defined outside the timeit cell, otherwise they are not saved:
fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]
scale = 10
max_counts = 4
t_list = [[] for i in xrange(n_particles)]
timestamps_ram = []
%%timeit -r1 -n1
for p_i, counts_p_i in enumerate(counts_ram.copy()):
    t_list[p_i].append(counts_p_i.nonzero()[0]*scale)
    for frac, v in izip(fractions, range(2, max_counts + 1)):
        counts_p_i -= 1
        np.clip(counts_p_i, 0, 127, out=counts_p_i)
        t_list[p_i].append(counts_p_i.nonzero()[0]*scale + frac)
for t in t_list: timestamps_ram.append(np.hstack(t))
[t.sort(kind='mergesort') for t in timestamps_ram]
1 loops, best of 1: 3.7 s per loop
Here we create a single sorted array (and an array of particles) from the list of timestamps
par_index = [p_i*np.ones(t.size) for p_i, t in enumerate(timestamps_ram)]
timestamps_all_ram = np.hstack(timestamps_ram)
par_index_all_ram = np.hstack(par_index)
index_sort = timestamps_all_ram.argsort(kind='mergesort')
timestamps_all_ram = timestamps_all_ram[index_sort]
par_index_all_ram = par_index_all_ram[index_sort]
These are only consistency checks:
print [t.shape for t in timestamps_ram]
[(310020,), (309226,), (308675,), (309585,), (310489,), (309850,), (310068,), (309547,), (310329,), (310280,), (309849,), (309377,), (309851,), (310024,), (308760,)]
for p_i in xrange(n_particles):
    print (timestamps_ram[p_i] == timestamps_all_ram[par_index_all_ram == p_i]).all(),
True True True True True True True True True True True True True True True
idx_ram = [t/scale for t in timestamps_ram]
idx_ram
[array([ 30, 86, 126, ..., 5242816, 5242857, 5242879]), array([ 2, 36, 45, ..., 5242706, 5242726, 5242752]), array([ 36, 36, 57, ..., 5242852, 5242853, 5242853]), array([ 1, 1, 1, ..., 5242830, 5242842, 5242877]), array([ 21, 33, 33, ..., 5242694, 5242849, 5242849]), array([ 26, 26, 47, ..., 5242814, 5242835, 5242871]), array([ 34, 36, 52, ..., 5242852, 5242852, 5242852]), array([ 0, 10, 41, ..., 5242856, 5242879, 5242879]), array([ 5, 33, 70, ..., 5242859, 5242870, 5242879]), array([ 15, 17, 45, ..., 5242872, 5242872, 5242874]), array([ 12, 45, 50, ..., 5242875, 5242875, 5242876]), array([ 52, 52, 55, ..., 5242834, 5242834, 5242834]), array([ 20, 44, 53, ..., 5242693, 5242804, 5242835]), array([ 54, 61, 61, ..., 5242869, 5242876, 5242876]), array([ 30, 52, 52, ..., 5242837, 5242845, 5242845])]
print [(np.clip(c, 0, max_counts).sum() == t.size) for c, t in zip(counts_ram, timestamps_ram)]
[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]
[c[i] for c, i in zip(counts_ram, idx_ram)]
[array([1, 1, 1, ..., 1, 1, 1], dtype=int8), array([1, 1, 1, ..., 1, 1, 1], dtype=int8), array([2, 2, 1, ..., 1, 2, 2], dtype=int8), array([3, 3, 3, ..., 1, 1, 1], dtype=int8), array([1, 3, 3, ..., 1, 2, 2], dtype=int8), array([2, 2, 1, ..., 1, 1, 1], dtype=int8), array([1, 1, 2, ..., 3, 3, 3], dtype=int8), array([1, 1, 1, ..., 1, 2, 2], dtype=int8), array([1, 1, 1, ..., 1, 1, 1], dtype=int8), array([1, 1, 1, ..., 2, 2, 1], dtype=int8), array([1, 1, 1, ..., 2, 2, 1], dtype=int8), array([2, 2, 1, ..., 3, 3, 3], dtype=int8), array([1, 1, 1, ..., 1, 1, 1], dtype=int8), array([1, 2, 2, ..., 1, 2, 2], dtype=int8), array([1, 2, 2, ..., 1, 2, 2], dtype=int8)]
Storing "counts", then computing timestamps
Each particle has a different table of "counts".
Let's create the tables on disk:
counts_names = ["counts_p%d" % i for i in xrange(n_particles)]
#for i_p in xrange(n_particles): data_file.remove_node('/c', counts_names[i_p])
data_file.close()
data_file = tb.open_file('emission_pytables.hdf', mode="a")
emission = data_file.root.emission
print 'File size: %.1f MB ' % (data_file.get_filesize()/1024./1024.)
File size: 135.1 MB
# Create the tables for the counts
comp_filter = tb.Filters(complib='blosc', complevel=5)
table_uint8 = np.dtype([('counts', 'u1')])
counts_p = []
for i_p in xrange(n_particles):
    # 2) create counts table
    counts_p.append(data_file.create_table('/c', counts_names[i_p], createparents=True,
                                           description=table_uint8,
                                           chunkshape=time_chunk_size,
                                           expectedrows=n_iter*time_chunk_size,
                                           filters=comp_filter))
The table references are stored in the list counts_p:
counts_p[0]
/c/counts_p0 (Table(0,), shuffle, blosc(5)) '' description := { "counts": UInt8Col(shape=(), dflt=0, pos=0)} byteorder := 'little' chunkshape := (262144,)
Compute the counts and store them on disk:
%%timeit -n1 -r1
# Extract the counts from the emission
for i_chunk in xrange(n_iter):
    emission_chunk = emission[:, i_chunk*time_chunk_size:i_chunk*time_chunk_size + time_chunk_size]
    #print emission_chunk.shape
    for p_i in xrange(n_particles):
        counts_chunk_p_i = compute_counts(emission_chunk[p_i])
        counts_p[p_i].append(counts_chunk_p_i)
data_file.flush()
1 loops, best of 1: 1.54 s per loop
~20% faster than the in-memory counts computation!
print 'File size: %.1f MB ' % (data_file.get_filesize()/1024./1024.)
File size: 147.6 MB
counts_p[0]
/c/counts_p0 (Table(5242880,), shuffle, blosc(5)) '' description := { "counts": UInt8Col(shape=(), dflt=0, pos=0)} byteorder := 'little' chunkshape := (262144,)
For testing purposes, load all the counts and compare with the in-memory results:
counts_tb = np.vstack([c.read() for c in counts_p]).astype('i1')
counts_tb.sum(1)
array([308778, 310327, 310591, 309985, 310035, 310663, 309909, 310130, 310264, 310064, 309592, 311289, 309805, 309989, 310662])
counts_ram.sum(1)
array([311238, 308140, 310120, 309996, 309671, 310261, 310743, 309631, 310158, 309335, 309438, 310189, 309709, 309499, 309755])
(counts_tb == counts_ram).all()
True
When there is more than one count per bin, I add a "fraction of bin" to the timestamp in order to avoid photon bunching. The timestamp resolution is increased 10 times so that integers can represent fractions of the initial time bin.
Lists are defined outside the timeit cell, otherwise they are not saved:
# Compute timestamps from the counts
fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]
scale = 10
max_counts = 4
timestamps_tb = []
%%timeit -n1 -r1
for counts_p_i in counts_p:
    PH = [counts_p_i.get_where_list('counts >= 1')]
    PH[0] *= scale
    for frac, v in izip(fractions, range(2, max_counts + 1)):
        PH.append(counts_p_i.get_where_list('counts >= %d' % v)*scale)
        PH[-1] += frac
    t = np.hstack(PH).astype(np.int64)
    t.sort(kind='mergesort')
    timestamps_tb.append(t)
1 loops, best of 1: 4.29 s per loop
Consistency checks:
print [(t1.size == t2.size) for t1, t2 in zip(timestamps_tb, timestamps_ram)]
print [(t1 == t2).all() for t1, t2 in zip(timestamps_tb, timestamps_ram)]
[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]
Computing "timestamps" (without storing counts)
This computes the "timestamps" in a list, without the final merge:
fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]
scale = 10
max_counts = 4
times_list = [[] for i in xrange(n_particles)]
%%timeit -r1 -n1
# Load emission in chunks, and save only the final timestamps
for i_chunk in xrange(n_iter):
    i_start = i_chunk*time_chunk_size
    emission_chunk = emission[:, i_start:i_start + time_chunk_size]
    counts_chunk = compute_counts(emission_chunk)
    index = np.arange(0, counts_chunk.shape[1])
    # Loop for each particle to compute timestamps
    for p_i, counts_chunk_p_i in enumerate(counts_chunk.copy()):
        times_c_i = [(index[counts_chunk_p_i >= 1] + i_start)*scale]
        for frac, v in izip(fractions, range(2, max_counts + 1)):
            times_c_i.append((index[counts_chunk_p_i >= v] + i_start)*scale + frac)
        t = np.hstack(times_c_i)
        t.sort(kind='mergesort')
        times_list[p_i].append(t)
1 loops, best of 1: 2.26 s per loop
~30% faster than the computation from disk (when the "counts" are stored).
Consistency checks:
timestamps_tb2 = [np.hstack(t) for t in times_list]
print [(t1.size == t2.size) for t1, t2 in zip(timestamps_tb2, timestamps_ram)]
print [(t1 != t2).sum() for t1, t2 in zip(timestamps_tb2, timestamps_ram)]
[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True] [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
This computes "timestamps
" in a list, then merge them (hstack
) in a single array:
fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]
scale = 10
max_counts = 4
timestamps_list_all_tb = []
par_list_all_tb = []
%%timeit -r1 -n1
# Load emission in chunks, and save only the final timestamps
for i_chunk in xrange(n_iter):
    i_start = i_chunk*time_chunk_size
    emission_chunk = emission[:, i_start:i_start + time_chunk_size]
    counts_chunk = compute_counts(emission_chunk)
    index = np.arange(0, counts_chunk.shape[1])
    # Loop for each particle to compute timestamps
    times_chunk = [[] for i in xrange(n_particles)]
    par_index_chunk = [[] for i in xrange(n_particles)]
    for p_i, counts_chunk_p_i in enumerate(counts_chunk.copy()):
        times_c_i = [(index[counts_chunk_p_i >= 1] + i_start)*scale]
        for frac, v in izip(fractions, range(2, max_counts + 1)):
            times_c_i.append((index[counts_chunk_p_i >= v] + i_start)*scale + frac)
        # Stack the arrays of different "counts"
        t = np.hstack(times_c_i)
        times_chunk[p_i] = t
        par_index_chunk[p_i] = p_i*np.ones(t.size, dtype='u1')
    # Merge the arrays of different particles
    times_all = np.hstack(times_chunk)
    par_index_all = np.hstack(par_index_chunk)
    # Sort timestamps inside the merged chunk
    index_sort = times_all.argsort(kind='mergesort')
    times_all = times_all[index_sort]
    par_index_all = par_index_all[index_sort]
    # Save timestamps and particles
    timestamps_list_all_tb.append(times_all)
    par_list_all_tb.append(par_index_all)
1 loops, best of 1: 5.06 s per loop
# Make union of the chunks
timestamps_all_tb = np.hstack(timestamps_list_all_tb)
par_all_tb = np.hstack(par_list_all_tb)
Consistency checks:
timestamps_all_tb.shape, timestamps_all_ram.shape
((4645930,), (4645930,))
print (timestamps_all_tb == timestamps_all_ram).all()
print (par_all_tb == par_index_all_ram).all()
True True
Computing "timestamps", storing them on disk on-the-go
#data_file.remove_node('/ts', 'timestamps_all')
#data_file.remove_node('/ts', 'par_all')
data_file.close()
data_file = tb.open_file('emission_pytables.hdf', mode="a")
emission = data_file.root.emission
print 'File size: %.1f MB ' % (data_file.get_filesize()/1024./1024.)
File size: 135.1 MB
# Create the table for the timestamps_all
comp_filter = tb.Filters(complib='blosc', complevel=5)
table_int64 = np.dtype([('times', 'i8')])
table_uint8 = np.dtype([('particle', 'i1')])
timestamps_all_table = data_file.create_table('/ts', 'timestamps_all', createparents=True,
                                              description=table_int64, filters=comp_filter)
par_all_table = data_file.create_table('/ts', 'par_all', createparents=True,
                                       description=table_uint8, filters=comp_filter)
fractions = [5, 2, 8, 4, 9, 1, 7, 3, 6, 9, 0, 5, 2, 8, 4, 9]
scale = 10
max_counts = 4
%%timeit -r1 -n1
# Load emission in chunks, and save only the final timestamps
for i_chunk in xrange(n_iter):
    i_start = i_chunk*time_chunk_size
    emission_chunk = emission[:, i_start:i_start + time_chunk_size]
    counts_chunk = compute_counts(emission_chunk)
    index = np.arange(0, counts_chunk.shape[1])
    # Loop for each particle to compute timestamps
    times_chunk = [[] for i in xrange(n_particles)]
    par_index_chunk = [[] for i in xrange(n_particles)]
    for p_i, counts_chunk_p_i in enumerate(counts_chunk.copy()):
        times_c_i = [(index[counts_chunk_p_i >= 1] + i_start)*scale]
        for frac, v in izip(fractions, range(2, max_counts + 1)):
            times_c_i.append((index[counts_chunk_p_i >= v] + i_start)*scale + frac)
        # Stack the arrays of different "counts"
        t = np.hstack(times_c_i)
        times_chunk[p_i] = t
        par_index_chunk[p_i] = p_i*np.ones(t.size, dtype='u1')
    # Merge the arrays of different particles
    times_all = np.hstack(times_chunk)
    par_index_all = np.hstack(par_index_chunk)
    # Sort timestamps inside the merged chunk
    index_sort = times_all.argsort(kind='mergesort')
    times_all = times_all[index_sort]
    par_index_all = par_index_all[index_sort]
    # Save timestamps and particles
    timestamps_all_table.append(times_all)
    par_all_table.append(par_index_all)
1 loops, best of 1: 5.3 s per loop
# Make union of the chunks
timestamps_all_tb3 = timestamps_all_table.read().astype('int64')
par_all_tb3 = par_all_table.read().astype('uint8')
print 'File size: %.1f MB ' % (data_file.get_filesize()/1024./1024.)
File size: 145.0 MB
Consistency checks:
timestamps_all_tb3.shape, timestamps_all_ram.shape
((4645930,), (4645930,))
print (timestamps_all_tb3 == timestamps_all_ram).all()
print (par_all_tb3 == par_index_all_ram).all()
True True
Storing "counts"
Storing the counts takes 10% more space and 40% more time to compute the timestamps. Having the counts stored is not an advantage per se, because only the timestamps are needed in the end.
The advantage is that reconstructing the index (the timestamps) is simpler, because we query the full time axis in a single command (.get_where_list('counts >= 1')). Conversely, with chunked processing we need to perform some index arithmetic that is tricky and maybe a burden to maintain, as restated below.
However, the code complexity may be small compared to the sorting and merging that is needed in both cases.
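A compact restatement of the two approaches used above (reusing counts_p, p_i, counts_chunk_p_i and i_start from the cells above; no new benchmark):
# With "counts" stored on disk: one query over the full time axis per particle.
global_idx = counts_p[p_i].get_where_list('counts >= 1')
# With chunked processing: the local indices found in each chunk must be
# shifted by the chunk offset i_start to become global bin indices.
local_idx = (counts_chunk_p_i >= 1).nonzero()[0]
global_idx = local_idx + i_start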
Storing "timestamps"
Timestamps can be accumulated in RAM. However, a final hstack() is needed to "merge" the different chunks stored in a list. This doubles the memory requirement, so RAM may be insufficient.
Alternatively, we can store the timestamps as we go to a table using .append(). At the end we can load the table in memory with .read(). This is only 10% slower than the all-in-memory computation but avoids the "double the RAM" requirement. Moreover, we can skip the final full load altogether, resulting in minimal RAM usage, as sketched below.
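A minimal sketch of that last point, reading the timestamps_all table created above in fixed-size slices (the slice size is an arbitrary choice):
# Minimal sketch: process the on-disk timestamps in slices instead of loading
# the whole table with .read(); read_chunk is an arbitrary slice size.
read_chunk = 2**20
for start in xrange(0, timestamps_all_table.nrows, read_chunk):
    stop = min(start + read_chunk, timestamps_all_table.nrows)
    ts_slice = timestamps_all_table.read(start=start, stop=stop)['times']
    # ... process ts_slice here (e.g. histogramming or burst search) ...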
h5py is a much simpler library than PyTables. For this use case of sequential processing it seems a better fit than PyTables. The only missing feature is 'blosc' compression; it remains to be tested whether this results in a big performance penalty.
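As a rough, untested sketch (an assumption, not benchmarked here), the same chunked-append pattern with h5py would use a resizable dataset, falling back to gzip since blosc is not built in:
# Hypothetical h5py version of the chunked emission storage (not benchmarked).
# Reuses n_particles, n_iter, time_chunk_size and generate_emission() from above.
import h5py

with h5py.File('emission_h5py.hdf', 'w') as f:
    dset = f.create_dataset('emission', shape=(n_particles, 0),
                            maxshape=(n_particles, None), dtype='float32',
                            chunks=(n_particles, time_chunk_size),
                            compression='gzip')
    for i in range(n_iter):
        chunk = generate_emission((n_particles, time_chunk_size))
        old_size = dset.shape[1]
        dset.resize(old_size + time_chunk_size, axis=1)   # extend the time axis
        dset[:, old_size:old_size + time_chunk_size] = chunk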
Computing "counts" with pandas
Solution suggested by Jeff here:
WARNING: this takes several times longer than the PyTables approach and is not useful in real situations.
# 2) create counts from the pandas emission file saved above
cs = pd.HDFStore('counts.hdf', mode='w', complib='blosc')
# this is an iterator, can be any size
for df in pd.read_hdf('emission_pandas.hdf', 'df', chunksize=200):
    counts = pd.DataFrame(np.random.poisson(lam=df).astype(np.uint8))
    # set the index as the same
    counts.index = df.index
    # store the sum across all particles (as most are zero this will be a
    # nice sub-selector)
    # better maybe to have multiple of these sums that divide the particle space
    # you don't have to do this but prob more efficient
    # you can do this in another file if you want/need
    counts['particles_0_4'] = counts.iloc[:, 0:4].sum(1).astype('u1')
    counts['particles_5_9'] = counts.iloc[:, 5:9].sum(1).astype('u1')
    # make the non_zero columns indexable
    cs.append('df', counts, data_columns=['particles_0_4', 'particles_5_9'])
cs.close()
# 3) find interesting counts
print pd.read_hdf('counts.hdf','df',where='particles_0_4>0')
print pd.read_hdf('counts.hdf','df',where='particles_5_9>0')