#!/usr/bin/env python
# coding: utf-8
# # Guided Investigation - Anomaly Lookup
#
# __Notebook Version:__ 2.0
# __Python Version:__ Python 3.8 - AzureML
# __Platforms Supported:__ Azure Machine Learning Notebooks
#
# __Data Source Required:__ Log Analytics tables
#
# ### Description
# Gain insights into the possible root cause of an alert by searching for related anomalies on the corresponding entities around the alert’s time. This notebook will provide valuable leads for an alert’s investigation, listing all suspicious increases in event counts or their properties around the time of the alert, and linking to the corresponding raw records in Log Analytics for the investigator to focus on and interpret.
#
# You may need to select Python 3.8 - AzureML on Azure Machine Learning Notebooks.
#
# ## Table of Contents
#
# 1. Initialize Azure Resource Management Clients
# 2. Looking up for anomaly entities
# In[ ]:
# Install the Azure Monitor query SDK (provides azure.monitor.query imported below).
get_ipython().system('pip install azure-monitor-query')
# In[ ]:
# Loading Python libraries
from azure.mgmt.loganalytics import LogAnalyticsManagementClient
from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus
from azure.identity import AzureCliCredential, DefaultAzureCredential
from datetime import timezone
import sys
import timeit
import datetime as dt
import pandas as pd
import copy
import base64
import json
from IPython.display import display, HTML, Markdown
from cryptography.fernet import Fernet
# The following cell has classes and functions for this notebook, code is hidden to unclutter the notebook. please RUN the cell, you may view the code by clicking 'input hidden'.
# In[ ]:
# Classes will be used in this notebook
class AnomalyQueries():
    """Registry of base64-encoded KQL query templates for anomaly lookup.

    Each entry is a base64-encoded KQL string; the decoded text contains
    `{placeholder}` fields that callers fill via str.format before running
    the query against Log Analytics.
    """
    QUERIES = {}
    # List all tables available in the workspace.
    QUERIES['LISTTABLES'] = b'dW5pb24gd2l0aHNvdXJjZSA9IFNlbnRpbmVsVGFibGVOYW1lICogfCBkaXN0aW5jdCBTZW50aW5lbFRhYmxlTmFtZSB8IHNvcnQgYnkgU2VudGluZWxUYWJsZU5hbWUgYXNjIA=='
    # Columns of {table} whose type could hold categorical data (int/long/string).
    QUERIES['ISCATCOLUMN'] = b'e3RhYmxlfSB8IGdldHNjaGVtYSB8IHdoZXJlIENvbHVtblR5cGUgaW4gKCdpbnQnLCAnbG9uZycsICdzdHJpbmcnKSB8IHByb2plY3QgQ29sdW1uTmFtZQ=='
    # Heuristic: {column} is categorical when its distinct-count ratio is small.
    QUERIES['ISCATHEURISTIC'] = b'e3RhYmxlfSB8IHdoZXJlIGluZ2VzdGlvbl90aW1lKCkgPiBhZ28oMWQpIHwgdGFrZSB0b2ludCgxZTgpIHwgc3VtbWFyaXplIGRjID0gZGNvdW50KHtjb2x1bW59KSwgY291bnQoKSB8IHdoZXJlIGRjPCAxMDAwIGFuZCBkYyA+IDEgfCBwcm9qZWN0IHJhdGlvID0gdG9kb3VibGUoZGMpIC8gY291bnRfIHwgd2hlcmUgcmF0aW88IDFlLTIg'
    # series_decompose_anomalies over daily counts per {column} value.
    QUERIES['TIMESERIESANOMALYDETECTION'] = b'bGV0IGZ1bGxEYWlseUNvdW50ID0gbWF0ZXJpYWxpemUoIHt0YWJsZX0gfCBleHRlbmQgVGltZUNyZWF0ZWQgPSBUaW1lR2VuZXJhdGVkIHwgd2hlcmUgVGltZUNyZWF0ZWQgPiBkYXRldGltZSgne21pblRpbWVzdGFtcH0nKSBhbmQgVGltZUNyZWF0ZWQ8ZGF0ZXRpbWUoJ3ttYXhUaW1lc3RhbXB9JykgfCB3aGVyZSB7ZW50Q29sdW1ufSBoYXMgJ3txRW50aXR5fScgfCBtYWtlLXNlcmllcyBjb3VudCgpIGRlZmF1bHQgPSAwIG9uIFRpbWVDcmVhdGVkIGZyb20gZGF0ZXRpbWUoJ3ttaW5UaW1lc3RhbXB9JykgdG8gZGF0ZXRpbWUoJ3ttYXhUaW1lc3RhbXB9Jykgc3RlcCAxZCBieSB7Y29sdW1ufSk7IGZ1bGxEYWlseUNvdW50IHwgZXh0ZW5kKGFub21hbGllcywgYW5vbWFseVNjb3JlLCBleHBlY3RlZENvdW50KSA9IHNlcmllc19kZWNvbXBvc2VfYW5vbWFsaWVzKGNvdW50XywxLjUsLTEsJ2F2ZycsMSkgfCB3aGVyZSBhbm9tYWx5U2NvcmVbLTFdID4gMS41IHwgd2hlcmUgdG9pbnQoY291bnRfWy0xXSkgPiB0b2RvdWJsZShleHBlY3RlZENvdW50Wy0xXSkgfCBtdi1hcHBseSBhbm9tYWxpZXMgdG8gdHlwZW9mKGxvbmcpIG9uIChzdW1tYXJpemUgdG90QW5vbWFsaWVzPXN1bShhbm9tYWxpZXMpKSB8IHdoZXJlIHRvdEFub21hbGllcyA8IDUgfCBwcm9qZWN0IHFFbnRpdHkgPSAne3FFbnRpdHl9JywgcVRpbWVzdGFtcCA9IGRhdGV0aW1lKCd7cVRpbWVzdGFtcH0nKSwgbWluVGltZXN0YW1wID0gZGF0ZXRpbWUoJ3ttaW5UaW1lc3RhbXB9JyksIG1heFRpbWVzdGFtcCA9IGRhdGV0aW1lKCd7bWF4VGltZXN0YW1wfScpLCBkZWx0YSA9IHRvdGltZXNwYW4oe2RlbHRhfSksIFRhYmxlID0gJ3t0YWJsZX0nLCBlbnRDb2wgPSAne2VudENvbHVtbn0nLCBjb2xOYW1lID0gJ3tjb2x1bW59JywgY29sVmFsID0gdG9zdHJpbmcoe2NvbHVtbn0pLCBjb2xUeXBlID0gZ2V0dHlwZSh7Y29sdW1ufSksIGV4cGVjdGVkQ291bnQgPSBleHBlY3RlZENvdW50Wy0xXSwgYWN0dWFsQ291bnQgPSBjb3VudF9bLTFdLCBhbm9tYWx5U2NvcmUgPSBhbm9tYWx5U2NvcmVbLTFd'
    # Probe a 1-unit ingestion-time window {f}..{t} (days or hours) for {qEntity}.
    QUERIES['TIMEWINDOWQUERY'] = b'bGV0IGluZERhdGUgPSB0b2RhdGV0aW1lKCd7cURhdGV9Jyk7IHt0YWJsZX0gfCBleHRlbmQgaW5nZXN0aW9uX3RpbWUoKSB8IHdoZXJlICRJbmdlc3Rpb25UaW1lID4gaW5kRGF0ZSArIHtmfXtkZWx0YX0gYW5kICRJbmdlc3Rpb25UaW1lPGluZERhdGUgKyB7dH17ZGVsdGF9IHwgd2hlcmUge2VudENvbHVtbn0gaGFzICd7cUVudGl0eX0nIHwgcHJvamVjdCBpbmcgPSRJbmdlc3Rpb25UaW1lIHwgdGFrZSAxIA=='
    # Quick existence check: does {qEntity} appear in {table} around {qDate}?
    QUERIES['ISENTITYINTABLE'] = b'bGV0IGluZERhdGUgPSB0b2RhdGV0aW1lKCd7cURhdGV9Jyk7IHt0YWJsZX0gfCB3aGVyZSBpbmdlc3Rpb25fdGltZSgpIGJldHdlZW4oKGluZERhdGUgLTFoKSAuLiAoaW5kRGF0ZSArIDFoKSkgfCBzZWFyY2ggJ3txRW50aXR5fScgfCB0YWtlIDE='

    @staticmethod
    def get_query(name):
        """Return the decoded KQL template registered under *name*.

        Raises KeyError if *name* is not a registered query.
        """
        en_query = AnomalyQueries.QUERIES[name]
        # BUGFIX: was .decode('utf=8') — that typo only worked because Python
        # normalizes codec names ('utf=8' -> 'utf_8'); use the canonical name.
        query = base64.b64decode(en_query).decode('utf-8')
        return query
