Notebook Version: 1.1
This notebook is intended for triage and investigation of security alerts related to process execution. It is specifically targeted at alerts triggered by suspicious process activity on Windows hosts.
Data Sources Used:
# Imports
import sys
import warnings
MIN_REQ_PYTHON = (3,6)
if sys.version_info < MIN_REQ_PYTHON:
    print('Check the Kernel->Change Kernel menu and ensure that Python 3.6')
    print('or later is selected as the active kernel.')
    sys.exit("Python %s.%s or later is required.\n" % MIN_REQ_PYTHON)
import numpy as np
from IPython import get_ipython
from IPython.display import display, HTML, Markdown
import ipywidgets as widgets
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
sns.set()
import pandas as pd
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 50)
pd.set_option('display.max_colwidth', 100)
from msticpy.nbtools.utility import md, md_warn
from msticpy.nbtools import *
from msticpy.sectools import *
from msticpy.data.data_providers import QueryProvider
import msticpy.nbtools.kql as qry
import msticpy.nbtools.nbdisplay as nbdisp
# Some of our dependencies (networkx) still use deprecated Matplotlib
# APIs - we can't do anything about it so suppress them from view
from matplotlib import MatplotlibDeprecationWarning
warnings.simplefilter("ignore", category=MatplotlibDeprecationWarning)
Use the following syntax if you are authenticating using an Azure Active Directory AppId and Secret:
%kql loganalytics://tenant(aad_tenant).workspace(WORKSPACE_ID).clientid(client_id).clientsecret(client_secret)
instead of
%kql loganalytics://code().workspace(WORKSPACE_ID)
Note: you may occasionally see a JavaScript error displayed at the end of the authentication - you can safely ignore this.
On successful authentication you should see a `popup schema` button.
To find your Workspace Id go to Log Analytics. Look at the workspace properties to find the ID.
# See if we have an Azure Sentinel Workspace defined in our config file.
# If not, let the user specify the Workspace and Tenant IDs
from msticpy.nbtools.wsconfig import WorkspaceConfig
ws_config = WorkspaceConfig()
try:
    ws_id = ws_config['workspace_id']
    ten_id = ws_config['tenant_id']
    display(HTML("Workspace details collected from config file"))
    config = True
except KeyError:
    display(HTML('Please go to your Log Analytics workspace, copy the workspace ID'
                 ' and/or tenant ID and paste them here to enable connection to'
                 ' the workspace and querying of it.<br>'))
    ws_id = nbwidgets.GetEnvironmentKey(env_var='WORKSPACE_ID',
                                        prompt='Please enter your Log Analytics Workspace Id:', auto_display=True)
    ten_id = nbwidgets.GetEnvironmentKey(env_var='TENANT_ID',
                                         prompt='Please enter your Log Analytics Tenant Id:', auto_display=True)
    config = False
# Establish a query provider for Azure Sentinel and connect to it
if config is False:
    ws_id = ws_id.value
    ten_id = ten_id.value
qry_prov = QueryProvider('LogAnalytics')
la_connection_string = f'loganalytics://code().tenant("{ten_id}").workspace("{ws_id}")'
qry_prov.connect(connection_str=la_connection_string)
We are using an alert as the starting point for this investigation, so specify a time range to search for alerts. Once this is set, run the following cell to retrieve any alerts in that time window. You can change the time range and re-run the queries until you find the alerts that you want to investigate.
alert_q_times = nbwidgets.QueryTime(units='hour',
                                    max_before=20, max_after=1, before=3)
alert_q_times.display()
alert_list = qry_prov.SecurityAlert.list_alerts(
    alert_q_times)
alert_counts = qry_prov.SecurityAlert.list_alerts_counts(
    alert_q_times)
if isinstance(alert_list, pd.DataFrame) and not alert_list.empty:
    print(len(alert_counts), ' distinct alert types')
    print(len(alert_list), ' distinct alerts')
    # Display alerts on timeline to aid in visual grouping
    nbdisplay.display_timeline(
        data=alert_list, source_columns=["AlertName", 'CompromisedEntity'],
        title="Alerts over time", height=300, color="red")
    display(alert_counts.head(10))  # remove '.head(10)' to see the full list grouped by AlertName
else:
    display(Markdown('No related alerts found.'))
To focus the investigation select an alert from a list of retrieved alerts.
As you select an alert, the main properties will be shown below the list.
Use the filter box to narrow down your search to any substring in the AlertName.
get_alert = None
alert_select = nbwidgets.AlertSelector(alerts=alert_list, action=nbdisp.display_alert)
alert_select.display()
In order to pivot to data related to the selected security alert we need to identify key data points in the selected alert. This section extracts the alert information and entities into a SecurityAlert object allowing us to query the properties more reliably.
Properties in this object will be used to automatically provide parameters for queries and UI elements. Subsequent queries will use properties like the host name and derived properties such as the OS family (Linux or Windows) to adapt the query. Query time selectors like the one above will also default to an origin time that matches the alert selected.
The alert view below shows all of the main properties of the alert plus the extended property dictionary (if any) and JSON representations of the Entity.
# Extract entities and properties into a SecurityAlert class
if alert_select is None or alert_select.selected_alert is None:
    raise ValueError("Please select an alert before executing remaining cells.")
else:
    security_alert = SecurityAlert(alert_select.selected_alert)
    nbdisplay.display_alert(security_alert, show_entities=True)
Depending on the type of alert there may be one or more entities attached as properties. Entities are key indicators that we can pivot on during our investigation, such as Host, Account, IpAddress, Process, etc. - essentially the 'nouns' of security investigation. Entities are often related to other entities - for example, a process will usually have a related File entity (the process image) and an Account entity (the context in which the process was running). Endpoint alerts almost always have a Host entity (which could be a physical or virtual machine). In order to understand the links between related entities more effectively, we can plot them as a graph.
# Draw the graph using Networkx/Matplotlib
%matplotlib inline
from msticpy.nbtools import security_alert_graph
alertentity_graph = security_alert_graph.create_alert_graph(security_alert)
nbdisp.draw_alert_entity_graph(alertentity_graph, width=15)
For certain entities in the alert we can search for other alerts that have that entity in common. Currently this pivot supports alerts with the same Host, Account or Process.
Notes:
In order to more effectively identify related alerts the query time boundaries can be adjusted to encompass a longer time frame.
# set the origin time to the time of our alert
query_times = nbwidgets.QueryTime(units='day', origin_time=security_alert.TimeGenerated,
                                  max_before=28, max_after=1, before=5)
query_times.display()
if not security_alert.primary_host:
    print('Related alerts are not yet supported for alerts that are not host-based')
    related_alerts = None
else:
    related_alerts = qry_prov.SecurityAlert.list_related_alerts(query_times, security_alert)

if related_alerts is not None and not related_alerts.empty:
    host_alert_items = related_alerts\
        .query('host_match == True')[['AlertType', 'StartTimeUtc']]\
        .groupby('AlertType').StartTimeUtc.agg('count').to_dict()
    acct_alert_items = related_alerts\
        .query('acct_match == True')[['AlertType', 'StartTimeUtc']]\
        .groupby('AlertType').StartTimeUtc.agg('count').to_dict()
    proc_alert_items = related_alerts\
        .query('proc_match == True')[['AlertType', 'StartTimeUtc']]\
        .groupby('AlertType').StartTimeUtc.agg('count').to_dict()

    def print_related_alerts(alert_dict, entity_type, entity_name):
        if alert_dict:
            print('Found {} different alert types related to this {} (\'{}\')'
                  .format(len(alert_dict), entity_type, entity_name))
            for alert_type, count in alert_dict.items():
                print('    {}, Count of alerts: {}'.format(alert_type, count))
        else:
            print('No alerts for {} entity \'{}\''.format(entity_type, entity_name))

    print_related_alerts(host_alert_items, 'host', security_alert.hostname)
    print_related_alerts(acct_alert_items, 'account',
                         security_alert.primary_account.qualified_name
                         if security_alert.primary_account
                         else None)
    print_related_alerts(proc_alert_items, 'process',
                         security_alert.primary_process.ProcessFilePath
                         if security_alert.primary_process
                         else None)
    nbdisp.display_timeline(data=related_alerts, source_columns=['AlertName'],
                            title='Alerts', height=100)
else:
    display(Markdown('No related alerts found.'))
To see how these alerts relate to our original alert, and how these new alerts relate to each other, we can graph them.
# Draw a graph of this (add to entity graph)
%matplotlib inline
if related_alerts is not None and not related_alerts.empty:
    rel_alert_graph = mas.add_related_alerts(related_alerts=related_alerts,
                                             alertgraph=alertentity_graph)
    nbdisp.draw_alert_entity_graph(rel_alert_graph, width=15)
else:
    display(Markdown('No related alerts found.'))
Once we have understood how these alerts relate to each other, we can view the details of each new, related alert.
def disp_full_alert(alert):
    global related_alert
    related_alert = SecurityAlert(alert)
    nbdisplay.display_alert(related_alert, show_entities=True)

if related_alerts is not None and not related_alerts.empty:
    related_alerts['CompromisedEntity'] = related_alerts['Computer']
    print('Selected alert is available as \'related_alert\' variable.')
    rel_alert_select = nbwidgets.AlertSelector(alerts=related_alerts, action=disp_full_alert)
    rel_alert_select.display()
else:
    display(Markdown('No related alerts found.'))
If the alert has a process entity this section tries to retrieve the entire process tree to which that process belongs.
Notes:
The source (alert) process is shown in red.
What's shown for each process:
# set the origin time to the time of our alert
query_times = nbwidgets.QueryTime(units='minute', origin_time=security_alert.origin_time)
query_times.display()
from msticpy.nbtools.query_defns import DataFamily

if security_alert.data_family != DataFamily.WindowsSecurity:
    raise ValueError('The remainder of this notebook currently only supports Windows. '
                     'Linux support is in development but not yet implemented.')

def extract_missing_pid(security_alert):
    for pid_ext_name in ['Process Id', 'Suspicious Process Id']:
        pid = security_alert.ExtendedProperties.get(pid_ext_name, None)
        if pid:
            return pid

def extract_missing_sess_id(security_alert):
    sess_id = security_alert.ExtendedProperties.get('Account Session Id', None)
    if sess_id:
        return sess_id
    for session in [e for e in security_alert.entities if
                    e['Type'] in ('host-logon-session', 'hostlogonsession')]:
        return session['SessionId']

if security_alert.primary_process:
    # Do some patching up if the process entity doesn't have a PID
    pid = security_alert.primary_process.ProcessId
    if not pid:
        pid = extract_missing_pid(security_alert)
        if pid:
            security_alert.primary_process.ProcessId = pid
        else:
            raise ValueError('Could not find the process Id for the alert process.')

    # Do the same if we can't find the account logon ID
    if not security_alert.get_logon_id():
        sess_id = extract_missing_sess_id(security_alert)
        if sess_id and security_alert.primary_account:
            security_alert.primary_account.LogonId = sess_id
        else:
            raise ValueError('Could not find the session Id for the alert process.')

    # run the query
    process_tree = qry_prov.WindowsSecurity.get_process_tree(query_times, security_alert)
    if len(process_tree) > 0:
        # Print out the text view of the process tree
        nbdisplay.display_process_tree(process_tree)
    else:
        display(Markdown('No processes were returned so cannot obtain a process tree.'
                         '\n\nSkip to [Other Processes](#process_clustering) later in the'
                         ' notebook to retrieve all processes'))
else:
    display(Markdown('This alert has no process entity so cannot obtain a process tree.'
                     '\n\nSkip to [Other Processes](#process_clustering) later in the'
                     ' notebook to retrieve all processes'))
    process_tree = None
As well as seeing the processes involved in a tree we want to see the chronology of this process execution. This shows each process in the process tree on a time line view. If a large number of processes are involved in this process tree it may take some time to display this time line graphic.
# Show timeline of events
if process_tree is not None and not process_tree.empty:
    nbdisplay.display_timeline(data=process_tree, alert=security_alert,
                               title='Alert Process Session', height=250)
Sometimes you don't have a source process from which to build your investigation. At other times it's simply useful to see what other process activity is occurring on the host. This section retrieves all processes on the host within the time bounds set in the query times widget.
If you want to view the raw details of this process data display the processes_on_host dataframe.
In order to analyze this process data more effectively, we can cluster processes into distinct process clusters. To do this we process the raw event list output to extract a few features that render strings (such as the command line) into numerical values. The default below uses the following features:
Then we run a clustering algorithm (DBScan in this case) on the process list. The result groups similar (noisy) processes together and leaves unique process patterns as single-member clusters.
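As a simplified, standalone sketch of the feature-extraction step (not the msticpy implementation - the `cmdline_token_score` helper and its delimiter set are assumptions made for illustration), each command line can be reduced to a numeric 'structure' value and grouped on it:

```python
from collections import defaultdict

# Hypothetical structure feature (an assumption for this sketch):
# count delimiter characters rather than comparing literal content.
DELIMS = set(' /\\-+&|.:')

def cmdline_token_score(commandline: str) -> int:
    """Crude structure score: the number of delimiter characters."""
    return sum(1 for ch in commandline if ch in DELIMS)

cmdlines = [
    "updatepatch host1.mydom.com",
    "updatepatch host2.mydom.com",
    "powershell.exe -enc SQBFAFgA",
]

# Command lines with the same structure score fall into the same group,
# even though their literal content differs.
groups = defaultdict(list)
for cmd in cmdlines:
    groups[cmdline_token_score(cmd)].append(cmd)

for score, members in sorted(groups.items()):
    print(score, members)
```

The two `updatepatch` invocations land in one group despite differing host names; a real clustering algorithm such as DBSCAN generalizes this to multiple numeric features at once.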
from msticpy.sectools.eventcluster import dbcluster_events, add_process_features

processes_on_host = None
if security_alert.primary_host:
    processes_on_host = qry_prov.WindowsSecurity.list_processes_in_session(query_times, security_alert)

if processes_on_host is not None and not processes_on_host.empty:
    feature_procs = add_process_features(input_frame=processes_on_host,
                                         path_separator=security_alert.path_separator)

    # you might need to play around with the max_cluster_distance parameter.
    # decreasing this gives more clusters.
    (clus_events, dbcluster, x_data) = dbcluster_events(data=feature_procs,
                                                        cluster_columns=['commandlineTokensFull',
                                                                         'pathScore',
                                                                         'isSystemSession'],
                                                        max_cluster_distance=0.0001)
    print('Number of input events:', len(feature_procs))
    print('Number of clustered events:', len(clus_events))
    clus_events[['ClusterSize', 'processName']][clus_events['ClusterSize'] > 1]\
        .plot.bar(x='processName',
                  title='Process names with Cluster > 1',
                  figsize=(12, 3));
if processes_on_host is None or processes_on_host.empty:
    display(Markdown('Unable to obtain any processes for this host. This feature'
                     ' is currently only supported for Windows hosts.'
                     '\n\nIf this is a Windows host skip to [Host Logons](#host_logons)'
                     ' later in the notebook to examine logon events.'))
In this section we display a number of charts highlighting the variability of command lines and processes paths associated with each process.
The top chart shows the variability of command line content for a given process name. The wider the box, the more instances were found with different command line structure. For certain processes, such as cmd.exe or powershell.exe, a wide variability in command lines is expected; with other processes, however, this could be considered abnormal.
Note, the 'structure' in this case is measured by the number of tokens or delimiters in the command line and does not look at content differences. This is done so that commonly varying instances of the same command line are grouped together.
For example updatepatch host1.mydom.com
and updatepatch host2.mydom.com
will be grouped together.
The second graph shows processes by variation in the full path associated with the process. This does compare content so c:\windows\system32\net.exe
and e:\windows\system32\net.exe
are treated as distinct. You would normally not expect to see any variability in this chart unless you have multiple copies of the same-named executable or an executable is trying to masquerade as another well-known binary.
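A toy character-sum score (purely illustrative - `path_char_score` is not the pathScore computed by msticpy) shows why the path measure is content-sensitive where the token measure is not:

```python
def path_char_score(path: str) -> int:
    """Content-sensitive score (illustrative only): sum of character
    ordinals, so any single-character difference changes the value."""
    return sum(ord(ch) for ch in path.lower())

p1 = r"c:\windows\system32\net.exe"
p2 = r"e:\windows\system32\net.exe"

# Identical apart from the drive letter, yet the scores differ,
# so the two paths are treated as distinct.
print(path_char_score(p1), path_char_score(p2))
```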
# Looking at the variability of commandlines and process image paths
import seaborn as sns
sns.set(style="darkgrid")

if processes_on_host is not None and not processes_on_host.empty:
    proc_plot = sns.catplot(y="processName", x="commandlineTokensFull",
                            data=feature_procs.sort_values('processName'),
                            kind='box', height=10)
    proc_plot.fig.suptitle('Variability of Commandline Tokens', x=1, y=1)

    proc_plot = sns.catplot(y="processName", x="pathLogScore",
                            data=feature_procs.sort_values('processName'),
                            kind='box', height=10, hue='isSystemSession')
    proc_plot.fig.suptitle('Variability of Path', x=1, y=1);
if 'clus_events' in locals() and not clus_events.empty:
    resp = input('View the clustered data? y/n')
    if resp == 'y':
        display(clus_events.sort_values('TimeGenerated')[['TimeGenerated', 'LastEventTime',
                                                          'NewProcessName', 'CommandLine',
                                                          'ClusterSize', 'commandlineTokensFull',
                                                          'pathScore', 'isSystemSession']])

    # Look at clusters for individual process names
    def view_cluster(exe_name):
        display(clus_events[['ClusterSize', 'processName', 'CommandLine', 'ClusterId']]
                [clus_events['processName'] == exe_name])

    display(Markdown('You can view the cluster members for individual processes '
                     'by inserting a new cell and entering:<br>'
                     '`>>> view_cluster(process_name)`<br>'
                     'where process_name is the unqualified process binary. E.g.<br>'
                     '`>>> view_cluster(\'reg.exe\')`'))
# Show timeline of events - clustered events
if 'clus_events' in locals() and not clus_events.empty:
    nbdisp.display_timeline(data=clus_events,
                            overlay_data=processes_on_host,
                            alert=security_alert,
                            title='Distinct Host Processes (bottom) and All Processes (top)')
This section looks for Indicators of Compromise (IoC) within the data sets passed to it.
The first section looks at the command line for the process related to our original alert (if any). It also looks for Base64 encoded strings within the data - this is a common way of hiding attacker intent. It attempts to decode any strings that look like Base64. Additionally, if the Base64 decode operation returns any items that look like a Base64 encoded string or file, a gzipped binary sequence, a zipped or tar archive, it will attempt to extract the contents before searching for potentially interesting items.
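As a minimal sketch of the detection idea using only the standard library (the regex heuristic and the `decode_b64_candidates` helper are assumptions for illustration, not msticpy's base64unpack implementation):

```python
import base64
import re

# Heuristic pattern for Base64-looking tokens (an assumption for this
# sketch; real detection handles more encodings and edge cases).
B64_PATTERN = re.compile(r'[A-Za-z0-9+/]{16,}={0,2}')

def decode_b64_candidates(commandline: str):
    """Yield (token, decoded_text) for tokens that decode cleanly to UTF-8."""
    for match in B64_PATTERN.finditer(commandline):
        token = match.group(0)
        try:
            # pad to a multiple of 4 before decoding
            raw = base64.b64decode(token + "=" * (-len(token) % 4))
            yield token, raw.decode("utf-8")
        except (ValueError, UnicodeDecodeError):
            continue

cmd = "powershell.exe -enc " + base64.b64encode(b"net user attacker /add").decode()
for token, decoded in decode_b64_candidates(cmd):
    print(token, "->", decoded)
```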
process = security_alert.primary_process
ioc_extractor = IoCExtract()

if process:
    # if nothing is decoded this just returns the input string unchanged
    base64_dec_str, _ = base64.unpack_items(input_string=process["CommandLine"])
    if base64_dec_str and '<decoded' in base64_dec_str:
        print('Base64 encoded items found.')
        print(base64_dec_str)

    # any IoCs in the string?
    iocs_found = ioc_extractor.extract(base64_dec_str)
    if iocs_found:
        print('\nPotential IoCs found in alert process:')
        display(iocs_found)
else:
    print('Nothing to process')
If we have a process tree or other elements that contain command lines we also want to attempt to extract IoCs from these data sets.
ioc_extractor = IoCExtract()
source_processes = None

# if the process tree is populated we use that preferentially
try:
    if not process_tree.empty:
        source_processes = process_tree
except (NameError, AttributeError):
    pass

# If not, use the clustered events from all sessions
try:
    if source_processes is None and not clus_events.empty:
        source_processes = clus_events
except (NameError, AttributeError):
    pass

if source_processes is not None and not source_processes.empty:
    ioc_df = ioc_extractor.extract(data=source_processes,
                                   columns=['CommandLine'],
                                   os_family=security_alert.os_family,
                                   ioc_types=['ipv4', 'ipv6', 'dns', 'url',
                                              'md5_hash', 'sha1_hash', 'sha256_hash'])
    if len(ioc_df):
        display(HTML("<h3>IoC patterns found in process tree.</h3>"))
        display(ioc_df)
else:
    ioc_df = None
For simple strings the Base64-decoded output is straightforward. However, it is not uncommon to see nested encodings, so we also want to try to extract and decode these nested elements.
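The nested case can be illustrated with the standard library alone (the `unwrap_nested_b64` helper below is a hypothetical sketch, not msticpy's logic): keep decoding while each result is itself valid Base64 that yields printable text.

```python
import base64

def unwrap_nested_b64(data: str, max_depth: int = 10) -> str:
    """Repeatedly Base64-decode a string while each result is itself
    valid Base64 that decodes to UTF-8 text; stop when it is not."""
    current = data
    for _ in range(max_depth):
        try:
            # validate=True rejects strings with non-Base64 characters
            decoded = base64.b64decode(current, validate=True).decode("utf-8")
        except (ValueError, UnicodeDecodeError):
            break
        current = decoded
    return current

inner = base64.b64encode(b"whoami /all").decode()
outer = base64.b64encode(inner.encode()).decode()
print(unwrap_nested_b64(outer))
```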
if source_processes is not None:
    dec_df = base64.unpack_items(data=source_processes, column='CommandLine')

    if dec_df is not None and not dec_df.empty:
        display(HTML("<h3>Decoded base 64 command lines</h3>"))
        display(HTML("Warning - some binary patterns may be decodable as unicode strings"))
        display(dec_df[['full_decoded_string', 'original_string', 'decoded_string',
                        'input_bytes', 'file_hashes']])

        ioc_dec_df = ioc_extractor.extract(data=dec_df, columns=['full_decoded_string'])
        if len(ioc_dec_df):
            display(HTML("<h3>IoC patterns found in base 64 decoded data</h3>"))
            display(ioc_dec_df)
            if ioc_df is not None:
                ioc_df = ioc_df.append(ioc_dec_df, ignore_index=True)
            else:
                ioc_df = ioc_dec_df
    else:
        print("No base64 encodings found.")
Now that we have identified a number of IoCs we want to check whether they are associated with known malicious activity. To do this we will query three different Threat Intelligence providers to see if we get results.
We will be using:
If you do not have an API key for any of these providers simply remove their name from the providers list in our lookup_iocs command.
tilookups = TILookup()
if ioc_df is not None and not ioc_df.empty:
    ti_results = tilookups.lookup_iocs(data=ioc_df, obs_col='Observable', ioc_type_col='IoCType',
                                       providers=["OTX", "VirusTotal", "XForce"])
    if not ti_results[ti_results['Severity'] > 0].empty:
        md("Positive TI Results:", "bold")
        display(ti_results[ti_results['Severity'] > 0])
    else:
        md("No positive matches found in threat intelligence")
else:
    md("No IoCs to look up")
Understanding where else a command line is being run in an environment can give us a good idea of the scope of a security incident, or help us determine whether activity is malicious or expected.
To get a sense of whether the alert process is something that is occurring on other hosts, run this section.
# set the origin time to the time of our alert
query_times = nbwidgets.QueryTime(units='day', before=5, max_before=20,
                                  after=1, max_after=10,
                                  origin_time=security_alert.origin_time)
query_times.display()
# This query needs a commandline parameter which isn't supplied
# by default from the alert
# - so extract and escape this from the process
if not security_alert.primary_process:
    raise ValueError('This alert has no process entity. This section is not applicable.')

proc_match_in_ws = None
commandline = security_alert.primary_process.CommandLine
commandline = utility.escape_windows_path(commandline)
commandline = commandline.replace('"', "'")

process = security_alert.ExtendedProperties['process name']
process = utility.escape_windows_path(process)
process = process.replace('"', "'")

md(f"Command Line: {commandline}")
if commandline.strip():
    proc_match_in_ws = qry_prov.WindowsSecurity.list_hosts_matching_commandline(
        start=query_times.start, end=query_times.end,
        process_name=process, commandline=commandline)
else:
    md('process has empty commandline')

# Check the results
if proc_match_in_ws is None or proc_match_in_ws.empty:
    md('No processes with matching commandline found on other hosts in workspace')
    md(f'between {query_times.start} and {query_times.end}')
else:
    hosts = proc_match_in_ws['Computer'].drop_duplicates().shape[0]
    processes = proc_match_in_ws.shape[0]
    md(f'{processes} processes with matching commandline found on {hosts} hosts in workspace')
    md(f'between {query_times.start} and {query_times.end}')
    md('To examine these, execute the dataframe \'proc_match_in_ws\' in a new cell')
    display(proc_match_in_ws[['TimeCreatedUtc', 'Computer', 'NewProcessName', 'CommandLine']].head())
If at this point you wish to investigate a particular host in detail you can use the cells below or you can switch to our Host Investigation Notebooks that provide a deep dive capability for Windows and Linux hosts.
This section retrieves the logon events on the host in the alert.
You may want to use the query times to search over a broader range than the default.
# set the origin time to the time of our alert
query_times = nbwidgets.QueryTime(units='day', origin_time=security_alert.origin_time,
                                  before=1, after=0, max_before=20, max_after=1)
query_times.display()
If you wish to investigate a specific host in detail you can use the cells below or switch to our Account investigation notebook.
This returns the account associated with the alert being investigated.
logon_id = security_alert.get_logon_id()
if logon_id:
    if logon_id in ['0x3e7', '0X3E7', '-1', -1]:
        print('Cannot retrieve single logon event for system logon id '
              '- please continue with All Host Logons below.')
    else:
        logon_event = qry.get_host_logon(provs=[query_times, security_alert])
        nbdisp.display_logon_data(logon_event, security_alert)
else:
    print('No account entity in the source alert or the primary account had no logonId value set.')
Since the number of logon events may be large and, in the case of system logons, very repetitive, we use clustering to try to identity logons with unique characteristics.
In this case we use the numeric score of the account name and the logon type (i.e. interactive, service, etc.). The results of the clustered logons are shown below along with a more detailed, readable printout of the logon event information. The data here will vary depending on whether this is a Windows or Linux host.
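The account-name score can be pictured as a simple character sum (a hypothetical stand-in for msticpy's internal `_string_score`): it maps identical names to identical numbers, so repeated logons by the same account and logon type collapse into a single cluster dimension.

```python
def account_score(account: str) -> int:
    """Toy numeric score for an account name (illustrative only):
    sum of character ordinals, so identical names get identical scores."""
    return sum(ord(ch) for ch in account)

# (account, logon type) pairs - LogonType 5 is a Windows service logon,
# 10 is RemoteInteractive; the values here are made up for the example.
logons = [
    ("NT AUTHORITY\\SYSTEM", 5),
    ("NT AUTHORITY\\SYSTEM", 5),
    ("MYDOM\\alice", 10),
]

# The two SYSTEM logons produce the same feature vector, so a clustering
# step would group them; alice's logon stays distinct.
features = [(account_score(acct), logon_type) for acct, logon_type in logons]
print(features)
```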
from msticpy.sectools.eventcluster import dbcluster_events, add_process_features, _string_score

if security_alert.primary_host:
    host_logons = qry_prov.WindowsSecurity.list_host_logons(query_times, security_alert)
else:
    host_logons = None
    md("No data available - alert has no host entity.")

if host_logons is not None and not host_logons.empty:
    logon_features = host_logons.copy()
    logon_features['AccountNum'] = host_logons.apply(lambda x: _string_score(x.Account), axis=1)
    logon_features['LogonHour'] = host_logons.apply(lambda x: x.TimeGenerated.hour, axis=1)

    # you might need to play around with the max_cluster_distance parameter.
    # decreasing this gives more clusters.
    (clus_logons, _, _) = dbcluster_events(data=logon_features, time_column='TimeGenerated',
                                           cluster_columns=['AccountNum',
                                                            'LogonType'],
                                           max_cluster_distance=0.0001)
    md(f'Number of input events: {len(host_logons)}')
    md(f'Number of clustered events: {len(clus_logons)}')
    md('Distinct host logon patterns:')
    display(clus_logons.sort_values('TimeGenerated'))
else:
    md('No logon events found for host.')
# Display logon details
if host_logons is not None and not host_logons.empty:
    nbdisp.display_logon_data(clus_logons, security_alert)
To understand these logons in relation to the original alert we are investigating we want to view them in a time line.
# Show timeline of events - all logons + clustered logons
if host_logons is not None and not host_logons.empty:
    nbdisp.display_timeline(data=host_logons, overlay_data=clus_logons,
                            alert=security_alert,
                            source_columns=['Account', 'LogonType'],
                            title='All Host Logons')
This shows the timeline of the clustered logon events alongside the process tree obtained earlier. This allows you to get a sense of which logon was responsible for the process tree session and whether any additional logons (e.g. creating a process as another user) might be associated with the alert timeline.
Note you should use the pan and zoom tools to align the timelines since the data may be over different time ranges.
display(clus_logons.head())
process_tree.head()
# Show timeline of events - all events
if host_logons is not None and not host_logons.empty:
    nbdisplay.display_timeline(data=clus_logons, overlay_data=process_tree,
                               source_columns=['Account'],
                               alert=security_alert,
                               title='Clustered Host Logons', height=200)
# Counts of Logon types by Account
if host_logons is not None and not host_logons.empty:
    display(host_logons[['Account', 'LogonType', 'TimeGenerated']]
            .groupby(['Account', 'LogonType']).count()
            .rename(columns={'TimeGenerated': 'LogonCount'}))
Failed logons can provide a valuable source of data for investigation so we also want to look at failed logons during the period of our investigation.
if security_alert.primary_host:
    failedLogons = qry_prov.WindowsSecurity.list_host_logon_failures(query_times, security_alert)
else:
    md("No data available - alert has no host entity.")
    failedLogons = None

if failedLogons is None or failedLogons.empty:
    md(f'No logon failures recorded for this host between {security_alert.StartTimeUtc} and {security_alert.EndTimeUtc}')
else:
    display(failedLogons)
print('List of current DataFrames in Notebook')
print('-' * 50)
current_vars = list(locals().keys())
for var_name in current_vars:
    if isinstance(locals()[var_name], pd.DataFrame) and not var_name.startswith('_'):
        print(var_name)
To save the contents of a pandas DataFrame to a CSV file use the following syntax:
host_logons.to_csv('host_logons.csv')
To save the contents of a pandas DataFrame to an Excel spreadsheet use the following syntax
writer = pd.ExcelWriter('myWorksheet.xlsx')
my_data_frame.to_excel(writer,'Sheet1')
writer.save()
If you have not run this Notebook before please run this cell before running the rest of the Notebook.
import sys
import warnings
warnings.filterwarnings("ignore",category=DeprecationWarning)
MIN_REQ_PYTHON = (3,6)
if sys.version_info < MIN_REQ_PYTHON:
    print('Check the Kernel->Change Kernel menu and ensure that Python 3.6')
    print('or later is selected as the active kernel.')
    sys.exit("Python %s.%s or later is required.\n" % MIN_REQ_PYTHON)
# Package Installs - try to avoid if they are already installed
try:
    import msticpy.sectools as sectools
    import Kqlmagic
    print('If you answer "n" this cell will exit with an error in order to avoid the pip install calls.')
    print('This error can safely be ignored.')
    resp = input('msticpy and Kqlmagic packages are already loaded. Do you want to re-install? (y/n)')
    if resp.strip().lower() != 'y':
        sys.exit('pip install aborted - you may skip this error and continue.')
    else:
        print('After installation has completed, restart the current kernel and run '
              'the notebook again skipping this cell.')
except ImportError:
    pass

print('\nPlease wait. Installing required packages. This may take a few minutes...')
!pip install git+https://github.com/microsoft/msticpy --upgrade --user
!pip install Kqlmagic --no-cache-dir --upgrade --user

print('\nTo ensure that the latest versions of the installed libraries '
      'are used, please restart the current kernel and run '
      'the notebook again skipping this cell.')