This notebook explores how `giotto-tda`

can be used to gain insights from time-varying data by using ideas from from dynamical systems and persistent homology.

If you are looking at a static version of this notebook and would like to run its contents, head over to GitHub and download the source.

- Topological Methods for the Analysis of Data by Jose Perea
- The sliding window notebooks from Chris Tralie's TDALabs
- Detection of gravitational waves using topological data analysis and convolutional neural network: An improved approach by Christopher Bresten and Jae-Hun Jung. We thank Christopher Bresten for sharing the code and data used in the article.

- Topology in time series forecasting, in which the
*Takens embedding*technique that we will present here is used in time series forecasting tasks by using sliding windows. - Topological feature extraction using VietorisRipsPersistence and PersistenceEntropy for a quick introduction to general topological feature extraction in
`giotto-tda`

.

**License: AGPLv3**

The first step in analysing the topology of time series is to construct a * time delay embedding* or

More formally, given a time series $f(t)$, one can extract a * sequence of vectors* of the form $f_i = [f(t_i), f(t_i + 2 \tau), \ldots, f(t_i + (d-1) \tau)] \in \mathbb{R}^{d}$, where $d$ is the

and the main idea we will explore in this notebook is that if $f$ has a non-trivial recurrent structure, then the image of $TD_{d,\tau}f$ will have non-trivial topology for appropriate choices of $(d, \tau)$.

As a warm-up, recall that a function is periodic with period $T > 0$ if $f(t + T) = f(t)$ for all $t \in \mathbb{R}$. For example, consider the function $f(t) = \cos(5 t)$ which can be visualised as follows:

In [ ]:

```
import numpy as np
import plotly.graph_objects as go
x_periodic = np.linspace(0, 10, 1000)
y_periodic = np.cos(5 * x_periodic)
fig = go.Figure(data=go.Scatter(x=x_periodic, y=y_periodic))
fig.update_layout(xaxis_title="Timestamp", yaxis_title="Amplitude")
fig.show()
```

We can show that periodicity implies ellipticity of the time delay embedding. To do that we need to specify the embedding dimension $d$ and the time delay $\tau$ for the Takens embedding, which in `giotto-tda`

can be achieved as follows:

In [ ]:

```
from gtda.time_series import SingleTakensEmbedding
embedding_dimension_periodic = 3
embedding_time_delay_periodic = 8
stride = 10
embedder_periodic = SingleTakensEmbedding(
parameters_type="fixed",
n_jobs=2,
time_delay=embedding_time_delay_periodic,
dimension=embedding_dimension_periodic,
stride=stride,
)
```

**Tip:** You can use the `stride`

parameter to downsample the time delay embedding. This is handy when you want to quickly compute persistence diagrams on a dense signal.

Let's apply this embedding to our one-dimensional time series to get a 3-dimensional *point cloud*:

In [ ]:

```
y_periodic_embedded = embedder_periodic.fit_transform(y_periodic)
print(f"Shape of embedded time series: {y_periodic_embedded.shape}")
```

We can then use `giotto-tda`

's plotting API to visualise the result:

In [ ]:

```
from gtda.plotting import plot_point_cloud
plot_point_cloud(y_periodic_embedded)
```

As promised, the periodicity of $f$ is reflected in the ellipticity of the time delay embedding! It turns out that in general, * periodic functions trace out ellipses* in $\mathbb{R}^{d}$.

Here is another type of recurrent behaviour: if we let $f(t) = \cos(t) + \cos(\pi t)$ then it follows that $f$ is not periodic since the ratio of the two frequencies is irrational, i.e. we say that $\cos(t)$ and $\cos(\pi t)$ are *incommensurate*. Nevertheless, their sum produces recurrent behaviour:

In [ ]:

```
x_nonperiodic = np.linspace(0, 50, 1000)
y_nonperiodic = np.cos(x_nonperiodic) + np.cos(np.pi * x_nonperiodic)
fig = go.Figure(data=go.Scatter(x=x_nonperiodic, y=y_nonperiodic))
fig.update_layout(xaxis_title="Timestamp", yaxis_title="Amplitude")
fig.show()
```