class AnomalyFinder():
    """
    Process flow for anomaly lookup against a Log Analytics workspace.

    Method `run` is the main entry point: given an alert timestamp and an
    entity, it finds tables/columns containing the entity, picks a relevant
    time window per table, and runs time-series anomaly detection on each
    categorical column.
    """

    def __init__(self, workspace_id, la_data_client):
        # workspace_id: Log Analytics workspace GUID.
        # la_data_client: authenticated azure.monitor.query LogsQueryClient.
        self.workspace_id = workspace_id
        self.la_data_client = la_data_client
        self.anomaly = ''  # JSON (records orient) of the last run's anomalies

    def query_table_list(self):
        """ Get a list of data tables from Log Analytics for the user """
        query = AnomalyQueries.get_query('LISTTABLES')
        return self.query_loganalytics(query)

    def query_loganalytics(self, query):
        """Run *query* through the LA client and return the first result table
        as a pandas DataFrame. Uses a fixed 30-day lookback ending now (UTC)."""
        start_time = dt.datetime.now(timezone.utc) - dt.timedelta(30)
        end_time = dt.datetime.now(timezone.utc)
        result = self.la_data_client.query_workspace(
            workspace_id=self.workspace_id,
            query=query,
            timespan=(start_time, end_time))
        df = pd.DataFrame(data=result.tables[0].rows, columns=result.tables[0].columns)
        return df

    @staticmethod
    def construct_related_queries(df_anomalies):
        """Construct KQL queries the user can re-run and save for future
        reference; one query per table, OR-ing all anomalous column values.
        Returns None when *df_anomalies* is empty."""
        if df_anomalies.shape[0] == 0:
            return None
        queries = ''
        for tbl in df_anomalies.Table.unique():
            cur_table_anomalies = df_anomalies.loc[df_anomalies.Table == tbl, :]
            # Backslash-newline inside the literal keeps the KQL on one line.
            query = """{tbl} \
| where TimeGenerated > datetime({maxTimestamp})-14d and TimeGenerated < datetime({maxTimestamp}) \
| where {entCol} has "{qEntity}" \
| where """.format(**{
                'tbl': tbl,
                'qTimestamp': cur_table_anomalies.qTimestamp.iloc[0],
                'maxTimestamp': cur_table_anomalies.maxTimestamp.iloc[0],
                'entCol': cur_table_anomalies.entCol.iloc[0],
                'qEntity': cur_table_anomalies.qEntity.iloc[0]
            })
            for j, row in cur_table_anomalies.iterrows():  # pylint: disable=unused-variable
                query += " {col} == to{colType}(\"{colVal}\") or".format(
                    col=row.colName,
                    colType=(row.colType) if 'colType' in row.keys() else 'string',
                    colVal=row.colVal.replace('"', '')
                )
            query = query[:-2]  # drop the last or
            query += " | take 1000; "  # limit the output size
            query = query.replace("\\", "\\\\")
            queries += query
        return queries

    def get_timewindow(self, q_entity, q_timestamp, ent_col, tbl):
        """Find the relevant time window for analysis.

        Scans backwards (first in days, then in hours) for the most recent
        window before *q_timestamp* in which *q_entity* appears in *tbl*.
        Returns (min_timestamp, delta, max_timestamp, long_min_timestamp);
        all None when no data is found.
        """
        win_start = 0
        min_timestamp = None
        delta = None
        max_timestamp = None
        long_min_timestamp = None
        time_window_query_template = AnomalyQueries.get_query('TIMEWINDOWQUERY')
        # Day-granularity scan: up to 30 days back, 1-day probe windows.
        for from_hour in range(-30, 0, 1):
            kql_time_range_d = time_window_query_template.format(
                table=tbl,
                qDate=q_timestamp,
                entColumn=ent_col,
                qEntity=q_entity,
                f=from_hour,
                t=from_hour + 1,
                delta='d')
            df_time_range = self.query_loganalytics(kql_time_range_d)
            if df_time_range.shape[0] > 0:
                win_start = from_hour
                break
        dt_q_timestamp = pd.to_datetime(q_timestamp)
        # Elapsed time from the alert to now; naive UTC throughout.
        ind2now = dt.datetime.utcnow() - dt_q_timestamp
        if win_start < -3:
            if ind2now > dt.timedelta(days=1):
                delta = '1d'
                max_timestamp = dt_q_timestamp + dt.timedelta(days=1)
            else:
                delta = '1d'
                # BUGFIX: was dt.datetime.now() (local time) while the rest of
                # this method compares naive-UTC values; use utcnow().
                max_timestamp = dt.datetime.utcnow()
            long_min_timestamp = max_timestamp + dt.timedelta(days=win_start)
            # Cap the short window at 6 days back.
            min_timestamp = max_timestamp + dt.timedelta(days=max([-6, win_start]))
        elif win_start < 0:  # data is recent — switch to hour granularity
            win_start_hour = -5
            for from_hour in range(-3 * 24, -5, 1):
                kql_time_range_h = time_window_query_template.format(
                    table=tbl,
                    qDate=q_timestamp,
                    entColumn=ent_col,
                    qEntity=q_entity,
                    f=from_hour,
                    t=from_hour + 1,
                    delta='h')
                df_time_range = self.query_loganalytics(kql_time_range_h)
                if df_time_range.shape[0] > 0:
                    win_start_hour = from_hour
                    break
            if win_start_hour < -5:
                if ind2now > dt.timedelta(hours=1):
                    delta = '1h'
                    max_timestamp = dt_q_timestamp + dt.timedelta(hours=1)
                else:
                    delta = '1h'
                    # BUGFIX: was dt.datetime.now() — see note above; keep UTC.
                    max_timestamp = dt.datetime.utcnow()
                min_timestamp = max_timestamp + dt.timedelta(hours=win_start_hour)
                long_min_timestamp = min_timestamp
        return min_timestamp, delta, max_timestamp, long_min_timestamp

    def run(self, q_timestamp, q_entity, tables):
        """Main function for Anomaly Lookup.

        Returns (anomalies DataFrame, related KQL queries string); also stores
        the anomalies as JSON on self.anomaly.
        """
        progress_bar = WidgetViewHelper.define_int_progress_bar()
        display(progress_bar)  # pylint: disable=undefined-variable
        # List tables if not given.
        if not tables:
            kql_list_tables = AnomalyQueries.get_query('LISTTABLES')
            tables = self.query_loganalytics(kql_list_tables)
            tables = tables.SentinelTableName.tolist()
        progress_bar.value += 1
        # Find the column in which the query entity appears in each table
        # - assumption: it appears in just one column.
        tables2search = []
        is_entity_in_table_template = AnomalyQueries.get_query('ISENTITYINTABLE')
        for tbl in tables:
            kql_entity_in_table = is_entity_in_table_template.format(
                table=tbl,
                qDate=q_timestamp,
                qEntity=q_entity)
            ent_in_table = self.query_loganalytics(kql_entity_in_table)
            if ent_in_table.shape[0] > 0:
                # First string-typed column (beyond the first) whose every
                # value contains the entity, case-insensitively.
                ent_col = [col for col in ent_in_table.select_dtypes('object').columns[1:] if
                           ent_in_table.loc[0, col] is not None
                           and ent_in_table.loc[:, col].str.contains(q_entity, case=False).all()]
                if ent_col:
                    ent_col = ent_col[0]
                    tables2search.append({'table': tbl, 'entCol': ent_col})
        progress_bar.value += 2
        # For each table, find the time window to query on.
        for tbl in tables2search:
            tbl['minTimestamp'], tbl['delta'], tbl['maxTimestamp'], tbl['longMinTimestamp'] = \
                self.get_timewindow(q_entity, q_timestamp, tbl['entCol'], tbl['table'])
        progress_bar.value += 1
        # Identify all categorical columns per table on which to find anomalies.
        categorical_cols = []
        is_cat_column_template = AnomalyQueries.get_query('ISCATCOLUMN')
        is_cat_heuristic_template = AnomalyQueries.get_query('ISCATHEURISTIC')
        for tbl in tables2search:
            kql_is_cat_column = is_cat_column_template.format(table=tbl['table'])
            df_cols = self.query_loganalytics(kql_is_cat_column)
            for col in df_cols.ColumnName:
                kql_is_cat_heuristic = is_cat_heuristic_template.format(
                    table=tbl['table'],
                    column=col)
                df_is_cat = self.query_loganalytics(kql_is_cat_heuristic)
                if df_is_cat.shape[0] > 0:
                    cat_col_info = copy.deepcopy(tbl)
                    cat_col_info['col'] = col
                    categorical_cols.append(cat_col_info)
        progress_bar.value += 2
        anomalies_list = []
        time_series_anomaly_detection_template = \
            AnomalyQueries.get_query('TIMESERIESANOMALYDETECTION')
        for col_info in categorical_cols:
            max_timestamp = col_info['maxTimestamp'].strftime('%Y-%m-%dT%H:%M:%S.%f')
            long_min_timestamp = col_info['longMinTimestamp'].strftime('%Y-%m-%dT%H:%M:%S.%f')
            kql_time_series_anomaly_detection = time_series_anomaly_detection_template.format(
                table=col_info['table'],
                column=col_info['col'],
                entColumn=col_info['entCol'],
                qEntity=q_entity,
                minTimestamp=long_min_timestamp,
                maxTimestamp=max_timestamp,
                qTimestamp=q_timestamp,
                delta=col_info['delta'])
            cur_anomalies = self.query_loganalytics(kql_time_series_anomaly_detection)
            anomalies_list.append(cur_anomalies)
        progress_bar.value += 2
        if anomalies_list:
            anomalies = pd.concat(anomalies_list, axis=0)
        else:
            anomalies = pd.DataFrame()
        progress_bar.value += 2
        queries = AnomalyFinder.construct_related_queries(anomalies)
        progress_bar.close()
        self.anomaly = str(anomalies.to_json(orient='records'))
        return anomalies, queries
