#!/usr/bin/env python # coding: utf-8 # # Guided Investigation - Anomaly Lookup # # __Notebook Version:__ 2.0
# __Python Version:__ Python 3.8 - AzureML
# __Platforms Supported:__ Azure Machine Learning Notebooks # # __Data Source Required:__ Log Analytics tables # # ### Description # Gain insights into the possible root cause of an alert by searching for related anomalies on the corresponding entities around the alert’s time. This notebook will provide valuable leads for an alert’s investigation, listing all suspicious increase in event counts or their properties around the time of the alert, and linking to the corresponding raw records in Log Analytics for the investigator to focus on and interpret. # # You may need to select Python 3.8 - AzureML on Azure Machine Learning Notebooks. # # ## Table of Contents # # 1. Initialize Azure Resource Management Clients # 2. Looking up for anomaly entities # In[ ]: get_ipython().system('pip install azure-monitor-query') # In[ ]: # Loading Python libraries from azure.mgmt.loganalytics import LogAnalyticsManagementClient from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus from azure.identity import AzureCliCredential, DefaultAzureCredential from datetime import timezone import sys import timeit import datetime as dt import pandas as pd import copy import base64 import json from IPython.display import display, HTML, Markdown from cryptography.fernet import Fernet # The following cell has classes and functions for this notebook, code is hidden to unclutter the notebook. please RUN the cell, you may view the code by clicking 'input hidden'. # In[ ]: # Classes will be used in this notebook class AnomalyQueries(): """ KQLs for anomaly lookup """ QUERIES = {} QUERIES['LISTTABLES'] = b'dW5pb24gd2l0aHNvdXJjZSA9IFNlbnRpbmVsVGFibGVOYW1lICogfCBkaXN0aW5jdCBTZW50aW5lbFRhYmxlTmFtZSB8IHNvcnQgYnkgU2VudGluZWxUYWJsZU5hbWUgYXNjIA==' QUERIES['ISCATCOLUMN'] = b'e3RhYmxlfSB8IGdldHNjaGVtYSB8IHdoZXJlIENvbHVtblR5cGUgaW4gKCdpbnQnLCAnbG9uZycsICdzdHJpbmcnKSB8IHByb2plY3QgQ29sdW1uTmFtZQ==' QUERIES['ISCATHEURISTIC'] = b'e3RhYmxlfSB8IHdoZXJlIGluZ2VzdGlvbl90aW1lKCkgPiBhZ28oMWQpIHwgdGFrZSB0b2ludCgxZTgpIHwgc3VtbWFyaXplIGRjID0gZGNvdW50KHtjb2x1bW59KSwgY291bnQoKSB8IHdoZXJlIGRjPCAxMDAwIGFuZCBkYyA+IDEgfCBwcm9qZWN0IHJhdGlvID0gdG9kb3VibGUoZGMpIC8gY291bnRfIHwgd2hlcmUgcmF0aW88IDFlLTIg' QUERIES['TIMESERIESANOMALYDETECTION'] = b'bGV0IGZ1bGxEYWlseUNvdW50ID0gbWF0ZXJpYWxpemUoIHt0YWJsZX0gfCBleHRlbmQgVGltZUNyZWF0ZWQgPSBUaW1lR2VuZXJhdGVkIHwgd2hlcmUgVGltZUNyZWF0ZWQgPiBkYXRldGltZSgne21pblRpbWVzdGFtcH0nKSBhbmQgVGltZUNyZWF0ZWQ8ZGF0ZXRpbWUoJ3ttYXhUaW1lc3RhbXB9JykgfCB3aGVyZSB7ZW50Q29sdW1ufSBoYXMgJ3txRW50aXR5fScgfCBtYWtlLXNlcmllcyBjb3VudCgpIGRlZmF1bHQgPSAwIG9uIFRpbWVDcmVhdGVkIGZyb20gZGF0ZXRpbWUoJ3ttaW5UaW1lc3RhbXB9JykgdG8gZGF0ZXRpbWUoJ3ttYXhUaW1lc3RhbXB9Jykgc3RlcCAxZCBieSB7Y29sdW1ufSk7IGZ1bGxEYWlseUNvdW50IHwgZXh0ZW5kKGFub21hbGllcywgYW5vbWFseVNjb3JlLCBleHBlY3RlZENvdW50KSA9IHNlcmllc19kZWNvbXBvc2VfYW5vbWFsaWVzKGNvdW50XywxLjUsLTEsJ2F2ZycsMSkgfCB3aGVyZSBhbm9tYWx5U2NvcmVbLTFdID4gMS41IHwgd2hlcmUgdG9pbnQoY291bnRfWy0xXSkgPiB0b2RvdWJsZShleHBlY3RlZENvdW50Wy0xXSkgfCBtdi1hcHBseSBhbm9tYWxpZXMgdG8gdHlwZW9mKGxvbmcpIG9uIChzdW1tYXJpemUgdG90QW5vbWFsaWVzPXN1bShhbm9tYWxpZXMpKSB8IHdoZXJlIHRvdEFub21hbGllcyA8IDUgfCBwcm9qZWN0IHFFbnRpdHkgPSAne3FFbnRpdHl9JywgcVRpbWVzdGFtcCA9IGRhdGV0aW1lKCd7cVRpbWVzdGFtcH0nKSwgbWluVGltZXN0YW1wID0gZGF0ZXRpbWUoJ3ttaW5UaW1lc3RhbXB9JyksIG1heFRpbWVzdGFtcCA9IGRhdGV0aW1lKCd7bWF4VGltZXN0YW1wfScpLCBkZWx0YSA9IHRvdGltZXNwYW4oe2RlbHRhfSksIFRhYmxlID0gJ3t0YWJsZX0nLCBlbnRDb2wgPSAne2VudENvbHVtbn0nLCBjb2xOYW1lID0gJ3tjb2x1bW59JywgY29sVmFsID0gdG9zdHJpbmcoe2NvbHVtbn0pLCBjb2xUeXBlID0gZ2V0dHlwZSh7Y29sdW1ufSksIGV4cGVjdGVkQ291bnQgPSBleHBlY3RlZENvdW50Wy0xXSwgYWN0dWFsQ291bnQgPSBjb3VudF9bLTFdLCBhbm9tYWx5U2NvcmUgPSBhbm9tYWx5U2NvcmVbLTFd' QUERIES['TIMEWINDOWQUERY'] = b'bGV0IGluZERhdGUgPSB0b2RhdGV0aW1lKCd7cURhdGV9Jyk7IHt0YWJsZX0gfCBleHRlbmQgaW5nZXN0aW9uX3RpbWUoKSB8IHdoZXJlICRJbmdlc3Rpb25UaW1lID4gaW5kRGF0ZSArIHtmfXtkZWx0YX0gYW5kICRJbmdlc3Rpb25UaW1lPGluZERhdGUgKyB7dH17ZGVsdGF9IHwgd2hlcmUge2VudENvbHVtbn0gaGFzICd7cUVudGl0eX0nIHwgcHJvamVjdCBpbmcgPSRJbmdlc3Rpb25UaW1lIHwgdGFrZSAxIA==' QUERIES['ISENTITYINTABLE'] = b'bGV0IGluZERhdGUgPSB0b2RhdGV0aW1lKCd7cURhdGV9Jyk7IHt0YWJsZX0gfCB3aGVyZSBpbmdlc3Rpb25fdGltZSgpIGJldHdlZW4oKGluZERhdGUgLTFoKSAuLiAoaW5kRGF0ZSArIDFoKSkgfCBzZWFyY2ggJ3txRW50aXR5fScgfCB0YWtlIDE=' @staticmethod def get_query(name): """ get KQL """ en_query = AnomalyQueries.QUERIES[name] query = base64.b64decode(en_query).decode('utf=8') return query class AnomalyFinder(): """ This class provides process flow functions for anomaly lookup. Method - run is the main entry point. """ def __init__(self, workspace_id, la_data_client): self.workspace_id = workspace_id self.la_data_client = la_data_client self.anomaly = '' def query_table_list(self): """ Get a list of data tables from Log Analytics for the user """ query = AnomalyQueries.get_query('LISTTABLES') return self.query_loganalytics(query) def query_loganalytics(self, query): """ This method will call Log Analytics through LA client """ start_time = dt.datetime.now(timezone.utc) - dt.timedelta(30) end_time=dt.datetime.now(timezone.utc) result = self.la_data_client.query_workspace( workspace_id=self.workspace_id, query=query, timespan=(start_time, end_time)) df = pd.DataFrame(data=result.tables[0].rows, columns=result.tables[0].columns) return df @staticmethod def construct_related_queries(df_anomalies): """ This method constructs query for user to repo and can be saves for future references """ if df_anomalies.shape[0] == 0: return None queries = '' for tbl in df_anomalies.Table.unique(): cur_table_anomalies = df_anomalies.loc[df_anomalies.Table == tbl, :] query = """{tbl} \ | where TimeGenerated > datetime({maxTimestamp})-14d and TimeGenerated < datetime({maxTimestamp}) \ | where {entCol} has "{qEntity}" \ | where """.format(**{ 'tbl': tbl, 'qTimestamp': cur_table_anomalies.qTimestamp.iloc[0], 'maxTimestamp': cur_table_anomalies.maxTimestamp.iloc[0], 'entCol': cur_table_anomalies.entCol.iloc[0], 'qEntity': cur_table_anomalies.qEntity.iloc[0] }) for j, row in cur_table_anomalies.iterrows(): # pylint: disable=unused-variable query += " {col} == to{colType}(\"{colVal}\") or".format( col=row.colName, colType=(row.colType) if 'colType' in row.keys() else 'string', colVal=row.colVal.replace('"', '') ) query = query[:-2] # drop the last or query += " | take 1000; " # limit the output size query = query.replace("\\", "\\\\") queries += query return queries def get_timewindow(self, q_entity, q_timestamp, ent_col, tbl): """ find the relevant time window for analysis """ win_start = 0 min_timestamp = None delta = None max_timestamp = None long_min_timestamp = None time_window_query_template = AnomalyQueries.get_query('TIMEWINDOWQUERY') for from_hour in range(-30, 0, 1): kql_time_range_d = time_window_query_template.format( table=tbl, qDate=q_timestamp, entColumn=ent_col, qEntity=q_entity, f=from_hour, t=from_hour+1, delta='d') df_time_range = self.query_loganalytics(kql_time_range_d) if df_time_range.shape[0] > 0: win_start = from_hour break dt_q_timestamp = pd.to_datetime(q_timestamp) ind2now = dt.datetime.utcnow() - dt_q_timestamp if win_start < -3: if ind2now > dt.timedelta(days=1): delta = '1d' max_timestamp = dt_q_timestamp + dt.timedelta(days=1) else: delta = '1d' max_timestamp = dt.datetime.now() long_min_timestamp = max_timestamp + dt.timedelta(days=win_start) min_timestamp = max_timestamp + dt.timedelta(days=max([-6, win_start])) elif win_start < 0: # switch to hours win_start_hour = -5 for from_hour in range(-3*24, -5, 1): kql_time_range_h = time_window_query_template.format( table=tbl, qDate=q_timestamp, entColumn=ent_col, qEntity=q_entity, f=from_hour, t=from_hour+1, delta='h') df_time_range = self.query_loganalytics(kql_time_range_h) if df_time_range.shape[0] > 0: win_start_hour = from_hour break if win_start_hour < -5: if ind2now > dt.timedelta(hours=1): delta = '1h' max_timestamp = dt_q_timestamp + dt.timedelta(hours=1) else: delta = '1h' max_timestamp = dt.datetime.now() min_timestamp = max_timestamp + dt.timedelta(hours=win_start_hour) long_min_timestamp = min_timestamp return min_timestamp, delta, max_timestamp, long_min_timestamp def run(self, q_timestamp, q_entity, tables): """ Main function for Anomaly Lookup """ progress_bar = WidgetViewHelper.define_int_progress_bar() display(progress_bar) # pylint: disable=undefined-variable # list tables if not given if not tables: kql_list_tables = AnomalyQueries.get_query('LISTTABLES') tables = self.query_loganalytics(kql_list_tables) tables = tables.SentinelTableName.tolist() progress_bar.value += 1 # find the column in which the query entity appears in each table # - assumption that it appears in just one columns tables2search = [] is_entity_in_table_template = AnomalyQueries.get_query('ISENTITYINTABLE') for tbl in tables: kql_entity_in_table = is_entity_in_table_template.format( table=tbl, qDate=q_timestamp, qEntity=q_entity) ent_in_table = self.query_loganalytics(kql_entity_in_table) if ent_in_table.shape[0] > 0: ent_col = [col for col in ent_in_table.select_dtypes('object').columns[1:] if ent_in_table.loc[0, col] is not None and ent_in_table.loc[:, col].str.contains(q_entity, case=False).all()] if ent_col: ent_col = ent_col[0] tables2search.append({'table': tbl, 'entCol': ent_col}) progress_bar.value += 2 # for each table, find the time window to query on for tbl in tables2search: tbl['minTimestamp'], tbl['delta'], tbl['maxTimestamp'], tbl['longMinTimestamp'] = \ self.get_timewindow(q_entity, q_timestamp, tbl['entCol'], tbl['table']) progress_bar.value += 1 # identify all the categorical columns per table on which we will find anomalies categorical_cols = [] is_cat_column_template = AnomalyQueries.get_query('ISCATCOLUMN') is_cat_heuristic_template = AnomalyQueries.get_query('ISCATHEURISTIC') for tbl in tables2search: kql_is_cat_column = is_cat_column_template.format(table=tbl['table']) df_cols = self.query_loganalytics(kql_is_cat_column) for col in df_cols.ColumnName: kql_is_cat_heuristic = is_cat_heuristic_template.format( table=tbl['table'], column=col) df_is_cat = self.query_loganalytics(kql_is_cat_heuristic) if df_is_cat.shape[0] > 0: cat_col_info = copy.deepcopy(tbl) cat_col_info['col'] = col categorical_cols.append(cat_col_info) progress_bar.value += 2 anomalies_list = [] time_series_anomaly_detection_template = \ AnomalyQueries.get_query('TIMESERIESANOMALYDETECTION') for col_info in categorical_cols: max_timestamp = col_info['maxTimestamp'].strftime('%Y-%m-%dT%H:%M:%S.%f') long_min_timestamp = col_info['longMinTimestamp'].strftime('%Y-%m-%dT%H:%M:%S.%f') kql_time_series_anomaly_detection = time_series_anomaly_detection_template.format( table=col_info['table'], column=col_info['col'], entColumn=col_info['entCol'], qEntity=q_entity, minTimestamp=long_min_timestamp, maxTimestamp=max_timestamp, qTimestamp=q_timestamp, delta=col_info['delta']) cur_anomalies = self.query_loganalytics(kql_time_series_anomaly_detection) anomalies_list.append(cur_anomalies) progress_bar.value += 2 if anomalies_list: anomalies = pd.concat(anomalies_list, axis=0) else: anomalies = pd.DataFrame() progress_bar.value += 2 queries = AnomalyFinder.construct_related_queries(anomalies) progress_bar.close() self.anomaly = str(anomalies.to_json(orient='records')) return anomalies, queries class WidgetViewHelper(): """ This classes provides helper methods for UI controls and components. """ def __init__(self): self.variable = None @staticmethod def select_table(anomaly_lookup): """ Select data tables """ table_list = anomaly_lookup.query_table_list() tables = list(table_list["SentinelTableName"]) return widgets.Select(options=tables, row=len(tables), #value=[], description='Tables:') @staticmethod def define_int_progress_bar(): """ define progress bar """ # pylint: disable=line-too-long return IntProgress(value=0, min=0, max=10, step=1, description='Loading:', bar_style='success', orientation='horizontal', position='top') @staticmethod def define_int_progress_bar(): """ Define a progress bar """ return widgets.IntProgress(value=0, min=0, max=10, step=1, description='Loading:', bar_style='success', orientation='horizontal', position='top') @staticmethod # pylint: disable=line-too-long def copy_to_clipboard(url, text_body, label_text): """ Copy text to Clipboard """ html_str = ( """ """ ) return html_str @staticmethod def construct_url_for_log_analytics_logs(tenant_domain, subscription_id, resource_group, workspace_name): """ Generate URL for LA logs """ return 'https://portal.azure.com/#blade/Microsoft_Azure_Security_Insights/MainMenuBlade/7/subscriptionId/{0}/resourceGroup/{1}/workspaceName/{2}'.format(subscription_id, resource_group, workspace_name) @staticmethod # pylint: disable=undefined-variable def display_html(inner_html): """ Display HTML """ display(HTML(inner_html)) @staticmethod def pick_start_and_end_date(): """ Pick dates """ start_date = widgets.DatePicker(description='Pick a start date', disabled=False) end_date = widgets.DatePicker(description='Pick a end date', disabled=False) # pylint: disable=undefined-variable display(start_date) # pylint: disable=undefined-variable display(end_date) return start_date, end_date @staticmethod def select_multiple_items(label, item_name): """ Select multiple items """ label_item = widgets.Label(value=label) items = widgets.Textarea(value='', placeholder='One per line: \n 0x7ae3 \n 0x7ae6', description=item_name, disabled=False, rows=5) display(label_item) display(items) return items # Functions will be used in this notebook def read_config_values(file_path): "This loads pre-generated parameters for Sentinel Workspace" with open(file_path) as json_file: if json_file: json_config = json.load(json_file) return (json_config["tenant_id"], json_config["subscription_id"], json_config["resource_group"], json_config["workspace_id"], json_config["workspace_name"], json_config["user_alias"], json_config["user_object_id"]) return None def has_valid_token(): "Check to see if there is a valid AAD token" try: error = "Please run 'az login'" expired = "ERROR: AADSTS70043: The refresh token has expired or is invalid" failed = "failed" validator = get_ipython().getoutput('az account get-access-token') if any(expired in item for item in validator.get_list()): return '**The refresh token has expired.
Please continue your login process. Then:
1. If you plan to run multiple notebooks on the same compute instance today, you may restart the compute instance by clicking "Compute" on left menu, then select the instance, clicking "Restart";
2. Otherwise, you may just restart the kernel from top menu.
Finally, close and re-load the notebook, then re-run cells one by one from the top.**' elif any(error in item for item in validator.get_list()) or any(failed in item for item in validator.get_list()): return "Please run 'az login' to setup account" else: return None except: return "Please login" # In[ ]: # Calling the above function to populate Microsoft Sentinel workspace parameters # The file, config.json, was generated by the system, however, you may modify the values, or manually set the variables tenant_id, subscription_id, resource_group, workspace_id, workspace_name, user_alias, user_object_id = read_config_values('config.json'); # In[ ]: # Azure CLI is used to get device code to login into Azure, you need to copy the code and open the DeviceLogin site. # You may add [--tenant $tenant_id] to the command if has_valid_token() != None: get_ipython().system("echo -e '\\e[42m'") get_ipython().system('az login --tenant $tenant_id --use-device-code') resource_uri = "https://api.loganalytics.io" la_client = LogAnalyticsManagementClient(AzureCliCredential(), subscription_id = subscription_id) credential = DefaultAzureCredential() la_data_client = LogsQueryClient(credential) # In[ ]: # Entity inputs import ipywidgets as widgets #DateTime format: 2021-06-04T07:05:20.000 q_timestamp = widgets.Text(value='2022-08-04',description='DateTime: ') display(q_timestamp) #Entity format: user q_entity = widgets.Text(value='user',description='Entity: ') display(q_entity) # In[ ]: # Select tables import warnings warnings.simplefilter(action='ignore', category=FutureWarning) anomaly_lookup = AnomalyFinder(workspace_id, la_data_client) selected_table = WidgetViewHelper.select_table(anomaly_lookup) display(selected_table) # In[ ]: # Query data: this action may take a few minutes or more, please be patient. start = timeit.default_timer() anomalies, queries = anomaly_lookup.run(q_timestamp.value, q_entity.value, list([selected_table.value])) print('======= Task completed ===========') print('Elapsed time: ', timeit.default_timer() - start, ' seconds') if anomalies is not None: print(str(len(anomalies)) + ' records found.') else: print('0 records found.') # In[ ]: # Display query result in DataFrame if anomalies is not None and len(anomalies) > 0: pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option('display.width', None) sorted_anomalies = anomalies.sort_values(by=['anomalyScore'], ascending=False) display(sorted_anomalies) # In[ ]: # Save results to a csv file in the current file system if anomalies is not None and len(anomalies) > 0: anomalies.to_csv('anomaly_lookup.csv') # In[ ]: # ML Clustering based on anomalyScore if anomalies is not None and len(anomalies) > 10: import matplotlib.pyplot as plt from sklearn.cluster import KMeans anomaly_score_set = anomalies.iloc[:, [12]].copy() kmeans = KMeans(n_clusters=3).fit(anomaly_score_set) centroids = kmeans.cluster_centers_ print(centroids) # In[ ]: # Display Top anomaly scores if anomalies is not None and len(anomalies) > 10 and anomaly_score_set is not None: top_anomalies = anomaly_score_set.loc[anomaly_score_set['anomalyScore'] > "5"] print(top_anomalies) # In[ ]: # You also can go to Azure Log Analytics for further analysis if queries is not None: url = WidgetViewHelper.construct_url_for_log_analytics_logs(tenant_id, subscription_id, resource_group, workspace_name) print('======= Clicking the URL to go to Log Analytics =======') print(url) if len(queries) > 2000: print('======= Copy the queries to go to Log Analytics =======') print(queries) else: WidgetViewHelper.display_html(WidgetViewHelper.copy_to_clipboard(url, queries, 'Add queries to clipboard then paste to Logs'))