As before, let's create a time delay embedding for this signal and visualise the resulting point cloud:

In [ ]:

```
embedding_dimension_nonperiodic = 3
embedding_time_delay_nonperiodic = 16
stride = 3
embedder_nonperiodic = SingleTakensEmbedding(
parameters_type="fixed",
n_jobs=2,
time_delay=embedding_time_delay_nonperiodic,
dimension=embedding_dimension_nonperiodic,
stride=stride,
)
y_nonperiodic_embedded = embedder_nonperiodic.fit_transform(y_nonperiodic)
plot_point_cloud(y_nonperiodic_embedded)
```

In the examples above we saw that the resulting point clouds appear to exhibit distinct topology. We can verify this explicitly using persistent homology! First we need to reshape our point cloud arrays in a form suitable for the VietorisRipsPersistence transformer, namely `(n_samples, n_points, n_dimensions)`

:

In [ ]:

```
y_periodic_embedded = y_periodic_embedded[None, :, :]
y_nonperiodic_embedded = y_nonperiodic_embedded[None, :, :]
```

The next step is to calculate the persistence diagrams associated with each point cloud. In `giotto-tda`

we can do this with the Vietoris-Rips construction as follows:

In [ ]:

```
from gtda.homology import VietorisRipsPersistence
# 0 - connected components, 1 - loops, 2 - voids
homology_dimensions = [0, 1, 2]
periodic_persistence = VietorisRipsPersistence(
homology_dimensions=homology_dimensions, n_jobs=6
)
print("Persistence diagram for periodic signal")
periodic_persistence.fit_transform_plot(y_periodic_embedded)
nonperiodic_persistence = VietorisRipsPersistence(
homology_dimensions=homology_dimensions, n_jobs=6
)
print("Persistence diagram for nonperiodic signal")
nonperiodic_persistence.fit_transform_plot(y_nonperiodic_embedded);
```

What can we conclude from these diagrams? The first thing that stands out is the different types of homology dimensions that are most persistent. In the periodic case we see a single point associated with 1-dimensional persistent homology, namely a loop! On the other hand, the non-periodic signal has revealed two points associated with 2-dimensional persistent homology, namely *voids*. These clear differences in topology make the time delay embedding technique especially powerful at classifying different time series.

In the examples above, we manually chose values for the embedding dimension $d$ and time delay $\tau$. However, it turns out there are two techniques that can be used to determine these parameters *automatically*:

- Mutual information to determine $\tau$
- False nearest neighbours to determine $d$

In `giotto-tda`

, these techniques are applied when we select `parameters_type="search"`

in the `SingleTakensEmbedding`

transformer, e.g.

```
embedder = SingleTakensEmbedding(
parameters_type="search", time_delay=time_delay, dimension=embedding_dimension,
)
```

where the values of `time_delay`

and `embedding_dimension`

provide * upper bounds* on the search algorithm. Before applying this to our sample signals, let's have a look at how these methods actually work under the hood.

To determine an optimal value for $\tau$ we first calculate the maximum $x_\mathrm{max}$ and minimum $x_\mathrm{min}$ values of the time series, and divide the interval $[x_\mathrm{min}, x_\mathrm{max}]$ into a large number of bins. We let $p_k$ be the probability that an element of the time series is in the $k$th bin and let $p_{j,k}$ be the probability that $x_i$ is in the $j$th bin while $x_{i+\tau}$ is in the $k$th bin. Then the mutual information is defined as:

$$ I(\tau) = - \sum_{j=1}^{n_\mathrm{bins}} \sum_{k=1}^{n_\mathrm{bins}} p_{j,k}(\tau) \log \frac{p_{j,k}(\tau)}{p_j p_k} $$The first minimum of $I(\tau)$ gives the optimal time delay since there we get the most information by adding $x_{i+\tau}$.

