Clustering time series data with SQL

The data

The data in the example cases has these columns:

  • A time column time
  • One or more columns for engine heat in degrees Celsius.

An increase of 1 in the time column equals 1 hour. Each case has 3 days of data.

The value in the engine heat column increases over time while the engine is running. Once the temperature rises to a certain level, the system automatically switches to the secondary engine.
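The run-and-drop shape described above can be sketched in Python. This is a hypothetical illustration only; the notebook's real data comes from `module.generate_initial_data()`, and the breakpoint hours are taken from its printed output further down.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical sketch of the sawtooth shape: heat climbs while an engine
# runs, then drops back down when the system switches engines.
breakpoints = [0, 13, 25, 33, 55, 63, 71]  # hour at which each run starts

heat = []
for start, end in zip(breakpoints, breakpoints[1:] + [72]):
    base = 20 + rng.normal(0, 1)   # starting heat for this run
    for i in range(end - start):
        heat.append(base + 2 * i)  # heat rises steadily within the run

print(len(heat))  # 72 hourly observations = 3 days
```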

The case

In this imaginary case, the following information should be extracted from the data:

  • Identify the cluster id for each observation
  • Count the number of engine switches (clusters) in the data
  • Calculate the number of observations between each engine switch
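Assuming a `cluster_id` column is already available (the notebook derives it with SQL later), the last two outputs can be sketched in pandas with toy labels:

```python
import pandas as pd

# Hypothetical toy labels standing in for a real cluster_id column.
df = pd.DataFrame({"cluster_id": [1, 1, 1, 2, 2, 3]})

# Number of engine switches = number of distinct clusters.
n_clusters = df["cluster_id"].nunique()

# Observations between each engine switch = rows per cluster.
rows_per_cluster = df.groupby("cluster_id").size().tolist()

print(n_clusters, rows_per_cluster)  # 3 [3, 2, 1]
```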

For one reason or another, the engine temperature sensor is the only available source of information. Because of system limitations, SQL is the only possible analytics tool.

This notebook is loosely inspired by an actual business need, but the data and examples are a generalization of the problem.

Initialize

Load libraries and create variables.

In [1]:
#Import custom Python modules
import module as m
import config as c

#Reload custom libraries in case of changes
import importlib
importlib.reload(m)
importlib.reload(c)

#Generate a temporary in memory database
import sqlite3
con = sqlite3.connect(":memory:")

#Data analysis libraries
import pandas as pd
import numpy as np

#Initialize plotly for notebooks
from plotly.offline import init_notebook_mode
init_notebook_mode(connected=True)

Generate the initial data

This is the base data for all of the cases.

In [2]:
#Generate the initial data and display a sample

df_init = m.generate_initial_data()

print("Rows in the data: {}".format(df_init.shape[0]))
display(df_init[7:17])
Cluster breakpoints: [ 0 13 25 33 55 63 71]
Rows in the data: 72
time cluster_id start_heat cluster_rank
7 7 1 20.0 7
8 8 1 20.0 7
9 9 1 20.0 7
10 10 1 20.0 7
11 11 1 20.0 7
12 12 1 20.0 7
13 13 2 18.0 6
14 14 2 18.0 6
15 15 2 18.0 6
16 16 2 18.0 6

Case 1: Clustering a single variable

In this case we assume that the engine heat always increases until the engine is switched.

The clusters can thus be detected simply by comparing each observation to the previous one.
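For comparison, the same previous-point rule can be sketched in pandas on hypothetical toy data (the notebook itself does this step in SQL):

```python
import pandas as pd

# Toy data: heat climbs, drops at an engine switch, then climbs again.
df = pd.DataFrame({"time": range(6),
                   "engine_heat": [20.0, 22.0, 24.0, 19.0, 21.0, 23.0]})

# A drop from the previous value (or the very first row) starts a new
# cluster; the running sum of those markers is the cluster id.
diff = df["engine_heat"].diff()
df["is_new_cluster"] = ((diff < 0) | diff.isna()).astype(int)
df["cluster_id"] = df["is_new_cluster"].cumsum()

print(df["cluster_id"].tolist())  # [1, 1, 1, 2, 2, 2]
```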

Generate data

In [3]:
#Generate data for case1 from the initial data
df_case1 = df_init.copy()
df_case1 = m.generate_engine_heat_1(df_case1)

#Plot the data
m.plot_heat_2d(df_case1, plot_title="Case 1: Known clusters")

Write data to sqlite database table

In [4]:
#Write to sqlite database having only the time and heat columns
df_case1[[c.col_time, 'engine_heat']].to_sql(c.tbl_case1, con, if_exists="replace", index=False)

Cluster by using SQL

Start a new cluster if the current value is smaller than the previous one.

In [5]:
#The query to get all rows labeled with cluster id
sql_case1 = """
        --Cumulative sum to generate cluster id for each row
        SELECT *, SUM(is_new_cluster) OVER (ORDER BY {0} RANGE UNBOUNDED PRECEDING) AS cluster_id
        FROM(
            --Creates a column that indicates whether the row starts a new cluster
            SELECT *, CASE WHEN ((engine_heat - engine_heat_prev) < 0) OR (engine_heat_prev IS NULL) THEN 1 ELSE 0 END AS is_new_cluster
            FROM(
                --Creates a column for previous engine heat value
                SELECT {0}, {1}, 
                    LAG({1}, 1, null) OVER (ORDER BY {0}) AS engine_heat_prev
                FROM {2}
            )
        )
""".format(c.col_time, "engine_heat", c.tbl_case1)

print(sql_case1)
        --Cumulative sum to generate cluster id for each row
        SELECT *, SUM(is_new_cluster) OVER (ORDER BY time RANGE UNBOUNDED PRECEDING) AS cluster_id
        FROM(
            --Creates a column that indicates whether the row starts a new cluster
            SELECT *, CASE WHEN ((engine_heat - engine_heat_prev) < 0) OR (engine_heat_prev IS NULL) THEN 1 ELSE 0 END AS is_new_cluster
            FROM(
                --Creates a column for previous engine heat value
                SELECT time, engine_heat, 
                    LAG(engine_heat, 1, null) OVER (ORDER BY time) AS engine_heat_prev
                FROM tbl_case1
            )
        )

In [6]:
#Use the query to read the data from database
df_case1_clustered = pd.read_sql(sql_case1, con)

#Show the first rows of the data
df_case1_clustered.head(15)
Out[6]:
time engine_heat engine_heat_prev is_new_cluster cluster_id
0 0 20.245559 NaN 1 1
1 1 22.599505 20.245559 0 1
2 2 24.487942 22.599505 0 1
3 3 25.858173 24.487942 0 1
4 4 27.936116 25.858173 0 1
5 5 29.490934 27.936116 0 1
6 6 31.440419 29.490934 0 1
7 7 33.219714 31.440419 0 1
8 8 35.786280 33.219714 0 1
9 9 38.401815 35.786280 0 1
10 10 40.460482 38.401815 0 1
11 11 40.527021 40.460482 0 1
12 12 42.964496 40.527021 0 1
13 13 19.006525 42.964496 1 2
14 14 19.471200 19.006525 0 2

Get a report for each cluster

In [8]:
#A query to get aggregated results for each cluster
sql_case1_agg = """
    --Count the number of observations in each cluster
    SELECT {0}, COUNT({0}) AS rows_n
    FROM (
        {1}
    )
    GROUP BY {0}
""".format(c.col_cluster_id, sql_case1)

print(sql_case1_agg)
    --Count the number of observations in each cluster
    SELECT cluster_id, COUNT(cluster_id) AS rows_n
    FROM (
        
        --Cumulative sum to generate cluster id for each row
        SELECT *, SUM(is_new_cluster) OVER (ORDER BY time RANGE UNBOUNDED PRECEDING) AS cluster_id
        FROM(
            --Creates a column that indicates whether the row starts a new cluster
            SELECT *, CASE WHEN ((engine_heat - engine_heat_prev) < 0) OR (engine_heat_prev IS NULL) THEN 1 ELSE 0 END AS is_new_cluster
            FROM(
                --Creates a column for previous engine heat value
                SELECT time, engine_heat, 
                    LAG(engine_heat, 1, null) OVER (ORDER BY time) AS engine_heat_prev
                FROM tbl_case1
            )
        )

    )
    GROUP BY cluster_id

In [9]:
#Print the report showing the observation count in each cluster
df_case1_clustered_agg = pd.read_sql(sql_case1_agg, con)
display(df_case1_clustered_agg)
cluster_id rows_n
0 1 13
1 2 12
2 3 8
3 4 22
4 5 8
5 6 8
6 7 1

Visualize the clusters detected by SQL

This visualization shows the clusters detected by our simple SQL algorithm.

If the chart looks the same as the original visualization, the clustering was successful.
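The visual check could also be done programmatically. A minimal sketch, assuming the known labels from `df_init` and the SQL result are aligned by time (toy stand-in series here):

```python
import pandas as pd

# Stand-ins for df_init['cluster_id'] and the SQL-derived cluster_id.
known = pd.Series([1, 1, 1, 2, 2])
detected = pd.Series([1, 1, 1, 2, 2])

# Clustering succeeded if every observation got its known label.
print(bool((known == detected).all()))  # True
```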

In [10]:
#Show the visualization
m.plot_heat_2d(df_case1_clustered, plot_title="Case 1: Clusters created by SQL")

Case 2: Clustering multiple variables with more variation

Even machine learning backed clustering algorithms struggle with unclear boundaries. Let's add some noise to the data and see how the SQL clustering handles this challenge. This is done by making the engine heat move randomly up or down.

In this example there are two features for the clustering: engine heat 1 and engine heat 2. It's impossible to do the clustering with just a single variable. Instead, we know that if the combined heat increases, the observation belongs to the same cluster as the previous one.
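The combined-heat rule can be sketched in pandas before expressing it in SQL (hypothetical toy data; a negative total change marks a switch):

```python
import pandas as pd

# Toy data: both heats drift upward, then drop together at the switch.
df = pd.DataFrame({
    "engine_heat_1": [20.0, 21.0, 22.0, 18.0, 19.0],
    "engine_heat_2": [20.0, 19.5, 20.5, 18.5, 18.0],
})

# Change of the combined heat since the previous observation.
tot_diff_prev = df["engine_heat_1"].diff() + df["engine_heat_2"].diff()

# A negative combined change (or the first row) starts a new cluster.
df["is_new_cluster"] = ((tot_diff_prev < 0) | tot_diff_prev.isna()).astype(int)
df["cluster_id"] = df["is_new_cluster"].cumsum()

print(df["cluster_id"].tolist())  # [1, 1, 1, 2, 2]
```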

In [11]:
#Generate data for case2 from the initial data
df_case2 = df_init.copy()
df_case2 = m.generate_engine_heat_2(df_case2, col_1="engine_heat_1", col_2="engine_heat_2")

Plot both engines in 2D

In [12]:
#Plot engine heat 1 and 2 by the time
m.plot_heat_2d(df_case2, col_y="engine_heat_1", plot_title="Case 2: Known clusters for engine 1")
m.plot_heat_2d(df_case2, col_y="engine_heat_2", plot_title="Case 2: Known clusters for engine 2")

Plot 3D

In [13]:
#Plot both engine heats by time in 3D plot
importlib.reload(m)
importlib.reload(c)
m.plot_heat_3d(df_case2, plot_title="Case 2: Both engine heats by time")

Write data to sqlite database table

In [14]:
#Write to sqlite database having only the time and heat columns
df_case2[[c.col_time, 'engine_heat_1', 'engine_heat_2']].to_sql(c.tbl_case2, con, if_exists="replace", index=False)

Cluster by using SQL

Most of the logic is similar to the simple case 1.

The biggest difference in case 2 is that a new cluster starts if the total heat decreases.

In [15]:
#The query to get all rows labeled with cluster id
sql_case2 = """
        --Cumulative sum to generate cluster id for each row
        SELECT *, SUM(is_new_cluster) OVER (ORDER BY {0} RANGE UNBOUNDED PRECEDING) AS cluster_id
        FROM(
            --Creates a column that indicates whether the row starts a new cluster
            SELECT *, CASE WHEN (tot_diff_prev < 0) OR (tot_diff_prev IS NULL) THEN 1 ELSE 0 END AS is_new_cluster
            FROM(
                --Creates a column for the difference between the current and the previous sum of engine heats
                SELECT *, (engine_heat_1 - eh_1_prev) + (engine_heat_2 - eh_2_prev) AS tot_diff_prev
                FROM(
                    SELECT {0}, {1}, {2},
                        LAG({1}, 1, null) OVER (ORDER BY {0}) AS eh_1_prev,
                        LAG({2}, 1, null) OVER (ORDER BY {0}) AS eh_2_prev
                    FROM {3}
                )
            )
        )
""".format(c.col_time, "engine_heat_1", "engine_heat_2", c.tbl_case2)

print(sql_case2)
        --Cumulative sum to generate cluster id for each row
        SELECT *, SUM(is_new_cluster) OVER (ORDER BY time RANGE UNBOUNDED PRECEDING) AS cluster_id
        FROM(
            --Creates a column that indicates whether the row starts a new cluster
            SELECT *, CASE WHEN (tot_diff_prev < 0) OR (tot_diff_prev IS NULL) THEN 1 ELSE 0 END AS is_new_cluster
            FROM(
                --Creates a column for the difference between the current and the previous sum of engine heats
                SELECT *, (engine_heat_1 - eh_1_prev) + (engine_heat_2 - eh_2_prev) AS tot_diff_prev
                FROM(
                    SELECT time, engine_heat_1, engine_heat_2,
                        LAG(engine_heat_1, 1, null) OVER (ORDER BY time) AS eh_1_prev,
                        LAG(engine_heat_2, 1, null) OVER (ORDER BY time) AS eh_2_prev
                    FROM tbl_case2
                )
            )
        )

In [16]:
#Use the SQL query to get the clusters for the case 2
df_case2_clustered = pd.read_sql(sql_case2, con)
df_case2_clustered.head(15)
Out[16]:
time engine_heat_1 engine_heat_2 eh_1_prev eh_2_prev tot_diff_prev is_new_cluster cluster_id
0 0 20.236250 19.813750 NaN NaN NaN 1 1
1 1 20.575750 20.137355 20.236250 19.813750 0.663105 0 1
2 2 21.234502 19.772704 20.575750 20.137355 0.294102 0 1
3 3 21.487683 19.780174 21.234502 19.772704 0.260650 0 1
4 4 21.776959 19.647813 21.487683 19.780174 0.156914 0 1
5 5 23.427995 19.992559 21.776959 19.647813 1.995783 0 1
6 6 24.593226 20.278267 23.427995 19.992559 1.450938 0 1
7 7 24.644050 20.301920 24.593226 20.278267 0.074477 0 1
8 8 23.914397 21.081573 24.644050 20.301920 0.050000 0 1
9 9 25.399968 20.713159 23.914397 21.081573 1.117157 0 1
10 10 25.288205 20.874922 25.399968 20.713159 0.050000 0 1
11 11 25.858017 20.488230 25.288205 20.874922 0.183119 0 1
12 12 25.583664 20.812583 25.858017 20.488230 0.050000 0 1
13 13 18.522026 18.579380 25.583664 20.812583 -9.294840 1 2
14 14 17.877624 19.273782 18.522026 18.579380 0.050000 0 2

Get a report for each cluster

In [17]:
#A query to get aggregated results for each cluster
sql_case2_agg = """
    --Count the number of observations in each cluster
    SELECT {0}, COUNT({0}) AS rows_n
    FROM (
        {1}
    )
    GROUP BY {0}
""".format(c.col_cluster_id, sql_case2)

print(sql_case2_agg)
    --Count the number of observations in each cluster
    SELECT cluster_id, COUNT(cluster_id) AS rows_n
    FROM (
        
        --Cumulative sum to generate cluster id for each row
        SELECT *, SUM(is_new_cluster) OVER (ORDER BY time RANGE UNBOUNDED PRECEDING) AS cluster_id
        FROM(
            --Creates a column that indicates whether the row starts a new cluster
            SELECT *, CASE WHEN (tot_diff_prev < 0) OR (tot_diff_prev IS NULL) THEN 1 ELSE 0 END AS is_new_cluster
            FROM(
                --Creates a column for the difference between the current and the previous sum of engine heats
                SELECT *, (engine_heat_1 - eh_1_prev) + (engine_heat_2 - eh_2_prev) AS tot_diff_prev
                FROM(
                    SELECT time, engine_heat_1, engine_heat_2,
                        LAG(engine_heat_1, 1, null) OVER (ORDER BY time) AS eh_1_prev,
                        LAG(engine_heat_2, 1, null) OVER (ORDER BY time) AS eh_2_prev
                    FROM tbl_case2
                )
            )
        )

    )
    GROUP BY cluster_id

In [18]:
#Print the report showing the observation count in each cluster
df_case2_clustered_agg = pd.read_sql(sql_case2_agg, con)
display(df_case2_clustered_agg)
cluster_id rows_n
0 1 13
1 2 12
2 3 8
3 4 22
4 5 8
5 6 8
6 7 1

Visualize the clusters detected by SQL

In [19]:
m.plot_heat_2d(df_case2_clustered, col_y="engine_heat_1", plot_title="Case 2: Clusters created by SQL - Plot for engine 1")
m.plot_heat_2d(df_case2_clustered, col_y="engine_heat_2", plot_title="Case 2: Clusters created by SQL - Plot for engine 2")