class WidgetViewHelper():
""" This classes provides helper methods for UI controls and components. """
def __init__(self):
self.variable = None
@staticmethod
def select_table(anomaly_lookup):
""" Select data tables """
table_list = anomaly_lookup.query_table_list()
tables = list(table_list["SentinelTableName"])
return widgets.Select(options=tables,
row=len(tables),
#value=[],
description='Tables:')
    @staticmethod
    def define_int_progress_bar():
        """ define progress bar """
        # NOTE(review): this definition is dead code — it is immediately
        # shadowed by the second define_int_progress_bar defined below, and
        # the bare name IntProgress is never imported in this file (the later
        # duplicate uses widgets.IntProgress). Presumably a refactor leftover;
        # consider deleting one of the two definitions.
        # pylint: disable=line-too-long
        return IntProgress(value=0, min=0, max=10, step=1, description='Loading:', bar_style='success', orientation='horizontal', position='top')
@staticmethod
def define_int_progress_bar():
""" Define a progress bar """
return widgets.IntProgress(value=0,
min=0,
max=10,
step=1,
description='Loading:',
bar_style='success',
orientation='horizontal',
position='top')
@staticmethod
# pylint: disable=line-too-long
def copy_to_clipboard(url, text_body, label_text):
""" Copy text to Clipboard """
html_str = (
"""