The false nearest neighbours algorithm is based on the assumption that "unfolding" or embedding a deterministic system into successively higher dimensions is smooth. In other words, points which are close in one embedding dimension should be close in a higher one. More formally, if we have a point $p_i$ and neighbour $p_j$, we check if the normalised distance $R_i$ for the next dimension is greater than some threshold $R_\mathrm{th}$:

$$ R_i = \frac{\mid x_{i+m\tau} - x_{j+m\tau} \mid}{\lVert p_i - p_j \rVert} > R_\mathrm{th}$$If $R_i > R_\mathrm{th}$ then we have a "false nearest neighbour" and the optimal embedding dimension is obtained by minimising the total number of such neighbours.

Let's now apply these ideas to our original signals to see what the algorithm determines as optimal choices for $d$ and $\tau$. We will allow the search to scan up to relatively large values of $(d, \tau)$ to ensure we do not get stuck in a sub-optimal minimum.

For the periodic signal, we initialise the Takens embedding as follows:

In [ ]:

```
max_embedding_dimension = 30
max_time_delay = 30
stride = 5
embedder_periodic = SingleTakensEmbedding(
parameters_type="search",
time_delay=max_time_delay,
dimension=max_embedding_dimension,
stride=stride,
)
```

Let's create a helper function to view the optimal values found during the search:

In [ ]:

```
def fit_embedder(embedder: SingleTakensEmbedding, y: np.ndarray, verbose: bool=True) -> np.ndarray:
"""Fits a Takens embedder and displays optimal search parameters."""
y_embedded = embedder.fit_transform(y)
if verbose:
print(f"Shape of embedded time series: {y_embedded.shape}")
print(
f"Optimal embedding dimension is {embedder.dimension_} and time delay is {embedder.time_delay_}"
)
return y_embedded
```

In [ ]:

```
y_periodic_embedded = fit_embedder(embedder_periodic, y_periodic)
```

Although the resulting embedding is in a high dimensional space, we can apply dimensionality reduction techniques like principal component analysis to project down to 3-dimensions for visualisation:

In [ ]:

```
from sklearn.decomposition import PCA
pca = PCA(n_components=3)
y_periodic_embedded_pca = pca.fit_transform(y_periodic_embedded)
plot_point_cloud(y_periodic_embedded_pca)
```

Now for the non-periodic case we have:

In [ ]:

```
embedder_nonperiodic = SingleTakensEmbedding(
parameters_type="search",
n_jobs=2,
time_delay=max_time_delay,
dimension=max_embedding_dimension,
stride=stride,
)
y_nonperiodic_embedded = fit_embedder(embedder_nonperiodic, y_nonperiodic)
```

In [ ]:

```
pca = PCA(n_components=3)
y_nonperiodic_embedded_pca = pca.fit_transform(y_nonperiodic_embedded)
plot_point_cloud(y_nonperiodic_embedded_pca)
```

So we have embedding point clouds whose geometry looks clearly distinct; how about the persistence diagrams? As we did earlier, we first need to reshape our arrays into the form `(n_samples, n_points, n_dimensions)`

:

In [ ]:

```
y_periodic_embedded = y_periodic_embedded[None, :, :]
y_nonperiodic_embedded = y_nonperiodic_embedded[None, :, :]
```

The next step is to calculate the persistence diagrams associated with each point cloud:

In [ ]:

```
homology_dimensions = [0, 1, 2]
periodic_persistence = VietorisRipsPersistence(homology_dimensions=homology_dimensions)
print("Persistence diagram for periodic signal")
periodic_persistence.fit_transform_plot(y_periodic_embedded)
nonperiodic_persistence = VietorisRipsPersistence(
homology_dimensions=homology_dimensions, n_jobs=6
)
print("Persistence diagram for nonperiodic signal")
nonperiodic_persistence.fit_transform_plot(y_nonperiodic_embedded);
```

