The audio genre recognition pipeline:
This notebook constructs a KNN graph from the samples and computes the (normalized) graph Laplacian for later use as a regularization term.
- `data_scaling_graph`: if and how the input data should be scaled. Acceptable values are `None`, `'features'`, `'samples'` and `'dataset'`.
- `K`: number of nearest neighbors (minimum number of edges per vertex).
- `dm`: distance metric: `'euclidean'`, `'cosine_dist'` or `'cosine_sim'`.
- `Csigma`: constant which multiplies the mean distance to the K-th neighbor when computing the $\sigma$ of the Gaussian kernel. Not relevant when `dm` is `'cosine_sim'`, as we do not use a kernel in that case.
- `diag`: whether we want the diagonal of the weight matrix to be zeros (no self-connected vertices) or ones (may help to regularize the normalized Laplacian; makes no difference for the unnormalized one).
- `laplacian`: Laplacian type (`'normalized'` or `'unnormalized'`).
- `tol`: tolerance when asserting values.
- `Ngenres, Nclips, Nframes`: a way to reduce the size of the dataset.
- `noise_std`: standard deviation of the Gaussian noise to be added to the data.
- `folder`: relative path to the HDF5 files.
- `filename_*`: names of the HDF5 files.

A sketch of how an experiment runner might pass these parameters follows the defaults below.

if 'p' in globals().keys():
    # Hyper-parameters passed by the experiment runner.
    for key, value in p.items():
        globals()[key] = value
else:
    data_scaling_graph = None
    K = 10 + 1  # 5 to 10 + 1 for self-reference.
    dm = 'euclidean'
    Csigma = 1
    diag = True
    laplacian = 'normalized'
    tol = 1e-5
    Ngenres, Nclips, Nframes = 10, 100, 644
    noise_std = 0
    folder = 'data'
    filename_audio = 'audio.hdf5'
    filename_graph = 'graph.hdf5'
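For illustration, here is how such a `p` dict might look if injected by an experiment runner before this notebook executes. The injection mechanism itself is outside this notebook, and the values are arbitrary examples, not recommended settings:

# Hypothetical hyper-parameters as an experiment runner might pass them.
# Keys mirror the defaults above; values are arbitrary examples.
p = {
    'data_scaling_graph': 'features',
    'K': 10 + 1,
    'dm': 'cosine_dist',
    'Csigma': 1,
    'diag': True,
    'laplacian': 'normalized',
    'Ngenres': 2, 'Nclips': 50, 'Nframes': 644,
    'noise_std': 0,
}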
import os, time
import numpy as np
import h5py
import pyflann
#import sklearn.neighbors
#from annoy import AnnoyIndex
import scipy.sparse
toverall = time.time()
filename = os.path.join(folder, filename_audio)
with h5py.File(filename, 'r') as audio:
    X = audio.get('Xs')
    n = X.shape[-1]
    X = X[:Ngenres,:Nclips,:Nframes,...]  # Load into memory.
X.resize(Ngenres * Nclips * Nframes * 2, n)
Nvertices, n = X.shape
print('Data: {}, {}'.format(X.shape, X.dtype))
# Scaling.
if data_scaling_graph == 'features':
    X -= np.min(X, axis=0)
    X /= np.max(X, axis=0)
elif data_scaling_graph == 'samples':
    X = X.T
    X -= np.min(X, axis=0)
    X /= np.max(X, axis=0)
    X = X.T
elif data_scaling_graph == 'dataset':
    X -= np.min(X)
    X /= np.max(X)
# Add Gaussian noise.
if noise_std != 0:
    X += np.random.normal(scale=noise_std, size=X.shape)
# Center the data to compute an angular distance (cosine similarity).
# Not for cosine_dist, as it relies on a positive space.
# Results in completely different data distributions.
if dm == 'cosine_sim':
    X -= X.mean()
    assert abs(X.mean()) < 100 * tol  # Quite large for unscaled data.
# Normalize: put each sample on the unit sphere.
if dm in ['cosine_dist', 'cosine_sim']:
    #print(np.sum(np.sqrt(np.sum(X**2, axis=1)) == 0))
    X += 1e-20  # To avoid division by zero if we have a null vector.
    X /= np.sqrt(np.sum(X**2, axis=1))[:,np.newaxis]
    assert abs(np.linalg.norm(X[0,:]) - 1) < tol
# Alternatives (disabled): these require the sklearn import commented out above.
if False:
    params = {'n_neighbors': K}
    params['algorithm'] = 'brute'  # ball_tree, kd_tree, brute
    params['metric'] = 'euclidean'  # minkowski, euclidean, cosine
    nbrs = sklearn.neighbors.NearestNeighbors(**params).fit(X)
    tstart = time.time()
    dist, idx = nbrs.kneighbors(X)
    print('Elapsed time: {:.2f} seconds'.format(time.time() - tstart))

if False:
    tstart = time.time()
    lshf = sklearn.neighbors.LSHForest()
    lshf.fit(X)
    print('Elapsed time: {:.2f} seconds'.format(time.time() - tstart))
    tstart = time.time()
    dist, idx = lshf.kneighbors(X, n_neighbors=K)
    print('Elapsed time: {:.2f} seconds'.format(time.time() - tstart))
Time efficiency:
if False:
    flann = pyflann.FLANN()
    flann.build_index(X)  # autotuned
    idx, dist = flann.nn_index(X, K)
    flann.delete_index()

if True:
    tstart = time.time()
    #idx, dist = flann.nn(X, X, K, algorithm='linear')
    idx, dist = pyflann.FLANN().nn(X, X, K,
                                   algorithm='autotuned',
                                   target_precision=.99)
    #idx, dist = flann.nn(X, X, K)
    print('Elapsed time: {:.2f} seconds'.format(time.time() - tstart))
if False:
    a = AnnoyIndex(n, metric='angular')  # euclidean, angular
    for i in range(Nvertices):
        a.add_item(i, X[i,:])
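Since FLANN is approximate, one may want to quantify how many true neighbors it recovers. Below is a minimal sketch assuming `sklearn.neighbors` is available; the subsample size `Nsub` and the sampling scheme are illustrative choices, not part of this notebook:

# Hypothetical sanity check: recall of approximate KNN vs. exact search.
if False:
    import sklearn.neighbors
    Nsub = 1000  # Number of query points to check (illustrative).
    sub = np.random.choice(Nvertices, Nsub, replace=False)
    nbrs = sklearn.neighbors.NearestNeighbors(
        n_neighbors=K, algorithm='brute').fit(X)
    _, idx_exact = nbrs.kneighbors(X[sub,:])
    # Average fraction of exact neighbors that FLANN retrieved.
    recall = np.mean([len(np.intersect1d(idx[s,:], idx_exact[q,:])) / float(K)
                      for q, s in enumerate(sub)])
    print('Approximate KNN recall: {:.3f}'.format(recall))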
Because the query set is the dataset itself, each vertex should find itself as its nearest neighbor. We nevertheless cannot simply drop the first column to exclude self-references: the approximate search gives no guarantee that the first column actually points back to the query.
assert idx.shape == (Nvertices, K)
assert dist.shape == (Nvertices, K)
print('All self-referenced in the first column: {}'.format(np.alltrue(dist[:,0] == 0)))
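If self-matches did need to be located robustly, a minimal sketch (not used below, where self-edges are instead controlled via the `diag` parameter) would mask entries whose neighbor index equals the query row:

# Hypothetical: locate self-matches wherever they appear, not only in column 0.
self_match = idx == np.arange(Nvertices)[:, np.newaxis]
print('Vertices that found themselves: {} (over {})'.format(
    np.sum(np.any(self_match, axis=1)), Nvertices))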
Compute the chosen distance from FLANN's output, which is the squared Euclidean distance:
if dm == 'euclidean':
    # We could even omit the square root.
    dist = np.sqrt(dist)
elif dm == 'cosine_dist':
    # On the unit sphere, the squared Euclidean distance is 2 - 2*cos(theta),
    # so the cosine distance 1 - cos(theta) is half of it.
    dist = dist / 2.
elif dm == 'cosine_sim':
    # Map to a similarity in [0, 1]: (1 + cos(theta)) / 2 = 1 - d^2 / 4.
    dist = 1 - dist / 4.
else:
    raise ValueError('Unknown distance metric: {}'.format(dm))
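These conversions rest on an identity for unit-norm vectors: for $\|x_i\| = \|x_j\| = 1$,

$$\|x_i - x_j\|^2 = 2 - 2 \cos\theta_{ij},$$

so the cosine distance and the rescaled cosine similarity follow as

$$d^{\cos}_{ij} = 1 - \cos\theta_{ij} = \tfrac{1}{2} \|x_i - x_j\|^2, \qquad s^{\cos}_{ij} = \tfrac{1}{2} (1 + \cos\theta_{ij}) = 1 - \tfrac{1}{4} \|x_i - x_j\|^2.$$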
print('dist in [{}, {}]'.format(dist.min(), dist.max()))
# Verification.
i, k = 14, 3
j = idx[i, k]
if dm == 'euclidean':
    d = np.linalg.norm(X[i,:] - X[j,:])
elif dm == 'cosine_dist':
    assert abs(np.linalg.norm(X[i,:]) - 1) < tol
    assert abs(np.linalg.norm(X[j,:]) - 1) < tol
    d = 1 - np.sum(X[i,:] * X[j,:])
elif dm == 'cosine_sim':
    assert abs(np.linalg.norm(X[i,:]) - 1) < tol
    assert abs(np.linalg.norm(X[j,:]) - 1) < tol
    d = .5 + .5 * np.sum(X[i,:] * X[j,:])
assert abs(dist[i,k] - d) < tol
When using a distance, the edge weights are given by a Gaussian kernel, $w_{ij} = \exp(-d_{ij} / \sigma)$. The scale $\sigma$ is set to `Csigma` times the mean distance to the $K$-th neighbor, following [Perona'04]. Note that we do not use the kernel when working with the cosine similarity.
if dm == 'cosine_sim':
    w = dist
else:
    sigma = Csigma * np.mean(dist[:,-1])
    i = 73; assert dist[i,:].max() == dist[i,-1]
    w = np.exp(-dist / sigma)
print('w in [{}, {}]'.format(w.min(), w.max()))
Generate indices via an iterator.
class indices(object):

    def __init__(self, N, K):
        self.N = N
        self.K = K
        self.n = 0
        self.k = 0

    def __len__(self):
        return self.N * self.K

    def __iter__(self):
        return self

    # Python 3.
    def __next__(self):
        return self.next()

    # Python 2.
    def next(self):
        self.k += 1
        if self.k > self.K:
            self.k = 1
            self.n += 1
        if self.n >= self.N:
            self.k = 0
            self.n = 0
            raise StopIteration()
        return self.n
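For reference, the iterator yields each row index `K` times in order (0,...,0, 1,...,1, ...), which matches a NumPy one-liner. A minimal equivalence check, not part of the original code:

# The iterator is equivalent to repeating each of the N row indices K times.
assert list(indices(Nvertices, K)) == list(np.repeat(np.arange(Nvertices), K))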
Construct the sparse weight matrix.
i = list(indices(Nvertices, K))
j = idx.flat # flat, ravel(), flatten()
v = w.flat
# COO is good for matrix construction (LIL to insert elements).
W = scipy.sparse.coo_matrix((v, (i,j))).tolil()
del i, j, v
assert W.shape == (Nvertices, Nvertices)
assert W.nnz == Nvertices * K
# It should be True... False means that KNN did not find
# two identical vectors to be close enough (approximation).
Nones = np.sum(W.diagonal() == 1)
print('Ones on the diagonal: {} (over {})'.format(Nones, Nvertices))
print('assert: {}'.format(Nones == Nvertices))
if diag:
    W.setdiag(Nvertices*[1])
else:
    W.setdiag(Nvertices*[0])
assert np.all(W.diagonal() == diag)
# CSR is good for arithmetic operations.
W = W.tocsr()
W.eliminate_zeros()
We want an undirected graph, i.e. a symmetric weight matrix.
#W = W/2 + W.T/2
#W = np.maximum(W, W.T)  # Does not work for sparse matrices.
# Elementwise maximum for sparse matrices: keep the larger of w_ij and w_ji.
bigger = W.T > W
W = W - W.multiply(bigger) + W.T.multiply(bigger)
del bigger
assert abs(W - W.T).sum() < tol  # Should be symmetric.
if diag:
    assert np.all(W.diagonal() == 1)
else:
    assert np.all(W.diagonal() == 0)
print('W in [{}, {}]'.format(W.min(), W.max()))
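A self-contained check of the sparse maximum trick on a random, non-symmetric test matrix (illustrative only; `scipy.sparse.rand` just provides the test case):

# Verify that the sparse trick equals the dense elementwise maximum.
if False:
    A = scipy.sparse.rand(50, 50, density=0.1, format='csr')
    bigger = A.T > A
    Amax = A - A.multiply(bigger) + A.T.multiply(bigger)
    assert np.allclose(Amax.toarray(), np.maximum(A.toarray(), A.toarray().T))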
We could verify that the matrix is positive-semidefinite by computing its Cholesky decomposition (CHOLMOD for sparse matrices).
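As a cheaper alternative to a full Cholesky factorization, here is a sketch that inspects the smallest eigenvalue of the weight matrix with `scipy.sparse.linalg.eigsh`; this is a swapped-in technique, not the CHOLMOD route mentioned above:

# Hypothetical check: a smallest eigenvalue >= 0 indicates
# positive-semidefiniteness of the (symmetric) matrix.
if False:
    from scipy.sparse.linalg import eigsh
    lmin = eigsh(W, k=1, which='SA', return_eigenvectors=False)[0]
    print('Smallest eigenvalue of W: {:.2e}'.format(lmin))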
Compute the degree matrix.
d = W.sum(axis=0)
Compute the unnormalized Laplacian $L = D - W$ or the symmetric normalized Laplacian $L = I - D^{-1/2} W D^{-1/2}$ (which needs $D^{-1/2}$).
if laplacian == 'unnormalized':
    D = scipy.sparse.diags(d.A.squeeze(), 0)
    L = D - W
elif laplacian == 'normalized':
    d = 1 / np.sqrt(d)
    D = scipy.sparse.diags(d.A.squeeze(), 0)
    I = scipy.sparse.identity(Nvertices, dtype=D.dtype)
    L = I - D * W * D
    del I
else:
    raise ValueError('Unknown Laplacian type: {}'.format(laplacian))
del d, D
assert abs(L - L.T).sum() < tol  # Should be symmetric.
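As a spectral sanity check, again sketched with `eigsh` rather than asserted in the pipeline: the eigenvalues of the symmetric normalized Laplacian lie in $[0, 2]$, with the smallest approximately 0.

# Hypothetical check of the spectrum's bounds (normalized case only).
if False and laplacian == 'normalized':
    from scipy.sparse.linalg import eigsh
    lmin = eigsh(L, k=1, which='SA', return_eigenvectors=False)[0]
    lmax = eigsh(L, k=1, which='LA', return_eigenvectors=False)[0]
    print('Spectrum of L in [{:.2e}, {:.2e}]'.format(lmin, lmax))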
Save the sparse matrices to HDF5 by storing the four arrays that define their CSR representation (`data`, `indices`, `indptr`, `shape`):
filename = os.path.join(folder, filename_graph)

# Remove any existing HDF5 file; do not complain if it does not exist.
try:
    os.remove(filename)
except OSError:
    pass

with h5py.File(filename, 'w') as graph:

    # Metadata: hyper-parameters.
    for attr in ('K', 'dm', 'Csigma', 'diag', 'laplacian'):
        graph.attrs[attr] = globals()[attr]

    # Data: weight and Laplacian matrices.
    for mat in ('W', 'L'):
        m = globals()[mat]
        for par in ('data', 'indices', 'indptr', 'shape'):
            arr = np.asarray(getattr(m, par))
            graph.create_dataset(mat+'_'+par, data=arr)

    # Show datasets, their dimensionality and data type.
    print('Datasets:')
    for dname, dset in graph.items():
        print(' {:10}: {:10}, {}'.format(dname, str(dset.shape), dset.dtype))

    # Display HDF5 attributes.
    print('Attributes:')
    for name, value in graph.attrs.items():
        print(' {} = {}'.format(name, value))
print('Overall time: {:.2f} seconds'.format(time.time() - toverall))
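For completeness, here is a minimal sketch of reading the graph back, mirroring the layout written above; the helper name `load_graph` is illustrative:

# Hypothetical loader: rebuild a CSR matrix from the four stored arrays.
def load_graph(filename, mat='L'):
    with h5py.File(filename, 'r') as graph:
        parts = [graph[mat+'_'+par][()] for par in ('data', 'indices', 'indptr')]
        shape = graph[mat+'_shape'][()]
    return scipy.sparse.csr_matrix(tuple(parts), shape=tuple(shape))

L_loaded = load_graph(os.path.join(folder, filename_graph), 'L')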