from __future__ import division, print_function import numpy as np import h5py import blz import sh import os import sys import itertools from datetime import datetime import gc import matplotlib.pyplot as plt import seaborn as sns sns.set_style('darkgrid') import random random.seed(1) import petl.interactive as etl import petlx.all import pandas from bisect import bisect_left, bisect_right %matplotlib inline def log(*msg): print(str(datetime.now()) + ' :: ' + ' '.join(map(str, msg)), file=sys.stderr) sys.stderr.flush() def random_indices(n, k): return sorted(random.sample(range(n), k)) def random_selection(n, k): selection = np.zeros((n,), np.bool) indices = random_indices(n, k) selection[indices] = True return indices, selection devices = 'ssd', 'hdd' [2**n for n in range(13, 20)] h5_compression_configurations = ( dict(compression='lzf', shuffle=False), dict(compression='gzip', compression_opts=1, shuffle=False), dict(compression='gzip', compression_opts=3, shuffle=False), ) h5_chunk_sizes = [2**n for n in range(13, 20)] h5_chunk_widths = (1, 10, 100) h5_chunks_configurations = [(s//w, w) for (s, w) in itertools.product(h5_chunk_sizes, h5_chunk_widths)] h5_configurations = [dict(chunks=chunks, **compression) for (chunks, compression) in itertools.product(h5_chunks_configurations, h5_compression_configurations)] h5_configurations.append(dict()) # add default config, no chunks [2**n for n in range(10, 20)] blz_cnames = ('blosclz', 'lz4', 'lz4hc', 'zlib') blz_clevels = (1, 3) blz_chunklens = [2**n for n in range(10, 20)] blz_configurations = [dict(cname=cname, clevel=clevel, shuffle=False, chunklen=chunklen) for (cname, clevel, chunklen) in itertools.product(blz_cnames, blz_clevels, blz_chunklens)] n_variants = 1000000 n_samples = 100 n_replicates = 3 chromosome = '3L' data_replicates = [] with h5py.File('data/data.h5', mode='r') as f: genotypes = f[chromosome]['calldata']['genotype'] for n in range(n_replicates): # each replicate uses a different random slice of the data n_variants_total = genotypes.shape[0] start_index = random.randint(0, n_variants_total - n_variants) stop_index = start_index + n_variants d = genotypes[start_index:stop_index, :n_samples, ...] # reduce data to 2 dimensions for the purpose of this benchmark d = np.sum(d, axis=2, dtype='i1') log(n, start_index, stop_index, d.shape, '%.1fMb' % (d.nbytes/1e6), '%.3f%% nonzero' % (np.count_nonzero(d)*100/d.size)) data_replicates.append(d) def benchmark_h5(op, verbose=False): results = [] # storage devices for device in devices: # replicates for n, data in enumerate(data_replicates): # configurations for i, config in enumerate(h5_configurations): sh.sudo.drop_cache() fn = os.path.join(device, '%s.%s.h5' % (n, i)) if verbose: log(n, i, fn, config) # operation to be benchmarked before = datetime.now() ds = op(fn, config.copy(), data) after = datetime.now() compression = config.get('compression', 'none') + '.' + str(config.get('compression_opts', '')) chunk_height, chunk_width = config.get('chunks', (0, 0)) chunk_size = chunk_width*chunk_height elapsed = (after-before).total_seconds() size_on_disk = os.stat(fn).st_size compression_ratio = data.nbytes / size_on_disk result = [n, device, compression, chunk_height, chunk_width, chunk_size, elapsed, size_on_disk, compression_ratio] results.append(result) df = pandas.DataFrame.from_records(results, columns=['replicate', 'device', 'compression', 'chunk_height', 'chunk_width', 'chunk_size', 'time', 'size_on_disk', 'compression_ratio']) return df def benchmark_blz(op, verbose=False): results = [] # storage devices for device in devices: # replicates for n, data in enumerate(data_replicates): # configurations for i, config in enumerate(blz_configurations): sh.sudo.drop_cache() fn = os.path.join(device, '%s.%s.blz' % (n, i)) if verbose: log(n, i, fn, config) # operation to be benchmarked before = datetime.now() b = op(fn, config.copy(), data) after = datetime.now() compression = config.get('cname') + '.' + str(config.get('clevel')) chunklen = config.get('chunklen') elapsed = (after-before).total_seconds() size_on_disk = b.cbytes compression_ratio = data.nbytes / size_on_disk result = [n, device, compression, chunklen, elapsed, size_on_disk, compression_ratio] results.append(result) df = pandas.DataFrame.from_records(results, columns=['replicate', 'device', 'compression', 'chunklen', 'time', 'size_on_disk', 'compression_ratio']) return df def op_write_h5(fn, config, data): with h5py.File(fn, mode='w') as h5: ds = h5.create_dataset('test', data=data, **config) return ds def op_write_blz(fn, config, data): bparams = blz.bparams(cname=config.pop('cname'), clevel=config.pop('clevel'), shuffle=config.pop('shuffle', False)) b = blz.barray(data, rootdir=fn, mode='w', bparams=bparams, **config) return b df_write_h5 = benchmark_h5(op_write_h5) df_write_blz = benchmark_blz(op_write_blz) grid = sns.factorplot(x='chunk_size', y='time', row='device', col='chunk_width', hue='compression', data=df_write_h5, kind='point', estimator=np.min) grid.set(ylim=(0, 3)) grid.fig.suptitle('HDF5 write speed', fontsize=20, y=1.03); grid = sns.factorplot(x='chunklen', y='time', row='device', hue='compression', data=df_write_blz, kind='point', estimator=np.min, palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels))) grid.set(ylim=(0, 3)) grid.fig.suptitle('BLZ write speed', fontsize=20, y=1.03); grid = sns.factorplot(x='chunk_size', y='compression_ratio', col='chunk_width', hue='compression', data=df_write_h5, kind='point', estimator=np.min) grid.fig.suptitle('HDF5 compression ratio', fontsize=20, y=1.03); grid = sns.factorplot(x='chunklen', y='compression_ratio', hue='compression', data=df_write_blz, kind='point', estimator=np.min, palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels))) grid.fig.suptitle('BLZ compression ratio', fontsize=20, y=1.03); def op_read_h5(fn, *args): with h5py.File(fn, mode='r') as h5: ds = h5['test'] a = ds[:] return ds def op_read_blz(fn, *args): b = blz.barray(rootdir=fn, mode='r') a = b[:] return b df_read_h5 = benchmark_h5(op_read_h5) df_read_blz = benchmark_blz(op_read_blz) grid = sns.factorplot(x='chunk_size', y='time', row='device', col='chunk_width', hue='compression', data=df_read_h5, kind='point', estimator=np.min) grid.set(ylim=(0, 2)) grid.fig.suptitle('HDF5 read all', fontsize=20, y=1.03); grid = sns.factorplot(x='chunklen', y='time', row='device', hue='compression', data=df_read_blz, kind='point', estimator=np.min, palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels))) grid.set(ylim=(0, 2)) grid.fig.suptitle('BLZ read all', fontsize=20, y=1.03); def op_read_col_h5(fn, *args): with h5py.File(fn, mode='r') as h5: ds = h5['test'] a = ds[:, 0] return ds def op_read_col_blz(fn, *args): b = blz.barray(rootdir=fn, mode='r') a = b[:, 0] return b df_read_col_h5 = benchmark_h5(op_read_col_h5) df_read_col_blz = benchmark_blz(op_read_col_blz) grid = sns.factorplot(x='chunk_size', y='time', row='device', col='chunk_width', hue='compression', data=df_read_col_h5, kind='point', estimator=np.min) grid.set(ylim=(0, 1.5)) grid.fig.suptitle('HDF5 read column', fontsize=20, y=1.03); grid = sns.factorplot(x='chunklen', y='time', row='device', hue='compression', data=df_read_col_blz, kind='point', estimator=np.min, palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels))) grid.set(ylim=(0, 1.5)) grid.fig.suptitle('BLZ read column', fontsize=20, y=1.03); def op_read_rows_h5(fn, *args): with h5py.File(fn, mode='r') as h5: ds = h5['test'] a = ds[400000:600000, :] return ds def op_read_rows_blz(fn, *args): b = blz.barray(rootdir=fn, mode='r') a = b[400000:600000, :] return b df_read_rows_h5 = benchmark_h5(op_read_rows_h5) df_read_rows_blz = benchmark_blz(op_read_rows_blz) grid = sns.factorplot(x='chunk_size', y='time', row='device', col='chunk_width', hue='compression', data=df_read_rows_h5, kind='point', estimator=np.min) grid.set(ylim=(0, .8)) grid.fig.suptitle('HDF5 read rows', fontsize=20, y=1.03); grid = sns.factorplot(x='chunklen', y='time', row='device', hue='compression', data=df_read_rows_blz, kind='point', estimator=np.min, palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels))) grid.set(ylim=(0, .8)) grid.fig.suptitle('BLZ read rows', fontsize=20, y=1.03); step = 200 def op_read_rows_step_h5(fn, *args): with h5py.File(fn, mode='r') as h5: ds = h5['test'] a = ds[::step, :] return ds def op_read_rows_step_blz(fn, *args): b = blz.barray(rootdir=fn, mode='r') a = b[::step, :] return b df_read_rows_step_h5 = benchmark_h5(op_read_rows_step_h5) df_read_rows_step_blz = benchmark_blz(op_read_rows_step_blz) grid = sns.factorplot(x='chunk_size', y='time', row='device', col='chunk_width', hue='compression', data=df_read_rows_step_h5, kind='point', estimator=np.min) grid.set(ylim=(0, 2)) grid.fig.suptitle('HDF5 read rows with step', fontsize=20, y=1.03); grid = sns.factorplot(x='chunklen', y='time', row='device', hue='compression', data=df_read_rows_step_blz, kind='point', estimator=np.min, palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels))) grid.set(ylim=(0, 2)) grid.fig.suptitle('BLZ read rows with step', fontsize=20, y=1.03); row_indices, row_selection = random_selection(n_variants, n_variants//200) np.count_nonzero(row_selection) def op_read_rows_sel_h5(fn, *args): with h5py.File(fn, mode='r') as h5: ds = h5['test'] a = ds[row_selection, :] return ds def op_read_rows_sel_blz(fn, *args): b = blz.barray(rootdir=fn, mode='r') a = b[row_selection, :] return b df_read_rows_sel_h5 = benchmark_h5(op_read_rows_sel_h5) # df_read_rows_sel_blz = benchmark_blz(op_read_rows_sel_blz) # not implemented grid = sns.factorplot(x='chunk_size', y='time', row='device', col='chunk_width', hue='compression', data=df_read_rows_sel_h5, kind='point', estimator=np.min) #grid.set(ylim=(0, 1.5)) grid.fig.suptitle('HDF5 read rows with selection (fancy indexing)', fontsize=20, y=1.03); # grid = sns.factorplot(x='chunklen', y='time', # row='device', hue='compression', # data=df_read_rows_sel_blz, kind='point', estimator=np.min, # palette=sns.color_palette('hls', n_colors=len(blz_cnames)*len(blz_clevels))) # grid.set(ylim=(0, .8)) # grid.fig.suptitle('BLZ read rows with selection', fontsize=20, y=1.03); def op_read_rows_sel_np_h5(fn, *args): with h5py.File(fn, mode='r') as h5: ds = h5['test'] a = np.take(ds[:], row_indices, axis=0) return ds df_read_rows_sel_np_h5 = benchmark_h5(op_read_rows_sel_np_h5) grid = sns.factorplot(x='chunk_size', y='time', row='device', col='chunk_width', hue='compression', data=df_read_rows_sel_np_h5, kind='point', estimator=np.min) #grid.set(ylim=(0, 1.5)) grid.fig.suptitle('HDF5 read rows with selection (numpy in-memory)', fontsize=20, y=1.03); def h5_take2d(dataset, row_indices, col_indices=None, block_size=None): # make sure row_indices are sorted array row_indices = np.array(sorted(row_indices)) # how many rows are we selecting? n_rows = dataset.shape[0] n_rows_out = len(row_indices) # how many columns are we selecting? n_cols = dataset.shape[1] if col_indices: n_cols_out = len(col_indices) else: n_cols_out = n_cols # setup output array out_shape = (n_rows_out, n_cols_out) + dataset.shape[2:] out = np.empty(out_shape, dtype=dataset.dtype) # determine block size if block_size is None: if dataset.chunks is not None: # use dataset chunk height block_size = dataset.chunks[0] else: # use arbitrary number block_size = 1000 # iterate block-wise offset = 0 for block_start in xrange(0, n_rows, block_size): block_stop = min(block_start+block_size, n_rows) # how many indices to process in this block? i = bisect_left(row_indices, block_start) j = bisect_left(row_indices, block_stop) n = j-i ridx = row_indices[i:j] # only do anything if there are indices for this block if n: # load data for this block a = dataset[block_start:block_stop] # take rows b = np.take(a, ridx-block_start, axis=0) # take columns if col_indices: b = np.take(b, col_indices, axis=1) # store output out[offset:offset+n, ...] = b # keep track of offset offset += n return out def op_read_rows_sel_alt_h5(fn, *args): with h5py.File(fn, mode='r') as h5: ds = h5['test'] a = h5_take2d(ds, row_indices) return ds df_read_rows_sel_alt_h5 = benchmark_h5(op_read_rows_sel_alt_h5) grid = sns.factorplot(x='chunk_size', y='time', row='device', col='chunk_width', hue='compression', data=df_read_rows_sel_alt_h5, kind='point', estimator=np.min) #grid.set(ylim=(0, 1.5)) grid.fig.suptitle('HDF5 read rows with selection (numpy block-wise implementation)', fontsize=20, y=1.03); col_indices = random_indices(n_samples, n_samples//10) len(col_indices) def op_read_sel2d_h5(fn, *args): with h5py.File(fn, mode='r') as h5: ds = h5['test'] a = h5_take2d(ds, row_indices, col_indices) return ds df_read_sel2d_h5 = benchmark_h5(op_read_sel2d_h5) grid = sns.factorplot(x='chunk_size', y='time', row='device', col='chunk_width', hue='compression', data=df_read_sel2d_h5, kind='point', estimator=np.min) #grid.set(ylim=(0, 1)) grid.fig.suptitle('HDF5 2D selection (numpy block-wise implementation)', fontsize=20, y=1.03); def h5_take2d_points(dataset, row_indices, col_indices, block_size=1000): # determine number of items to select m = len(row_indices) n = len(col_indices) z = m*n # initialise output array out = np.empty((z,), dtype=dataset.dtype) # convert indices into coordinates coords = itertools.product(row_indices, col_indices) # set up selection sel = h5py._hl.selections.PointSelection(dataset.shape) typ = h5py.h5t.py_create(dataset.dtype) # process blocks at a time for block_start in xrange(0, z, block_size): # materialise a block of coordinates selection = np.asarray(list(itertools.islice(coords, block_size))) # set selection sel.set(selection) # read data block_stop = block_start + len(selection) space = h5py.h5s.create_simple(sel.mshape) dataset.id.read(space, sel._id, out[block_start:block_stop], typ) # reshape output array out = out.reshape(m, n) return out def op_read_points_h5(fn, *args): with h5py.File(fn, mode='r') as h5: ds = h5['test'] a = h5_take2d_points(ds, row_indices, col_indices) return ds df_read_points_h5 = benchmark_h5(op_read_points_h5) grid = sns.factorplot(x='chunk_size', y='time', row='device', col='chunk_width', hue='compression', data=df_read_points_h5, kind='point', estimator=np.min) #grid.set(ylim=(0, 1)) grid.fig.suptitle('HDF5 2D selection (points implementation)', fontsize=20, y=1.03);