In this case the persistence diagram for the periodic signal is essentially unchanged, but the non-periodic signal now reveals two $H_1$ points and one $H_2$ one - the signature of a hypertorus! It turns out that in general, the image of $TD_{d,\tau}f$ is a hypertorus.

As an application of the above ideas, let's examine how persistent homology can help detect gravitational waves in noisy signals. The following is adapted from the article by Chrisopher Bresten and Jae-Hun Jung. As shown in the videos below, we will aim to pick out the "chirp" signal of two colliding black holes from a very noisy backgound.

In [ ]:

```
from IPython.display import YouTubeVideo
YouTubeVideo("Y3eR49ogsF0", width=600, height=400)
```

In [ ]:

```
YouTubeVideo("QyDcTbR-kEA", width=600, height=400)
```

In the article, the authors create a synthetic training set as follows:

- Generate gravitational wave signals that correspond to non-spinning binary black hole mergers
- Generate a noisy time series and embed a gravitational wave signal with probability 0.5 at a random time.

The result is a set of time series of the form

$$ s = g + \epsilon \frac{1}{R}\xi $$where $g$ is a gravitational wave signal from the reference set, $\xi$ is Gaussian noise, $\epsilon=10^{-19}$ scales the noise amplitude to the signal, and $R \in (0.075, 0.65)$ is a parameter that controls the signal-to-noise-ratio (SNR).

As a warmup, let's generate some noisy signals with a constant SNR of 17.98. As shown in Table 1 of the article, this corresponds to an $R$ value of 0.65. By picking the upper end of the interval, we can gain a sense for what the best possible performance is for our time series classifier. We pick a small number of samples to make the computations run fast, but in practice would scale this by 1-2 orders of magnitude as done in the original article.

In [ ]:

```
from data.generate_datasets import make_gravitational_waves
from pathlib import Path
R = 0.65
n_signals = 100
DATA = Path("./data")
noisy_signals, gw_signals, labels = make_gravitational_waves(
path_to_data=DATA, n_signals=n_signals, r_min=R, r_max=R, n_snr_values=1
)
print(f"Number of noisy signals: {len(noisy_signals)}")
print(f"Number of timesteps per series: {len(noisy_signals[0])}")
```

Next let's visualise the two different types of time series that we wish to classify: one that is pure noise vs. one that is composed of noise plus an embedded gravitational wave signal:

In [ ]:

```
from plotly.subplots import make_subplots
import plotly.graph_objects as go
# get the index corresponding to the first pure noise time series
background_idx = np.argmin(labels)
# get the index corresponding to the first noise + gravitational wave time series
signal_idx = np.argmax(labels)
ts_noise = noisy_signals[background_idx]
ts_background = noisy_signals[signal_idx]
ts_signal = gw_signals[signal_idx]
fig = make_subplots(rows=1, cols=2)
fig.add_trace(
go.Scatter(x=list(range(len(ts_noise))), y=ts_noise, mode="lines", name="noise"),
row=1,
col=1,
)
fig.add_trace(
go.Scatter(
x=list(range(len(ts_background))),
y=ts_background,
mode="lines",
name="background",
),
row=1,
col=2,
)
fig.add_trace(
go.Scatter(x=list(range(len(ts_signal))), y=ts_signal, mode="lines", name="signal"),
row=1,
col=2,
)
fig.show()
```

Let's examine what the time delay embedding of a pure gravitational wave signal looks like:

In [ ]:

```
embedding_dimension = 30
embedding_time_delay = 30
stride = 5
embedder = SingleTakensEmbedding(
parameters_type="search", n_jobs=6, time_delay=embedding_time_delay, dimension=embedding_dimension, stride=stride
)
y_gw_embedded = fit_embedder(embedder, gw_signals[0])
```

As we did in our simple examples, we can use PCA to project our high-dimensional space to 3-dimensions for visualisation:

In [ ]:

```
pca = PCA(n_components=3)
y_gw_embedded_pca = pca.fit_transform(y_gw_embedded)
plot_point_cloud(y_gw_embedded_pca)
```

