This notebook investigates the performance of HDF5 and BLZ disk-based formats for writing and reading 2-dimensional arrays of genotype data.
The motivation is using HDF5 or BLZ to store arrays of genotype data, where the first dimension corresponds to genome positions where individuals differ genetically from each other (a.k.a. variants) and the second dimension corresponds to a cohort of individuals (a.k.a. samples). Each cell within the array stores the genotype of the individual at a genetic variant, coded as a 1 byte integer.
We typically have >10 million variants across >1000 samples. Hence the arrays are long and thin. This notebook benchmarks a smaller but similar shaped dataset of 1000000 variants by 100 samples.
Most genetic variants are also rare, hence the arrays are sparse along both dimensions, with most values being zeros.
Different data analysis tasks require different access patterns for these data. Hence this notebook tests performance under a number of different access patterns, to get some idea of which configurations provide reasonable all-round performance, and which configurations might be optimal for some access patterns but slow for others.
Data
The data used in this benchmark are real genotype data from the Anopheles gambiae 1000 genomes project phase 1 preview data release. Genotype data from chromosome arm 3L are used.
Hardware
All tests were performed on a Toshiba Portege Z930-108. Tests were performed using the onboard SSD, and using a WD My Passport 2TB external hard disk connected via USB 3.0.
from __future__ import division, print_function
import numpy as np
import h5py
import blz
import sh
import os
import sys
import itertools
from datetime import datetime
import gc
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('darkgrid')
import random
random.seed(1)
import petl.interactive as etl
import petlx.all
import pandas
from bisect import bisect_left, bisect_right
%matplotlib inline
def log(*msg):
print(str(datetime.now()) + ' :: ' + ' '.join(map(str, msg)), file=sys.stderr)
sys.stderr.flush()
def random_indices(n, k):
return sorted(random.sample(range(n), k))
def random_selection(n, k):
selection = np.zeros((n,), np.bool)
indices = random_indices(n, k)
selection[indices] = True
return indices, selection
devices = 'ssd', 'hdd'
[2**n for n in range(13, 20)]
[8192, 16384, 32768, 65536, 131072, 262144, 524288]
h5_compression_configurations = (
dict(compression='lzf', shuffle=False),
dict(compression='gzip', compression_opts=1, shuffle=False),
dict(compression='gzip', compression_opts=3, shuffle=False),
)
h5_chunk_sizes = [2**n for n in range(13, 20)]
h5_chunk_widths = (1, 10, 100)
h5_chunks_configurations = [(s//w, w) for (s, w) in itertools.product(h5_chunk_sizes, h5_chunk_widths)]
h5_configurations = [dict(chunks=chunks, **compression)
for (chunks, compression) in itertools.product(h5_chunks_configurations,
h5_compression_configurations)]
h5_configurations.append(dict()) # add default config, no chunks
[2**n for n in range(10, 20)]
[1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072, 262144, 524288]
blz_cnames = ('blosclz', 'lz4', 'lz4hc', 'zlib')
blz_clevels = (1, 3)
blz_chunklens = [2**n for n in range(10, 20)]
blz_configurations = [dict(cname=cname, clevel=clevel, shuffle=False, chunklen=chunklen)
for (cname, clevel, chunklen) in itertools.product(blz_cnames,
blz_clevels,
blz_chunklens)]
n_variants = 1000000
n_samples = 100
n_replicates = 3
chromosome = '3L'
data_replicates = []
with h5py.File('data/data.h5', mode='r') as f:
genotypes = f[chromosome]['calldata']['genotype']
for n in range(n_replicates):
# each replicate uses a different random slice of the data
n_variants_total = genotypes.shape[0]
start_index = random.randint(0, n_variants_total - n_variants)
stop_index = start_index + n_variants
d = genotypes[start_index:stop_index, :n_samples, ...]
# reduce data to 2 dimensions for the purpose of this benchmark
d = np.sum(d, axis=2, dtype='i1')
log(n, start_index, stop_index, d.shape, '%.1fMb' % (d.nbytes/1e6), '%.3f%% nonzero' % (np.count_nonzero(d)*100/d.size))
data_replicates.append(d)
2014-07-22 11:06:06.374455 :: 0 955908 1955908 (1000000, 100) 100.0Mb 19.061% nonzero 2014-07-22 11:06:10.942690 :: 1 6028902 7028902 (1000000, 100) 100.0Mb 15.036% nonzero 2014-07-22 11:06:14.405189 :: 2 5433726 6433726 (1000000, 100) 100.0Mb 14.826% nonzero
The command drop_cache
is just a small script that executes:
#!/bin/bash
echo 1 > /proc/sys/vm/drop_caches
This must be run as sudo, so to avoid passwords this script has been set in the sudoers file with NOPASSWD.
def benchmark_h5(op, verbose=False):
results = []
# storage devices
for device in devices:
# replicates
for n, data in enumerate(data_replicates):
# configurations
for i, config in enumerate(h5_configurations):
sh.sudo.drop_cache()
fn = os.path.join(device, '%s.%s.h5' % (n, i))
if verbose:
log(n, i, fn, config)
# operation to be benchmarked
before = datetime.now()
ds = op(fn, config.copy(), data)
after = datetime.now()
compression = config.get('compression', 'none') + '.' + str(config.get('compression_opts', ''))
chunk_height, chunk_width = config.get('chunks', (0, 0))
chunk_size = chunk_width*chunk_height
elapsed = (after-before).total_seconds()
size_on_disk = os.stat(fn).st_size
compression_ratio = data.nbytes / size_on_disk
result = [n, device, compression, chunk_height, chunk_width, chunk_size, elapsed, size_on_disk, compression_ratio]
results.append(result)
df = pandas.DataFrame.from_records(results,
columns=['replicate',
'device',
'compression',
'chunk_height',
'chunk_width',
'chunk_size',
'time',
'size_on_disk',
'compression_ratio'])
return df
def benchmark_blz(op, verbose=False):
results = []
# storage devices
for device in devices:
# replicates
for n, data in enumerate(data_replicates):
# configurations
for i, config in enumerate(blz_configurations):
sh.sudo.drop_cache()
fn = os.path.join(device, '%s.%s.blz' % (n, i))
if verbose:
log(n, i, fn, config)
# operation to be benchmarked
before = datetime.now()
b = op(fn, config.copy(), data)
after = datetime.now()
compression = config.get('cname') + '.' + str(config.get('clevel'))
chunklen = config.get('chunklen')
elapsed = (after-before).total_seconds()
size_on_disk = b.cbytes
compression_ratio = data.nbytes / size_on_disk
result = [n, device, compression, chunklen, elapsed, size_on_disk, compression_ratio]
results.append(result)
df = pandas.DataFrame.from_records(results,
columns=['replicate',
'device',
'compression',
'chunklen',
'time',
'size_on_disk',
'compression_ratio'])
return df
def op_write_h5(fn, config, data):
with h5py.File(fn, mode='w') as h5:
ds = h5.create_dataset('test', data=data, **config)
return ds
def op_write_blz(fn, config, data):
bparams = blz.bparams(cname=config.pop('cname'),
clevel=config.pop('clevel'),
shuffle=config.pop('shuffle', False))
b = blz.barray(data, rootdir=fn, mode='w', bparams=bparams, **config)
return b
df_write_h5 = benchmark_h5(op_write_h5)
df_write_blz = benchmark_blz(op_write_blz)
grid = sns.factorplot(x='chunk_size', y='time',
row='device', col='chunk_width', hue='compression',
data=df_write_h5, kind='point', estimator=np.min)
grid.set(ylim=(0, 3))
grid.fig.suptitle('HDF5 write speed', fontsize=20, y=1.03);
grid = sns.factorplot(x='chunklen', y='time',
row='device', hue='compression',
data=df_write_blz, kind='point', estimator=np.min,
palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels)))
grid.set(ylim=(0, 3))
grid.fig.suptitle('BLZ write speed', fontsize=20, y=1.03);
grid = sns.factorplot(x='chunk_size', y='compression_ratio',
col='chunk_width', hue='compression',
data=df_write_h5, kind='point', estimator=np.min)
grid.fig.suptitle('HDF5 compression ratio', fontsize=20, y=1.03);
grid = sns.factorplot(x='chunklen', y='compression_ratio',
hue='compression',
data=df_write_blz, kind='point', estimator=np.min,
palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels)))
grid.fig.suptitle('BLZ compression ratio', fontsize=20, y=1.03);
def op_read_h5(fn, *args):
with h5py.File(fn, mode='r') as h5:
ds = h5['test']
a = ds[:]
return ds
def op_read_blz(fn, *args):
b = blz.barray(rootdir=fn, mode='r')
a = b[:]
return b
df_read_h5 = benchmark_h5(op_read_h5)
df_read_blz = benchmark_blz(op_read_blz)
grid = sns.factorplot(x='chunk_size', y='time',
row='device', col='chunk_width', hue='compression',
data=df_read_h5, kind='point', estimator=np.min)
grid.set(ylim=(0, 2))
grid.fig.suptitle('HDF5 read all', fontsize=20, y=1.03);
grid = sns.factorplot(x='chunklen', y='time',
row='device', hue='compression',
data=df_read_blz, kind='point', estimator=np.min,
palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels)))
grid.set(ylim=(0, 2))
grid.fig.suptitle('BLZ read all', fontsize=20, y=1.03);
def op_read_col_h5(fn, *args):
with h5py.File(fn, mode='r') as h5:
ds = h5['test']
a = ds[:, 0]
return ds
def op_read_col_blz(fn, *args):
b = blz.barray(rootdir=fn, mode='r')
a = b[:, 0]
return b
df_read_col_h5 = benchmark_h5(op_read_col_h5)
df_read_col_blz = benchmark_blz(op_read_col_blz)
grid = sns.factorplot(x='chunk_size', y='time',
row='device', col='chunk_width', hue='compression',
data=df_read_col_h5, kind='point', estimator=np.min)
grid.set(ylim=(0, 1.5))
grid.fig.suptitle('HDF5 read column', fontsize=20, y=1.03);
grid = sns.factorplot(x='chunklen', y='time',
row='device', hue='compression',
data=df_read_col_blz, kind='point', estimator=np.min,
palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels)))
grid.set(ylim=(0, 1.5))
grid.fig.suptitle('BLZ read column', fontsize=20, y=1.03);
def op_read_rows_h5(fn, *args):
with h5py.File(fn, mode='r') as h5:
ds = h5['test']
a = ds[400000:600000, :]
return ds
def op_read_rows_blz(fn, *args):
b = blz.barray(rootdir=fn, mode='r')
a = b[400000:600000, :]
return b
df_read_rows_h5 = benchmark_h5(op_read_rows_h5)
df_read_rows_blz = benchmark_blz(op_read_rows_blz)
grid = sns.factorplot(x='chunk_size', y='time',
row='device', col='chunk_width', hue='compression',
data=df_read_rows_h5, kind='point', estimator=np.min)
grid.set(ylim=(0, .8))
grid.fig.suptitle('HDF5 read rows', fontsize=20, y=1.03);
grid = sns.factorplot(x='chunklen', y='time',
row='device', hue='compression',
data=df_read_rows_blz, kind='point', estimator=np.min,
palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels)))
grid.set(ylim=(0, .8))
grid.fig.suptitle('BLZ read rows', fontsize=20, y=1.03);
step = 200
def op_read_rows_step_h5(fn, *args):
with h5py.File(fn, mode='r') as h5:
ds = h5['test']
a = ds[::step, :]
return ds
def op_read_rows_step_blz(fn, *args):
b = blz.barray(rootdir=fn, mode='r')
a = b[::step, :]
return b
df_read_rows_step_h5 = benchmark_h5(op_read_rows_step_h5)
df_read_rows_step_blz = benchmark_blz(op_read_rows_step_blz)
grid = sns.factorplot(x='chunk_size', y='time',
row='device', col='chunk_width', hue='compression',
data=df_read_rows_step_h5, kind='point', estimator=np.min)
grid.set(ylim=(0, 2))
grid.fig.suptitle('HDF5 read rows with step', fontsize=20, y=1.03);
grid = sns.factorplot(x='chunklen', y='time',
row='device', hue='compression',
data=df_read_rows_step_blz, kind='point', estimator=np.min,
palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels)))
grid.set(ylim=(0, 2))
grid.fig.suptitle('BLZ read rows with step', fontsize=20, y=1.03);
row_indices, row_selection = random_selection(n_variants, n_variants//200)
np.count_nonzero(row_selection)
5000
def op_read_rows_sel_h5(fn, *args):
with h5py.File(fn, mode='r') as h5:
ds = h5['test']
a = ds[row_selection, :]
return ds
def op_read_rows_sel_blz(fn, *args):
b = blz.barray(rootdir=fn, mode='r')
a = b[row_selection, :]
return b
df_read_rows_sel_h5 = benchmark_h5(op_read_rows_sel_h5)
# df_read_rows_sel_blz = benchmark_blz(op_read_rows_sel_blz) # not implemented
grid = sns.factorplot(x='chunk_size', y='time',
row='device', col='chunk_width', hue='compression',
data=df_read_rows_sel_h5, kind='point', estimator=np.min)
#grid.set(ylim=(0, 1.5))
grid.fig.suptitle('HDF5 read rows with selection (fancy indexing)', fontsize=20, y=1.03);
# grid = sns.factorplot(x='chunklen', y='time',
# row='device', hue='compression',
# data=df_read_rows_sel_blz, kind='point', estimator=np.min,
# palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels)))
# grid.set(ylim=(0, .8))
# grid.fig.suptitle('BLZ read rows with selection', fontsize=20, y=1.03);
This implementation loads the entire dataset into memory then uses np.take(). This is just to set a baseline to compare other implementations to.
def op_read_rows_sel_np_h5(fn, *args):
with h5py.File(fn, mode='r') as h5:
ds = h5['test']
a = np.take(ds[:], row_indices, axis=0)
return ds
df_read_rows_sel_np_h5 = benchmark_h5(op_read_rows_sel_np_h5)
grid = sns.factorplot(x='chunk_size', y='time',
row='device', col='chunk_width', hue='compression',
data=df_read_rows_sel_np_h5, kind='point', estimator=np.min)
#grid.set(ylim=(0, 1.5))
grid.fig.suptitle('HDF5 read rows with selection (numpy in-memory)', fontsize=20, y=1.03);
def h5_take2d(dataset, row_indices, col_indices=None, block_size=None):
# make sure row_indices are sorted array
row_indices = np.array(sorted(row_indices))
# how many rows are we selecting?
n_rows = dataset.shape[0]
n_rows_out = len(row_indices)
# how many columns are we selecting?
n_cols = dataset.shape[1]
if col_indices:
n_cols_out = len(col_indices)
else:
n_cols_out = n_cols
# setup output array
out_shape = (n_rows_out, n_cols_out) + dataset.shape[2:]
out = np.empty(out_shape, dtype=dataset.dtype)
# determine block size
if block_size is None:
if dataset.chunks is not None:
# use dataset chunk height
block_size = dataset.chunks[0]
else:
# use arbitrary number
block_size = 1000
# iterate block-wise
offset = 0
for block_start in xrange(0, n_rows, block_size):
block_stop = min(block_start+block_size, n_rows)
# how many indices to process in this block?
i = bisect_left(row_indices, block_start)
j = bisect_left(row_indices, block_stop)
n = j-i
ridx = row_indices[i:j]
# only do anything if there are indices for this block
if n:
# load data for this block
a = dataset[block_start:block_stop]
# take rows
b = np.take(a, ridx-block_start, axis=0)
# take columns
if col_indices:
b = np.take(b, col_indices, axis=1)
# store output
out[offset:offset+n, ...] = b
# keep track of offset
offset += n
return out
def op_read_rows_sel_alt_h5(fn, *args):
with h5py.File(fn, mode='r') as h5:
ds = h5['test']
a = h5_take2d(ds, row_indices)
return ds
df_read_rows_sel_alt_h5 = benchmark_h5(op_read_rows_sel_alt_h5)
grid = sns.factorplot(x='chunk_size', y='time',
row='device', col='chunk_width', hue='compression',
data=df_read_rows_sel_alt_h5, kind='point', estimator=np.min)
#grid.set(ylim=(0, 1.5))
grid.fig.suptitle('HDF5 read rows with selection (numpy block-wise implementation)', fontsize=20, y=1.03);
col_indices = random_indices(n_samples, n_samples//10)
len(col_indices)
10
def op_read_sel2d_h5(fn, *args):
with h5py.File(fn, mode='r') as h5:
ds = h5['test']
a = h5_take2d(ds, row_indices, col_indices)
return ds
df_read_sel2d_h5 = benchmark_h5(op_read_sel2d_h5)
grid = sns.factorplot(x='chunk_size', y='time',
row='device', col='chunk_width', hue='compression',
data=df_read_sel2d_h5, kind='point', estimator=np.min)
#grid.set(ylim=(0, 1))
grid.fig.suptitle('HDF5 2D selection (numpy block-wise implementation)', fontsize=20, y=1.03);
def h5_take2d_points(dataset, row_indices, col_indices, block_size=1000):
# determine number of items to select
m = len(row_indices)
n = len(col_indices)
z = m*n
# initialise output array
out = np.empty((z,), dtype=dataset.dtype)
# convert indices into coordinates
coords = itertools.product(row_indices, col_indices)
# set up selection
sel = h5py._hl.selections.PointSelection(dataset.shape)
typ = h5py.h5t.py_create(dataset.dtype)
# process blocks at a time
for block_start in xrange(0, z, block_size):
# materialise a block of coordinates
selection = np.asarray(list(itertools.islice(coords, block_size)))
# set selection
sel.set(selection)
# read data
block_stop = block_start + len(selection)
space = h5py.h5s.create_simple(sel.mshape)
dataset.id.read(space,
sel._id,
out[block_start:block_stop],
typ)
# reshape output array
out = out.reshape(m, n)
return out
def op_read_points_h5(fn, *args):
with h5py.File(fn, mode='r') as h5:
ds = h5['test']
a = h5_take2d_points(ds, row_indices, col_indices)
return ds
df_read_points_h5 = benchmark_h5(op_read_points_h5)
grid = sns.factorplot(x='chunk_size', y='time',
row='device', col='chunk_width', hue='compression',
data=df_read_points_h5, kind='point', estimator=np.min)
#grid.set(ylim=(0, 1))
grid.fig.suptitle('HDF5 2D selection (points implementation)', fontsize=20, y=1.03);