In this lecture we will cover the following topics:
- Classification and clustering, and how to evaluate them (accuracy, F1 score, NMI).
- Similarity and dissimilarity measures, and kernels.
- Dynamic Time Warping (DTW) for (multivariate) time series.
- The Time series Cluster Kernel (TCK).
- Reservoir Computing (RC) representations of multivariate time series.
import sys
# Install dependencies if the notebook is running in Colab
if 'google.colab' in sys.modules:
!pip install -U -qq reservoir_computing tsa_course tck dtaidistance
# Imports
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from sklearn import datasets
from sklearn.metrics.pairwise import cosine_similarity, cosine_distances
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.cluster import KMeans
from sklearn import svm
from sklearn.linear_model import Ridge
from sklearn.metrics import accuracy_score, f1_score, v_measure_score, pairwise_kernels, pairwise_distances
from sklearn.model_selection import train_test_split
from sklearn.inspection import DecisionBoundaryDisplay
from sklearn.decomposition import KernelPCA
from sklearn.mixture import GaussianMixture
from sklearn.neighbors import KNeighborsClassifier
from scipy.cluster.hierarchy import linkage, dendrogram, fcluster
import scipy.spatial.distance as ssd
import statsmodels.api as sm
from dtaidistance import dtw, dtw_ndim
from dtaidistance import dtw_visualisation as dtwvis
from tck.TCK import TCK
from tck.datasets import DataLoader
from reservoir_computing.reservoir import Reservoir
from reservoir_computing.tensorPCA import tensorPCA
from reservoir_computing.modules import RC_model
from reservoir_computing.utils import compute_test_scores
Classification
# Generate some toy data
data = datasets.make_blobs(n_samples=800, centers=[[-5,-7], [5,5], [-3,4]], cluster_std=[1.7, 2.5, 1.5], random_state=8)
X, y = data
X = StandardScaler().fit_transform(X)
def plot_class_example(X, y, gamma=1):
_, ax = plt.subplots(figsize=(5, 5))
ax.scatter(X[:, 0], X[:, 1], c=plt.cm.tab10(y), s=15, alpha=0.5)
ax.set_xticks(())
ax.set_yticks(())
ax.spines[['right', 'left', 'top', 'bottom']].set_visible(False)
clf = svm.SVC(kernel='rbf', gamma=gamma, C=1).fit(X, y)
h = .01 # step size in the mesh
x_min, x_max = X[:, 0].min(), X[:, 0].max()
y_min, y_max = X[:, 1].min(), X[:, 1].max()
xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))
# Plot the decision boundary. For that, we will assign a color to each point in the mesh [x_min, x_max] x [y_min, y_max].
Z = clf.predict(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.contour(xx, yy, Z, colors='k', linewidths=1.5, linestyles='dashed')
plt.xlim(-3, 3)
plt.ylim(-3, 3)
plt.tight_layout()
plt.show()
Try setting gamma=20 in the example below to fit the training data better.
plot_class_example(X, y, gamma=0.1)
Clustering
def plot_cluster_example(X, K=2):
kmeans = KMeans(n_clusters=K, random_state=0, n_init="auto").fit(X)
clust_lab = kmeans.labels_
centers = kmeans.cluster_centers_
fig, ax = plt.subplots(figsize=(5, 5))
ax.scatter(X[:, 0], X[:, 1], c='gray', s=15, alpha=0.5, edgecolors='k')
ax.set_xticks(())
ax.set_yticks(())
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['left'].set_visible(False)
ax.spines['bottom'].set_visible(False)
for ind,i in enumerate(centers):
class_inds=np.where(clust_lab==ind)[0]
max_dist=np.max(pairwise_distances(i[None, ...], X[class_inds]))
plt.gca().add_artist(plt.Circle(i, max_dist*0.7, fill=False, color='tab:red', linewidth=2.0))
plt.xlim(-3, 3)
plt.ylim(-3, 3)
plt.title(f"{K} Clusters")
plt.tight_layout()
plt.show()
Try setting K=3 and K=4 in the code below.
# We use the same data (X) as before, but not the labels (y)
plot_cluster_example(X, K=2)
Accuracy
# Split the data in training and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)
# Fit the classifier
clf = svm.SVC(kernel="linear")
clf.fit(X_train, y_train)
# Compute predictions and accuracy
y_pred = clf.predict(X_test)
print(f"Accuracy: {accuracy_score(y_test, y_pred):.2f}")
Accuracy: 0.99
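Accuracy is simply the fraction of correctly classified samples. As a quick sanity check (a minimal sketch using the variables defined above), we can compute it by hand and compare with accuracy_score:
acc_manual = np.mean(y_pred == y_test)  # fraction of correct predictions
print(f"Manual accuracy: {acc_manual:.2f}")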
F1 Score
n_samples_1 = 2000 # Samples of class 0
n_samples_2 = 100 # Samples of class 1
X_imb, y_imb = datasets.make_blobs(
n_samples=[n_samples_1, n_samples_2],
centers=[[0.0, 0.0], [2.0, 2.0]],
cluster_std=[1.5, 0.5],
random_state=0, shuffle=False)
# Split the data in training and test set
X_tr_imb, X_te_imb, y_tr_imb, y_te_imb = train_test_split(X_imb, y_imb, test_size=0.2, random_state=0)
# Fit the classifier
clf = svm.SVC(kernel="linear", class_weight={1: 20}) # Try setting class_weight={1: 20}
clf.fit(X_tr_imb, y_tr_imb)
# Compute predictions and accuracy
y_pred_imb = clf.predict(X_te_imb)
print(f"Accuracy: {accuracy_score(y_te_imb, y_pred_imb):.2f}")
Accuracy: 0.89
print(f"Predictions of class 0: {(y_pred_imb==0).sum()}, predictions of class 1: {(y_pred_imb==1).sum()}")
print(f"F1 score: {f1_score(y_te_imb, y_pred_imb):.2f}")
Predictions of class 0: 355, predictions of class 1: 65 F1 score: 0.45
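To make explicit what f1_score returns, here is a minimal sketch (using the variables defined above) that derives precision, recall, and F1 from the raw counts of the positive class; the result should match the value printed by sklearn:
tp = np.sum((y_pred_imb == 1) & (y_te_imb == 1))  # true positives
fp = np.sum((y_pred_imb == 1) & (y_te_imb == 0))  # false positives
fn = np.sum((y_pred_imb == 0) & (y_te_imb == 1))  # false negatives
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1_manual = 2 * precision * recall / (precision + recall)
print(f"Precision: {precision:.2f}, Recall: {recall:.2f}, F1: {f1_manual:.2f}")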
Normalized Mutual Information (NMI)
# NMI for k-means with different values of k
clust_lab = KMeans(n_clusters=2, random_state=0, n_init="auto").fit(X).labels_
print(f"K=2, NMI: {v_measure_score(clust_lab, y):.2f}")
clust_lab = KMeans(n_clusters=3, random_state=0, n_init="auto").fit(X).labels_
print(f"K=3, NMI: {v_measure_score(clust_lab, y):.2f}")
clust_lab = KMeans(n_clusters=4, random_state=0, n_init="auto").fit(X).labels_
print(f"K=4, NMI: {v_measure_score(clust_lab, y):.2f}")
K=2, NMI: 0.73 K=3, NMI: 0.92 K=4, NMI: 0.84
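Note that v_measure_score is equivalent to the NMI computed with the arithmetic average of the entropies. As a quick check (a sketch; normalized_mutual_info_score is sklearn's NMI implementation), the two should coincide:
from sklearn.metrics import normalized_mutual_info_score
clust_lab = KMeans(n_clusters=3, random_state=0, n_init="auto").fit(X).labels_
print(f"v-measure: {v_measure_score(y, clust_lab):.2f}, "
      f"NMI (arithmetic): {normalized_mutual_info_score(y, clust_lab, average_method='arithmetic'):.2f}")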
values = np.linspace(-5, 5, 100)
gamma_vals = [0.1, 1, 5]
plt.figure(figsize=(10, 4))
for gamma in gamma_vals:
similarity = np.exp(-gamma * values**2)
plt.plot(values, similarity, label=rf'$\gamma$ = {gamma}')
plt.xlabel(r'Distance: $\|\mathbf{x} - \mathbf{y}\|$')
plt.ylabel(r'Similarity: $\exp(-\gamma\|\mathbf{x} - \mathbf{y}\|^2)$')
plt.legend()
plt.grid(True)
plt.show()
X, y = datasets.make_circles(noise=0.2, factor=0.5, random_state=1, n_samples=200) # Create toy data
X_train , X_test , y_train, y_test = train_test_split(X, y, random_state=0) #Train-test split
# Cosine similarity matrix
cosine_train = cosine_similarity(X_train)
# RBF similarity matrix
rbf_kernel_train = pairwise_kernels(X_train, metric='rbf', gamma=0.5)
# Plot the data
fig, axes = plt.subplots(1, 3, figsize=(12, 3.5))
cm_bright = ListedColormap(["#FF0000", "#0000FF"])
axes[0].set_title("Input data")
axes[0].scatter(X[:, 0], X[:, 1], c=y, cmap=cm_bright, alpha=0.6, edgecolors="k")
axes[0].set_xticks(())
axes[0].set_yticks(())
# Plot the cosine matrix
idx = np.argsort(y_train)
axes[1].imshow(cosine_train[idx][:, idx], cmap='viridis')
axes[1].set_title("Cosine similarity matrix (linear)")
# Plot the RBF matrix
axes[2].imshow(rbf_kernel_train[idx][:, idx], cmap='viridis')
axes[2].set_title("RBF kernel matrix (nonlinear)");
classifiers = [svm.SVC(kernel="linear"), svm.SVC(gamma=0.5), svm.SVC(gamma=0.1)]
names = ["Linear SVM", "RBF SVM ($\gamma=0.5$)", "RBF SVM ($\gamma=0.1$)"]
figure = plt.figure(figsize=(11, 4))
for i, (name, clf) in enumerate(zip(names, classifiers)):
ax = plt.subplot(1,3,i+1)
clf.fit(X_train, y_train)
score = clf.score(X_test, y_test)
DecisionBoundaryDisplay.from_estimator(
clf, X, cmap=plt.cm.RdBu, alpha=0.8, ax=ax, eps=0.5)
ax.scatter(X[:, 0], X[:, 1], c=y, cmap=cm_bright, edgecolors="k", alpha=0.6)
ax.set_xticks(())
ax.set_yticks(())
ax.set_title(name + f" - Test acc: {score:.2f}")
plt.tight_layout()
plt.show()
X, y = datasets.make_blobs(n_samples=1500, centers=4,
cluster_std=[1.7, 2.5, 0.5, 1.5], random_state=2)
X = StandardScaler().fit_transform(X) # Normalizing the data facilitates setting the radius
# Compute the distance matrix
Dist = pairwise_distances(X, metric="sqeuclidean")
distArray = ssd.squareform(Dist)
# Compute the hierarchy of clusters
Z = linkage(distArray, 'ward')
We obtain two flat partitions by cutting the dendrogram at t=10 and t=30.
partition_1 = fcluster(Z, t=10, criterion="distance")
print("Partition 1: %d clusters"%len(np.unique(partition_1)))
partition_2 = fcluster(Z, t=30, criterion="distance")
print("Partition 1: %d clusters"%len(np.unique(partition_2)))
Partition 1: 8 clusters Partition 1: 4 clusters
def plot_clusters(data, clusters=None, title=None, ax=None):
if ax is None:
_, ax = plt.subplots(figsize=(5, 5))
color = 'gray' if clusters is None else plt.cm.tab10(clusters)
ax.scatter(data[:, 0], data[:, 1], c=color, s=15, alpha=0.5)
ax.set_xticks(())
ax.set_yticks(())
ax.spines[['right', 'left', 'top', 'bottom']].set_visible(False)
ax.set_title(title)
_, axes = plt.subplots(1,3, figsize=(14,4))
plot_clusters(X, title="Data", ax=axes[0])
plot_clusters(X, title="threshold = 10", clusters=partition_1, ax=axes[1])
plot_clusters(X, title="threshold = 30", clusters=partition_2, ax=axes[2])
fig = plt.figure(figsize=(20, 10))
dn = dendrogram(Z, color_threshold=30, above_threshold_color='k',
show_leaf_counts=False)
plt.xticks([])
plt.show()
# Compute the RBF (note that we must convert it to a distance)
rbf_kernel = pairwise_kernels(X, metric='rbf', gamma=1.0) # compute the rbf similarity
rbf_kernel = rbf_kernel + rbf_kernel.T # make symmetric
rbf_kernel /= rbf_kernel.max() # normalize to [0, 1]
rbf_dist = 1.0 - rbf_kernel # convert to distance
np.fill_diagonal(rbf_dist, 0) # due to numerical errors, the diagonal might not be 0
# Compute the partition
distArray = ssd.squareform(rbf_dist)
Z = linkage(distArray, 'ward')
partition_3 = fcluster(Z, t=3, criterion="distance")
# Mahalanobis distance that assigns different weights to the features
weights = np.array([1, 0.1])
Dist = pairwise_distances(X, metric="mahalanobis", VI=np.diag(1/weights**2))
# Compute the partition
distArray = ssd.squareform(Dist)
Z = linkage(distArray, 'ward')
partition_4 = fcluster(Z, t=30, criterion="distance")
_, axes = plt.subplots(1,3, figsize=(14,4))
plot_clusters(X, title="Data", ax=axes[0])
plot_clusters(X, title="RBF-based distance", clusters=partition_3, ax=axes[1])
plot_clusters(X, title="Weighted Mahalanobis distance", clusters=partition_4, ax=axes[2])
A dataset of multivariate time series (MTS) is stored in an array X of size [N, T, V], where N is the number of time series, T the number of time steps, and V the number of variables. Each MTS X[i,:,:] is associated with a class label y[i].
Example:
def plot_uwave():
X, Y, _, _ = DataLoader().get_data('UWAVE')
_, axes = plt.subplots(3, 8, figsize=(15, 5), subplot_kw={'projection': '3d'})
for i in range(len(np.unique(Y))):
idx = np.where(Y == i+1)[0][:3]
for j, id in enumerate(idx):
axes[j,i].plot(X[id, :, 0], X[id, :, 1], X[id, :, 2], color=plt.cm.tab10(i))
axes[j,i].set_xticks(())
axes[j,i].set_yticks(())
axes[j,i].set_zticks(())
axes[j,i].spines[['right', 'left', 'top', 'bottom']].set_visible(False)
if j == 0:
axes[j,i].set_title(f"Class {i+1}")
plt.tight_layout()
plt.show()
plot_uwave()
Loaded UWAVE dataset. Number of classes: 8 Data shapes: Xtr: (200, 315, 3) Ytr: (200, 1) Xte: (428, 315, 3) Yte: (428, 1)
An admissible path should satisfy the following conditions:
- Boundary condition: the path starts at the first pair of elements and ends at the last pair, i.e., it aligns the beginning and the end of the two series.
- Monotonicity: the indices i and j never decrease along the path.
- Continuity (step size): the path moves by at most one step in each series at a time, so no element is skipped.
def DTWDistance(x, y):
    inf = float("inf")
    d = lambda a, b: (a - b)**2            # local distance between two samples (here, squared difference)
    DTW = np.zeros((len(x), len(y)))       # cumulative cost matrix
    for i in range(len(x)):
        for j in range(len(y)):
            DTW[i, j] = d(x[i], y[j])
            if i > 0 or j > 0:
                DTW[i, j] += min(
                    DTW[i-1, j  ] if i > 0 else inf,
                    DTW[i  , j-1] if j > 0 else inf,
                    DTW[i-1, j-1] if (i > 0 and j > 0) else inf
                )
    return DTW[-1, -1]
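As a quick check of the recursion above (a minimal sketch with two hypothetical toy sequences), note that DTW is insensitive to time stretching: the second sequence below is a stretched copy of the first, so the optimal warping path has zero cost.
x = np.array([0., 1., 2., 1., 0.])
y = np.array([0., 0., 1., 1., 2., 1., 0., 0.])  # a time-stretched copy of x
print(f"DTW distance: {DTWDistance(x, y):.2f}")  # 0.00: warping aligns identical values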
T = 100 # Length of the time series
N = 30 # Time series per set
# Generate the first set of time series
Y1 = np.zeros((T, N))
for i in range(N):
Y1[:,i] = sm.tsa.arma_generate_sample(ar=[1, -.9], ma=[1], nsample=T, scale=1)
# Generate the second set of time series
Y2 = np.zeros((T, N))
for i in range(N):
Y2[:,i] = sm.tsa.arma_generate_sample(ar=[1, .9], ma=[1], nsample=T, scale=1)
fig, axes = plt.subplots(2,1, figsize=(10, 5))
axes[0].plot(np.mean(Y1, axis=1))
axes[0].fill_between(range(T), np.mean(Y1, axis=1) - np.std(Y1, axis=1), np.mean(Y1, axis=1) + np.std(Y1, axis=1), alpha=0.3)
axes[0].set_title("First set")
axes[1].plot(np.mean(Y2, axis=1))
axes[1].fill_between(range(T), np.mean(Y2, axis=1) - np.std(Y2, axis=1), np.mean(Y2, axis=1) + np.std(Y2, axis=1), alpha=0.3)
axes[1].set_title("Second set")
plt.show()
s1 = Y2[:,1]
s2 = Y2[:,2]
fig = plt.figure(figsize=(5, 5))
d, paths = dtw.warping_paths(s1, s2)
best_path = dtw.best_path(paths)
dtwvis.plot_warpingpaths(s1, s2, paths, best_path, figure=fig);
s1 = Y1[:,1]
s2 = Y2[:,1]
fig = plt.figure(figsize=(5, 5))
d, paths = dtw.warping_paths(s1, s2)
best_path = dtw.best_path(paths)
dtwvis.plot_warpingpaths(s1, s2, paths, best_path, figure=fig);
# Concatenate the two sets of time series
Y = np.concatenate((Y1, Y2), axis=1).T
# Compute the distance matrix
dtw_dist = dtw.distance_matrix_fast(Y)
# Plot the distance matrix
plt.figure(figsize=(4,4))
plt.imshow(dtw_dist, cmap='viridis')
plt.colorbar()
plt.show()
# compute euclidean distance between the time series
euc_dist = pairwise_distances(Y, metric="sqeuclidean")
plt.figure(figsize=(4,4))
plt.imshow(euc_dist, cmap='viridis')
plt.colorbar()
plt.show()
DataLoader().available_datasets()
Available datasets: AtrialFibrillation ArabicDigits Auslan CharacterTrajectories CMUsubject16 ECG2D Japanese_Vowels KickvsPunch Libras NetFlow RobotArm UWAVE Wafer Chlorine Phalanx SwedishLeaf
Xtr, Ytr, Xte, Yte = DataLoader().get_data('Japanese_Vowels')
Loaded Japanese_Vowels dataset. Number of classes: 9 Data shapes: Xtr: (270, 29, 12) Ytr: (270, 1) Xte: (370, 29, 12) Yte: (370, 1)
# Concatenate X and Xte
X = np.concatenate((Xtr, Xte), axis=0)
Y = np.concatenate((Ytr, Yte), axis=0)
# Compute the dissimilarity matrix
dtw_dist = dtw_ndim.distance_matrix_fast(X)
print("dist shape:", dtw_dist.shape)
dist shape: (640, 640)
Note that dtw_ndim.distance_matrix_fast does not compute distances across two different sets, meaning that it cannot compute the test-vs-training (te-tr) block explicitly; this is why we concatenated the training and test data above. Next, we convert the distance matrix into a similarity matrix.
dtw_sim = 1.0 - dtw_dist/dtw_dist.max()
# Plot the similarity matrix
idx_sorted = np.argsort(Y[:,0])
dtw_sim_sorted = dtw_sim[:,idx_sorted][idx_sorted,:]
fig = plt.figure(figsize=(4,4))
plt.imshow(dtw_sim_sorted);
sim_trtr = dtw_sim[:Xtr.shape[0], :Xtr.shape[0]]
print(sim_trtr.shape)
sim_tetr = dtw_sim[Xtr.shape[0]:, :Xtr.shape[0]]
print(sim_tetr.shape)
(270, 270) (370, 270)
clf = svm.SVC(kernel='precomputed', C=1).fit(sim_trtr, Ytr.ravel())
y_pred = clf.predict(sim_tetr)
accuracy = accuracy_score(Yte.ravel(), y_pred)
print(f"Accuracy: {accuracy*100:.2f}%")
Accuracy: 97.30%
# In this case we use the DTW distance directly.
dtw_trtr = dtw_dist[:Xtr.shape[0], :Xtr.shape[0]]
dtw_tetr = dtw_dist[Xtr.shape[0]:, :Xtr.shape[0]]
neigh = KNeighborsClassifier(n_neighbors=3, metric='precomputed') # specify k=3
neigh.fit(dtw_trtr, Ytr.ravel())
y_pred = neigh.predict(dtw_tetr)
accuracy = accuracy_score(Yte.ravel(), y_pred)
print(f"Accuracy: {accuracy*100:.2f}%")
Accuracy: 96.49%
We can also use the DTW distance to perform hierarchical clustering. First, we compute the linkage matrix Z.
distArray = ssd.squareform(dtw_dist)
Z = linkage(distArray, 'ward')
We then visualize Z with a dendrogram plot.
fig = plt.figure(figsize=(20, 10))
dn = dendrogram(Z, color_threshold=50, above_threshold_color='k',
show_leaf_counts=False)
plt.xticks([]);
partition = fcluster(Z, t=55, criterion="distance")
print(f"Found {len(np.unique(partition))} clusters")
Found 9 clusters
print(f"DTW-based clustering NMI: {v_measure_score(partition, Y.ravel()):.2f}")
DTW-based clustering NMI: 0.95
kpca = KernelPCA(n_components=2, kernel='precomputed')
embeddings_pca = kpca.fit_transform(dtw_sim)
fig = plt.figure(figsize=(8,6))
plt.scatter(embeddings_pca[:,0], embeddings_pca[:,1], c=Y.ravel(), s=10, cmap='tab20')
plt.title("Kernel PCA embeddings")
plt.gca().spines[['right', 'left', 'top', 'bottom']].set_visible(False)
plt.xticks(())
plt.yticks(());
def plot_gmm(X, n_components, ax=None):
gmm = GaussianMixture(n_components=n_components, covariance_type="full", random_state=0)
gmm.fit(X)
if ax is None:
_, ax = plt.subplots(1,1,figsize=(6, 6))
# Plot data
ax.scatter(X[:, 0], X[:, 1], c='gray', s=10, alpha=0.5, edgecolors='gray')
# Plot ellipses
for n in range(gmm.n_components):
color = plt.cm.tab10(n)
covariances = gmm.covariances_[n][:2, :2]
v, w = np.linalg.eigh(covariances)
u = w[0] / np.linalg.norm(w[0])
angle = np.arctan2(u[1], u[0])
angle = 180 * angle / np.pi # convert to degrees
v = 2.0 * np.sqrt(2.0) * np.sqrt(v)
ell = mpl.patches.Ellipse(
gmm.means_[n, :2], v[0], v[1], angle=180 + angle, color=color
)
ell.set_clip_box(ax.bbox)
ell.set_alpha(0.5)
ax.add_artist(ell)
ax.spines[['right', 'left', 'top', 'bottom']].set_visible(False)
ax.set_xticks(())
ax.set_yticks(())
# Create toy data
X, y = datasets.make_classification(n_samples=950, n_features=2, n_informative=2,
n_redundant=0, n_clusters_per_class=1, random_state=4,
n_classes=3, class_sep=1.5, flip_y=0.1)
_, axes = plt.subplots(1, 3, figsize=(18, 6))
for i, n in enumerate([2, 3, 4]):
plot_gmm(X, n, ax=axes[i])
axes[i].set_title(f"GMM with $C$={n} components")
Mathematically, a GMM is defined as:
$$p(\mathbf{x}) = \sum_{c=1}^{C} \pi_c \, \mathcal{N}(\mathbf{x} \mid \boldsymbol{\mu}_c, \boldsymbol{\Sigma}_c)$$
TCK modifies the standard GMM model in two ways.
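Before looking at TCK's modifications, here is a minimal sketch (not part of the TCK library; it only assumes scipy is available) that evaluates the mixture density p(x) for a hand-picked two-component GMM in 2D, to make the formula above concrete.
from scipy.stats import multivariate_normal
# A hand-picked 2-component GMM: mixing weights, means, and covariances
pis = [0.3, 0.7]
mus = [np.array([0.0, 0.0]), np.array([2.0, 2.0])]
covs = [np.eye(2), 0.5 * np.eye(2)]
x = np.array([1.0, 1.0])
# p(x) = sum_c pi_c * N(x | mu_c, Sigma_c)
p_x = sum(pi * multivariate_normal(mean=mu, cov=cov).pdf(x)
          for pi, mu, cov in zip(pis, mus, covs))
print(f"p(x) = {p_x:.4f}")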
Xtr, Ytr, Xte, Yte = DataLoader().get_data('Japanese_Vowels')
Loaded Japanese_Vowels dataset. Number of classes: 9 Data shapes: Xtr: (270, 29, 12) Ytr: (270, 1) Xte: (370, 29, 12) Yte: (370, 1)
Next, we simulate missing data by randomly replacing 40% of the entries in the training and test sets with np.nan.
mask_tr = np.random.choice([0, 1], size=Xtr.shape, p=[0.6, 0.4])
Xtr[mask_tr == 1] = np.nan
mask_te = np.random.choice([0, 1], size=Xte.shape, p=[0.6, 0.4])
Xte[mask_te == 1] = np.nan
We fit an ensemble of G=30 GMMs, each with C=15 components. The larger G, the better the performance but the higher the computation time. C controls the model complexity: too low → underfit, too high → overfit.
tck = TCK(G=30, C=15)
Ktr = tck.fit(Xtr).predict(mode='tr-tr')
Kte = tck.predict(Xte=Xte, mode='tr-te').T
print(f"Ktr shape: {Ktr.shape}\nKte shape: {Kte.shape}")
The dataset contains missing data Training the TCK using the following parameters: C = 15, G = 30 Number of MTS for each GMM: 216 - 270 (80 - 100 percent) Number of attributes sampled from [2, 11] Length of time segments sampled from [6, 23]
Ktr shape: (270, 270) Kte shape: (370, 270)
💡 Tip: you can trade off accuracy and computation time by changing the number of GMMs (G) or the number of components (C).
clf = svm.SVC(kernel='precomputed').fit(Ktr, Ytr.ravel()) # Train
Ypred = clf.predict(Kte) # Test
print(f" Test accuracy: {accuracy_score(Yte, Ypred):.2f}")
Test accuracy: 0.92
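The precomputed TCK kernel can also be plugged into other similarity-based methods. As a sketch (the similarity-to-distance conversion below is ours, analogous to what we did for DTW), a k-nearest-neighbors classifier on the TCK similarities:
dist_trtr = Ktr.max() - Ktr  # convert similarities to dissimilarities (assumed conversion)
dist_tetr = Ktr.max() - Kte
neigh = KNeighborsClassifier(n_neighbors=3, metric='precomputed').fit(dist_trtr, Ytr.ravel())
print(f"kNN accuracy: {accuracy_score(Yte.ravel(), neigh.predict(dist_tetr)):.2f}")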
Unidirectional Reservoir
- Takes as input an MTS X of shape [N, T, V].
- Produces a sequence of states H of shape [N, T, H].

Bidirectional Reservoir
- Takes as input an MTS X of shape [N, T, V].
- Produces a sequence of states H of shape [N, T, 2*H].

Xtr, Ytr, Xte, Yte = DataLoader().get_data('Japanese_Vowels')
Loaded Japanese_Vowels dataset. Number of classes: 9 Data shapes: Xtr: (270, 29, 12) Ytr: (270, 1) Xte: (370, 29, 12) Yte: (370, 1)
H_uni = Reservoir(n_internal_units=300).get_states(Xtr, bidir=False)
print(f"Unidir\n H: {H_uni.shape}")
H_bi = Reservoir(n_internal_units=300).get_states(Xtr, bidir=True)
print(f"Bidir\n H: {H_bi.shape}")
Unidir H: (270, 29, 300) Bidir H: (270, 29, 600)
The dimensionality reduction module reduces the reservoir states from [N,T,H] (or [N,T,2*H]) to [N,T,R], with R < H.

Standard PCA
- Reshapes H from [N,T,H] to [N*T,H].
- Applies PCA, keeping R components.
- Reshapes [N*T,R] back to [N,T,R].

A sketch of this reshape-based approach is shown right after the tensorPCA example below.

Tensor PCA
- Computes the principal components from the covariance of the matrices H[n,:,:] around their mean $\bar{H} = \frac{1}{N}\sum_{n=1}^{N} H_n \in \mathbb{R}^{T \times H}$.

H_red = tensorPCA(n_components=75).fit_transform(H_bi)
print(f"H_red: {H_red.shape}")
H_red: (270, 29, 75)
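For comparison, here is a minimal sketch of the reshape-based standard PCA described above (using sklearn's PCA; the variable names are ours). It produces a reduced state tensor with the same shape as the tensorPCA output.
from sklearn.decomposition import PCA
N, T, Hdim = H_bi.shape
H_flat = H_bi.reshape(N * T, Hdim)                       # [N*T, H]
H_red_std = PCA(n_components=75).fit_transform(H_flat)   # [N*T, R]
H_red_std = H_red_std.reshape(N, T, 75)                  # back to [N, T, R]
print(f"H_red_std: {H_red_std.shape}")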
rx_last = H_red[:,-1,:]
print(f"rx_last: {rx_last.shape}")
rx_last: (270, 75)
Output model space
A more effective representation is obtained as follows.
out_pred = Ridge(alpha=1.0)
# If we use a bidirectional Reservoir we also need to predict the time series backwards
X = np.concatenate((Xtr, Xtr[:, ::-1, :]), axis=2)
coeff, biases = [], []
for i in range(X.shape[0]):
out_pred.fit(H_red[i, 0:-1, :], X[i, 1:, :])
coeff.append(out_pred.coef_.ravel())
biases.append(out_pred.intercept_.ravel())
rx_out = np.concatenate((np.vstack(coeff), np.vstack(biases)), axis=1)
print(f"rx_out: {rx_out.shape}") # [N, 2*V*(R+1)]
rx_out: (270, 1824)
Reservoir model space
res_pred = Ridge(alpha=1.0)
coeff, biases = [], []
for i in range(H_red.shape[0]):
res_pred.fit(H_red[i, 0:-1, :], H_red[i, 1:, :])
coeff.append(res_pred.coef_.ravel())
biases.append(res_pred.intercept_.ravel())
rx_res = np.concatenate((np.vstack(coeff), np.vstack(biases)), axis=1)
print(f"rx_res: {rx_res.shape}") # [N, R*(R+1)]
rx_res: (270, 5700)
Next, we use the class RC_model to perform the classification. First, we define the hyperparameters.
config = {}
# Hyperparameters of the reservoir
config['n_internal_units'] = 450 # size of the reservoir
config['spectral_radius'] = 0.9 # largest eigenvalue of the reservoir
config['leak'] = None # amount of leakage in the reservoir state update (None or 1.0 --> no leakage)
config['connectivity'] = 0.25 # percentage of nonzero connections in the reservoir
config['input_scaling'] = 0.1 # scaling of the input weights
config['noise_level'] = 0.01 # noise in the reservoir state update
config['n_drop'] = 5 # transient states to be dropped
config['bidir'] = True # if True, use bidirectional reservoir
config['circle'] = False # use reservoir with circle topology
# Dimensionality reduction hyperparameters
config['dimred_method'] = 'tenpca' # options: {None (no dimensionality reduction), 'pca', 'tenpca'}
config['n_dim'] = 75 # number of resulting dimensions after the dimensionality reduction procedure
# Type of MTS representation
config['mts_rep'] = 'reservoir' # MTS representation: {'last', 'mean', 'output', 'reservoir'}
config['w_ridge_embedding'] = 10.0 # regularization parameter of the ridge regression
# Type of readout
config['readout_type'] = 'lin' # readout used for classification: {'lin', 'mlp', 'svm'}
config['w_ridge'] = 5.0 # regularization of the ridge regression readout
classifier = RC_model(**config)
Xtr, Ytr, Xte, Yte = DataLoader().get_data('Japanese_Vowels')
Loaded Japanese_Vowels dataset. Number of classes: 9 Data shapes: Xtr: (270, 29, 12) Ytr: (270, 1) Xte: (370, 29, 12) Yte: (370, 1)
# One-hot encoding for labels
onehot_encoder = OneHotEncoder(sparse_output=False)
Ytr = onehot_encoder.fit_transform(Ytr)
Yte = onehot_encoder.transform(Yte)
# Train the model
tr_time = classifier.fit(Xtr, Ytr)
Training completed in 0.01 min
# Compute predictions on test data
pred_class = classifier.predict(Xte)
accuracy, f1 = compute_test_scores(pred_class, Yte)
print(f"Accuracy = {accuracy:.3f}, F1-score = {f1:.3f}")
Accuracy = 0.976, F1-score = 0.976
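As a quick experiment (a sketch; the exact score will differ from run to run), we could swap the MTS representation from the reservoir model space to the output model space by changing a single entry of the config:
config_out = dict(config)
config_out['mts_rep'] = 'output'  # use the output model space instead of the reservoir model space
clf_out = RC_model(**config_out)
clf_out.fit(Xtr, Ytr)
acc_out, f1_out = compute_test_scores(clf_out.predict(Xte), Yte)
print(f"Output-space representation: accuracy = {acc_out:.3f}, F1-score = {f1_out:.3f}")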
We will perform the following steps:
1. Load the dataset and merge the training and test sets.
2. Generate the MTS representations with an RC model without a readout.
3. Compute the dissimilarity matrix using the cosine distance.
4. Perform hierarchical clustering on the dissimilarity matrix.
5. Evaluate the agreement between cluster and class labels with NMI.
Xtr, Ytr, Xte, Yte = DataLoader().get_data('Japanese_Vowels')
X = np.concatenate((Xtr, Xte), axis=0)
Y = np.concatenate((Ytr, Yte), axis=0)
Loaded Japanese_Vowels dataset. Number of classes: 9 Data shapes: Xtr: (270, 29, 12) Ytr: (270, 1) Xte: (370, 29, 12) Yte: (370, 1)
We use the same RC_model configuration as before, but we set 'readout_type' to None since we only need the MTS representations, not a classifier.
config['readout_type'] = None # We update this entry from the previous config dict
# Instantiate the RC model
rcm = RC_model(**config)
# Generate representations of the input MTS
rcm.fit(X)
mts_representations = rcm.input_repr
Training completed in 0.02 min
# Compute Dissimilarity matrix
Dist = cosine_distances(mts_representations)
distArray = ssd.squareform(Dist)
# Hierarchical clustering
Z = linkage(distArray, 'ward')
clust = fcluster(Z, t=4.0, criterion="distance")
print(f"Found {len(np.unique(clust))} clusters")
# Evaluate the agreement between class and cluster labels
nmi = v_measure_score(Y[:,0], clust)
print(f"Normalized Mutual Information (v-score): {nmi:.3f}")
Found 9 clusters Normalized Mutual Information (v-score): 0.911
This is what we covered in this lecture.
We conclude by highlighting the main pros and cons of the three approaches to compute MTS (dis)similarity.
- DTW
- TCK
- RC-embedding
Xtr, Ytr, Xte, Yte = DataLoader().get_data('Libras')
Loaded Libras dataset. Number of classes: 15 Data shapes: Xtr: (180, 45, 2) Ytr: (180, 1) Xte: (180, 45, 2) Yte: (180, 1)
Repeat points 4-7 from the previous exercise.
Repeat points 4-7 from the previous exercises.