From the plot we can see that the decaying periodic signal generated by a black hole merger emerges as a *spiral* in the time delay embedding space! For contrast, let's compare this to one of the pure noise time series in our sample:

In [ ]:

```
embedding_dimension = 30
embedding_time_delay = 30
stride = 5
embedder = SingleTakensEmbedding(
parameters_type="search", n_jobs=6, time_delay=embedding_time_delay, dimension=embedding_dimension, stride=stride
)
y_noise_embedded = fit_embedder(embedder, noisy_signals[background_idx])
pca = PCA(n_components=3)
y_noise_embedded_pca = pca.fit_transform(y_noise_embedded)
plot_point_cloud(y_noise_embedded_pca)
```

Evidently, pure noise resembles a high-dimensional ball in the time delay embedding space. Let's see if we can use persistent homology to tease apart which time series contain a gravitational wave signal versus those that don't. To do so we will adapt the strategy from the original article:

- Generate 200-dimensional time delay embeddings of each time series
- Use PCA to reduce the time delay embeddings to 3-dimensions
- Use the Vietoris-Rips construction to calculate persistence diagrams of $H_0$ and $H_1$ generators
- Extract feature vectors using persistence entropy
- Train a binary classifier on the topological features

We can do steps 1 and 2 by using the following `giotto-tda`

tools:

- The
`TakensEmbedding`

transformer – instead of`SingleTakensEmbedding`

– which will transform each time series in`noisy_signals`

separately and return a collection of point clouds; `CollectionTransformer`

, which is a convenience "meta-estimator" for applying the same PCA to each point cloud resulting from step 1.

Using the `Pipeline`

class from `giotto-tda`

, we can chain all operations up to and including step 4 as follows:

In [ ]:

```
from gtda.time_series import TakensEmbedding
from gtda.metaestimators import CollectionTransformer
from gtda.diagrams import PersistenceEntropy, Scaler
from gtda.pipeline import Pipeline
embedding_dimension = 200
embedding_time_delay = 10
stride = 10
embedder = TakensEmbedding(time_delay=embedding_time_delay,
dimension=embedding_dimension,
stride=stride)
batch_pca = CollectionTransformer(PCA(n_components=3), n_jobs=-1)
persistence = VietorisRipsPersistence(homology_dimensions=[0, 1], n_jobs=-1)
scaling = Scaler()
entropy = PersistenceEntropy(normalize=True, nan_fill_value=-10)
steps = [("embedder", embedder),
("pca", batch_pca),
("persistence", persistence),
("scaling", scaling),
("entropy", entropy)]
topological_transfomer = Pipeline(steps)
```

In [ ]:

```
features = topological_transfomer.fit_transform(noisy_signals)
```

For the final step, let's train a simple classifier on our topological features. As usual we create training and validation sets

In [ ]:

```
from sklearn.model_selection import train_test_split
X_train, X_valid, y_train, y_valid = train_test_split(
features, labels, test_size=0.1, random_state=42
)
```

and then fit and evaluate our model:

In [ ]:

```
from sklearn.metrics import accuracy_score, roc_auc_score
def print_scores(fitted_model):
res = {
"Accuracy on train:": accuracy_score(fitted_model.predict(X_train), y_train),
"ROC AUC on train:": roc_auc_score(
y_train, fitted_model.predict_proba(X_train)[:, 1]
),
"Accuracy on valid:": accuracy_score(fitted_model.predict(X_valid), y_valid),
"ROC AUC on valid:": roc_auc_score(
y_valid, fitted_model.predict_proba(X_valid)[:, 1]
),
}
for k, v in res.items():
print(k, round(v, 3))
```

In [ ]:

```
from sklearn.linear_model import LogisticRegression
model = LogisticRegression()
model.fit(X_train, y_train)
print_scores(model)
```

As a simple baseline, this model is not too bad - it outperforms the deep learning baseline in the article which typically fares little better than random on the raw data. However, the combination of deep learning and persistent homology is where significant performance gains are seen - we leave this as an exercise to the intrepid reader!