<details>
Notebook Version: 1.0
Python Version: Python 3.6 (including Python 3.6 - AzureML)
Required Packages: kqlmagic, msticpy, pandas, numpy, matplotlib, bokeh, networkx, ipywidgets, ipython, scikit_learn, dnspython, ipwhois, folium, maxminddb_geolite2
Platforms Supported:
Data Sources Required:
</details>
Brings together a series of queries and visualizations to help you determine the security state of the Windows host or virtual machine that you are investigating.
The next cell:
This should complete without errors. If you encounter errors or warnings look at the following two notebooks:
If you are running in the Azure Sentinel Notebooks environment (Azure Notebooks or Azure ML) you can run live versions of these notebooks:
You may also need to do some additional configuration to successfully use functions such as Threat Intelligence service lookup and Geo IP lookup.
There are more details about this in the ConfiguringNotebookEnvironment
notebook and in these documents:
from pathlib import Path
import importlib
import os
import sys
import warnings
from IPython.display import display, HTML, Markdown
REQ_PYTHON_VER=(3, 6)
REQ_MSTICPY_VER=(0, 5, 0)
display(HTML("<h3>Starting Notebook setup...</h3>"))
if Path("./utils/nb_check.py").is_file():
    from utils.nb_check import check_python_ver, check_mp_ver
    check_python_ver(min_py_ver=REQ_PYTHON_VER)
    try:
        check_mp_ver(min_msticpy_ver=REQ_MSTICPY_VER)
    except ImportError:
        !pip install --upgrade msticpy
        if "msticpy" in sys.modules:
            importlib.reload(msticpy)
        else:
            import msticpy
        check_mp_ver(REQ_MSTICPY_VER)
# If not using Azure Notebooks, install msticpy with
# !pip install msticpy
from msticpy.nbtools import nbinit
nbinit.init_notebook(
namespace=globals(),
extra_imports=["ipwhois, IPWhois"]
);
Processing imports.... Checking configuration.... Setting options....
<details>
Use the following syntax if you are authenticating using an Azure Active Directory AppId and Secret:
%kql loganalytics://tenant(aad_tenant).workspace(WORKSPACE_ID).clientid(client_id).clientsecret(client_secret)
instead of
%kql loganalytics://code().workspace(WORKSPACE_ID)
Note: you may occasionally see a JavaScript error displayed at the end of the authentication - you can safely ignore this.
On successful authentication you should see a popup schema
button.
To find your Workspace Id go to Log Analytics. Look at the workspace properties to find the ID.
</details>
# See if we have an Azure Sentinel workspace defined in our config file; if not, let the user specify Workspace and Tenant IDs
from msticpy.nbtools.wsconfig import WorkspaceConfig
# WorkspaceConfig.list_workspaces()
# ws_config = WorkspaceConfig(workspace="My_Workspace_Name")
# calling WorkspaceConfig with no parameters will load the default workspace from msticpyconfig.yaml
# or fall back on a config.json file.
ws_config = WorkspaceConfig()
try:
    ws_id = ws_config['workspace_id']
    ten_id = ws_config['tenant_id']
    config = True
    md("Workspace details collected from config file")
except KeyError:
    md(('Please go to your Log Analytics workspace, copy the workspace ID'
        ' and/or tenant ID and paste them here to enable connection to, and querying of, the workspace.<br>'))
    ws_id_wgt = nbwidgets.GetEnvironmentKey(env_var='WORKSPACE_ID',
                                            prompt='Please enter your Log Analytics Workspace Id:', auto_display=True)
    ten_id_wgt = nbwidgets.GetEnvironmentKey(env_var='TENANT_ID',
                                             prompt='Please enter your Log Analytics Tenant Id:', auto_display=True)
    config = False
Workspace details collected from config file
if config is False:
    ws_id = ws_id_wgt.value
    ten_id = ten_id_wgt.value
# Establish a query provider for Azure Sentinel and connect to it
qry_prov = QueryProvider('LogAnalytics')
qry_prov.connect(connection_str=ws_config.code_connect_str)
table_index = qry_prov.schema_tables
The notebook expects your Azure Sentinel Tenant ID and Workspace ID to be configured in one of the following places:
- config.json in the current folder
- msticpyconfig.yaml in the current folder, or in the location specified by the MSTICPYCONFIG environment variable

For help with setting up your config.json file (if this hasn't been done automatically) see the ConfiguringNotebookEnvironment notebook in the root folder of your Azure-Sentinel-Notebooks project. This shows you how to obtain your Workspace and Subscription IDs from the Azure Sentinel Portal (you can use the Subscription ID to find your Tenant ID). To view the current config.json run the following in a code cell:
%pfile config.json
For help with setting up your msticpyconfig.yaml see the Setup section at the end of this notebook and the ConfiguringNotebookEnvironment notebook.
host_text = widgets.Text(
description="Enter the Host name to search for:", **WIDGET_DEFAULTS
)
display(host_text)
query_times = nbwidgets.QueryTime(units="day", max_before=20, before=5, max_after=1)
query_times.display()
# Get single event - try process creation
if "SecurityEvent" not in table_index:
    raise ValueError("No Windows event log data available in the workspace")
host_name = None
matching_hosts_df = qry_prov.WindowsSecurity.list_host_processes(
query_times, host_name=host_text.value.strip(), add_query_items="| distinct Computer"
)
if len(matching_hosts_df) > 1:
    print(f"Multiple matches for '{host_text.value}'. Please select a host from the list.")
    choose_host = nbwidgets.SelectString(
        item_list=list(matching_hosts_df["Computer"].values),
        description="Select the host.",
        auto_display=True,
    )
elif not matching_hosts_df.empty:
    host_name = matching_hosts_df["Computer"].iloc[0]
    print(f"Unique host found: {host_name}")
else:
    print(f"Host not found: {host_text.value}")
if not host_name:
    host_name = choose_host.value
host_entity = None
if not matching_hosts_df.empty:
    host_entity = entities.Host(src_event=matching_hosts_df[matching_hosts_df["Computer"] == host_name].iloc[0])
if not host_entity:
    raise LookupError(f"Could not find Windows events for host {host_name}")
def populate_heartbeat_details(host_hb_df, host_entity=None):
    if not host_hb_df.empty:
        host_hb = host_hb_df.iloc[0]
        if not host_entity:
            host_entity = entities.Host(host_hb["Computer"])
        host_entity.SourceComputerId = host_hb["SourceComputerId"]
        host_entity.OSType = host_hb["OSType"]
        host_entity.OSMajorVersion = host_hb["OSMajorVersion"]
        host_entity.OSMinorVersion = host_hb["OSMinorVersion"]
        host_entity.ComputerEnvironment = host_hb["ComputerEnvironment"]
        host_entity.ResourceId = host_hb["ResourceId"]
        host_entity.OmsSolutions = [
            sol.strip() for sol in host_hb["Solutions"].split(",")
        ]
        host_entity.VMUUID = host_hb["VMUUID"]
        ip_entity = entities.IpAddress()
        ip_entity.Address = host_hb["ComputerIP"]
        geoloc_entity = entities.GeoLocation()
        geoloc_entity.CountryName = host_hb["RemoteIPCountry"]
        geoloc_entity.Longitude = host_hb["RemoteIPLongitude"]
        geoloc_entity.Latitude = host_hb["RemoteIPLatitude"]
        ip_entity.Location = geoloc_entity
        host_entity.IPAddress = ip_entity  # TODO change to graph edge
    return host_entity
def convert_to_ip_entities(ip_str):
    iplocation = GeoLiteLookup()
    ip_entities = []
    if ip_str:
        if "," in ip_str:
            addrs = ip_str.split(",")
        elif " " in ip_str:
            addrs = ip_str.split(" ")
        else:
            addrs = [ip_str]
        for addr in addrs:
            ip_entity = entities.IpAddress()
            ip_entity.Address = addr.strip()
            iplocation.lookup_ip(ip_entity=ip_entity)
            ip_entities.append(ip_entity)
    return ip_entities
# Add this information to our host entity
def populate_host_aznet_ips(az_net_df, host_entity):
    retrieved_address = []
    if len(az_net_df) == 1:
        host_entity.private_ips = convert_to_ip_entities(
            az_net_df["PrivateIPAddresses"].iloc[0]
        )
        host_entity.public_ips = convert_to_ip_entities(
            az_net_df["PublicIPAddresses"].iloc[0]
        )
        retrieved_address = [ip.Address for ip in host_entity.public_ips]
    else:
        if "private_ips" not in host_entity:
            host_entity.private_ips = []
        if "public_ips" not in host_entity:
            host_entity.public_ips = []
iplocation = GeoLiteLookup()
# Try to get an OMS Heartbeat for this computer
if "Heartbeat" in table_index:
    print(f"Looking for {host_name} in OMS Heartbeat data...")
    host_hb_df = qry_prov.Network.get_heartbeat_for_host(host_name=host_name)
    host_entity = populate_heartbeat_details(host_hb_df, host_entity)
if "AzureNetworkAnalytics_CL" in table_index:
    print(f"Looking for {host_name} IP addresses in network flows...")
    az_net_df = qry_prov.Network.get_ips_for_host(host_name=host_name)
    populate_host_aznet_ips(az_net_df, host_entity)
md("Host Details", "bold")
print(host_entity)
ra_query_times = nbwidgets.QueryTime(
units="day",
origin_time=query_times.origin_time,
max_before=28,
max_after=5,
before=5,
auto_display=True,
)
related_alerts = qry_prov.SecurityAlert.list_related_alerts(
ra_query_times, host_name=host_entity.HostName
)
def print_related_alerts(alertDict, entityType, entityName):
    if len(alertDict) > 0:
        display(
            Markdown(
                f"### Found {len(alertDict)} different alert types related to this {entityType} (`{entityName}`)"
            )
        )
        for (k, v) in alertDict.items():
            print(f"- {k}, # Alerts: {v}")
    else:
        print(f"No alerts for {entityType} entity `{entityName}`")
if isinstance(related_alerts, pd.DataFrame) and not related_alerts.empty:
    host_alert_items = (
        related_alerts[["AlertName", "TimeGenerated"]]
        .groupby("AlertName")
        .TimeGenerated.agg("count")
        .to_dict()
    )
    print_related_alerts(host_alert_items, "host", host_entity.HostName)
    if len(host_alert_items) > 1:
        nbdisplay.display_timeline(
            data=related_alerts, title="Alerts", source_columns=["AlertName"], height=200
        )
else:
    display(Markdown("No related alerts found."))
Select an Alert to view details
def disp_full_alert(alert):
    global related_alert
    related_alert = SecurityAlert(alert)
    nbdisplay.display_alert(related_alert, show_entities=True)
recenter_wgt = widgets.Checkbox(
value=True,
description='Center subsequent query times round selected Alert?',
disabled=False,
**WIDGET_DEFAULTS
)
if related_alerts is not None and not related_alerts.empty:
    related_alerts["CompromisedEntity"] = related_alerts["Computer"]
    display(Markdown("### Click on alert to view details."))
    display(recenter_wgt)
    rel_alert_select = nbwidgets.AlertSelector(
        alerts=related_alerts,
        action=disp_full_alert,
    )
    rel_alert_select.display()
# set the origin time to the time of our alert
origin_time = (related_alert.TimeGenerated
if recenter_wgt.value
else query_times.origin_time)
logon_query_times = nbwidgets.QueryTime(
units="day",
origin_time=origin_time,
before=5,
after=1,
max_before=20,
max_after=20,
)
logon_query_times.display()
host_logons = qry_prov.WindowsSecurity.list_host_logons(
logon_query_times, host_name=host_entity.HostName
)
if host_logons is not None and not host_logons.empty:
    display(Markdown("### Logon timeline."))
    tooltip_cols = [
        "TargetUserName",
        "TargetDomainName",
        "SubjectUserName",
        "SubjectDomainName",
        "LogonType",
        "IpAddress",
    ]
    nbdisplay.display_timeline(
        data=host_logons,
        group_by="TargetUserName",
        source_columns=tooltip_cols,
        legend="right",
        yaxis=True,
    )
    display(Markdown("### Counts of logon events by logon type."))
    display(Markdown("Min counts for each logon type highlighted."))
    logon_by_type = (
        host_logons[["Account", "LogonType", "EventID"]]
        .astype({"LogonType": "int32"})
        .merge(right=pd.Series(data=nbdisplay._WIN_LOGON_TYPE_MAP, name="LogonTypeDesc"),
               left_on="LogonType", right_index=True)
        .drop(columns="LogonType")
        .groupby(["Account", "LogonTypeDesc"])
        .count()
        .unstack()
        .rename(columns={"EventID": "LogonCount"})
        .fillna(0)
        .style
        .background_gradient(cmap="viridis", low=0.5, high=0)
        .format("{0:0>3.0f}")
    )
    display(logon_by_type)
else:
    display(Markdown("No logon events found for host."))
failedLogons = qry_prov.WindowsSecurity.list_host_logon_failures(
logon_query_times, host_name=host_entity.HostName
)
if failedLogons.empty:
    print("No logon failures recorded for this host between "
          f"{logon_query_times.start} and {logon_query_times.end}")
else:
    nbdisplay.display_timeline(
        data=host_logons.query('TargetLogonId != "0x3e7"'),
        overlay_data=failedLogons,
        alert=related_alert,
        title="Logons (blue=user-success, green=failed)",
        source_columns=tooltip_cols,
        height=200,
    )
    display(failedLogons
            .astype({'LogonType': 'int32'})
            .merge(right=pd.Series(data=nbdisplay._WIN_LOGON_TYPE_MAP, name="LogonTypeDesc"),
                   left_on="LogonType", right_index=True)
            [['Account', 'EventID', 'TimeGenerated',
              'Computer', 'SubjectUserName', 'SubjectDomainName',
              'TargetUserName', 'TargetDomainName',
              'LogonTypeDesc', 'IpAddress', 'WorkstationName']])
This query joins failed and successful logons for the same account name. Multiple logon failures followed by a successful logon might indicate attempts to guess or probe the user's password.
if not failedLogons.empty:
    combined = pd.concat([failedLogons,
                          host_logons[host_logons["TargetUserName"]
                                      .isin(failedLogons["TargetUserName"]
                                            .drop_duplicates())]])
    display(combined.head())
    combined["LogonStatus"] = combined.apply(
        lambda x: "Failed" if x.EventID == 4625 else "Success", axis=1
    )
    nbdisplay.display_timeline(data=combined,
                               group_by="LogonStatus",
                               source_columns=["TargetUserName", "LogonType", "SubjectUserName", "TargetLogonId"],
                               legend="inline",
                               yaxis=True,
                               height=200)
    display(combined.sort_values("TimeGenerated"))
else:
    md(f"No logon failures recorded for this host between {logon_query_times.start} and {logon_query_times.end}")
It's often useful to look at what other events were being logged at the time of the attack.
We show events here grouped by Account. Things to look for are:
md(f"Collecting Windows Event Logs for {host_entity.HostName}, this may take a few minutes...")
all_events_df = qry_prov.WindowsSecurity.list_host_events(
logon_query_times,
host_name=host_entity.HostName,
add_query_items="| where EventID != 4688 and EventID != 4624",
)
# Create a pivot of Event vs. Account
win_events_acc = all_events_df[["Account", "Activity", "TimeGenerated"]].copy()
win_events_acc = win_events_acc.replace("-\\-", "No Account").replace(
{"Account": ""}, value="No Account"
)
win_events_acc["Account"] = win_events_acc.apply(lambda x: x.Account.split("\\")[-1], axis=1)
event_pivot = (
pd.pivot_table(
win_events_acc,
values="TimeGenerated",
index=["Activity"],
columns=["Account"],
aggfunc="count",
)
.fillna(0)
.reset_index()
)
display(Markdown("Yellow highlights indicate account with highest event count"))
(
event_pivot.style
.applymap(lambda x: "color: white" if x == 0 else "")
.applymap(
lambda x: "background-color: lightblue"
if not isinstance(x, str) and x > 0
else ""
)
.set_properties(subset=["Activity"], **{"width": "400px", "text-align": "left"})
.highlight_max(axis=1)
.hide_index()
)
<details>
For events that you want to look at in more detail you can parse out the full EventData field (containing all fields of the original event). The parse_event_data
function below does that - transforming the EventData XML into a dictionary of property/value pairs. The expand_event_properties
function takes this dictionary and transforms it into columns in the output DataFrame.
We recommend that you process batches of related event types (e.g. all user account change events) that have similar sets of fields to keep the output DataFrame manageable. </details>
# Function to convert EventData XML into dictionary and
# populate columns into DataFrame from previous query result
import xml.etree.ElementTree as ET
from xml.etree.ElementTree import ParseError
SCHEMA = "http://schemas.microsoft.com/win/2004/08/events/event"
def parse_event_data(row):
    try:
        xdoc = ET.fromstring(row.EventData)
        col_dict = {
            elem.attrib["Name"]: elem.text for elem in xdoc.findall(f"{{{SCHEMA}}}Data")
        }
        reassigned = set()
        for k, v in col_dict.items():
            if k in row and not row[k]:
                row[k] = v
                reassigned.add(k)
        if reassigned:
            # print('Reassigned: ', ', '.join(reassigned))
            for k in reassigned:
                col_dict.pop(k)
        return col_dict
    except (ParseError, TypeError):
        return None
# Parse event properties into a dictionary
all_events_df["EventProperties"] = all_events_df.apply(parse_event_data, axis=1)
# For a specific event ID you can explode the EventProperties values into their own columns
# using this function. You can do this for the whole data set but it will result
# in a lot of sparse columns in the output data frame
def expand_event_properties(input_df):
    exp_df = input_df.apply(lambda x: pd.Series(x.EventProperties), axis=1)
    return (
        exp_df.drop(set(input_df.columns).intersection(exp_df.columns), axis=1)
        .merge(
            input_df.drop("EventProperties", axis=1),
            how="inner",
            left_index=True,
            right_index=True,
        )
        .replace("", np.nan)  # these 3 lines get rid of blank columns
        .dropna(axis=1, how="all")
        .fillna("")
    )
expand_event_properties(all_events_df[all_events_df["EventID"] == 4724]).head(2)
Here we want to focus on some specific subcategories of events. Attackers commonly try to add or change user accounts and group memberships. We also include events related to the addition or modification of scheduled tasks and Windows services.
# Get a full list of Windows Security Events
import pkgutil
import os
w_evt = pkgutil.get_data("msticpy", f"resources{os.sep}WinSecurityEvent.json")
win_event_df = pd.read_json(w_evt)
# Create criteria for events that we're interested in
acct_sel = win_event_df["subcategory"] == "User Account Management"
group_sel = win_event_df["subcategory"] == "Security Group Management"
schtask_sel = (win_event_df["subcategory"] == "Other Object Access Events") & (
win_event_df["description"].str.contains("scheduled task")
)
event_list = win_event_df[acct_sel | group_sel | schtask_sel]["event_id"].to_list()
# Add Service install event
event_list.append(7045)
# Plot events on a timeline
p = nbdisplay.display_timeline(
data=all_events_df[all_events_df["EventID"].isin(event_list)],
group_by="EventID",
source_columns=["Activity", "Account"],
legend="right",
)
From the above data - pick which event types you want to view (by default, all are selected). The second cell will display the event types selected.
# populate actual events IDs to select from
recorded_events = (all_events_df['EventID']
[all_events_df["EventID"]
.isin(event_list)].drop_duplicates().values)
event_subset = win_event_df[win_event_df["event_id"].isin(event_list)
& win_event_df["event_id"].isin(recorded_events)]
items = list(event_subset.apply(lambda x: (x.full_desc, x.event_id), axis=1).values)
ss = nbwidgets.SelectSubset(
items,
default_selected=items
)
col_names = ['TimeGenerated', 'Account', 'AccountType',
'Computer', 'EventID', 'Activity', 'SubjectAccount',
'SubjectDomainName', 'SubjectLogonId', 'SubjectUserName',
'TargetAccount', 'TargetDomainName', 'TargetSid', 'TargetUserName']
display(all_events_df[all_events_df["EventID"].isin(ss.selected_values)]
[col_names]
.replace(to_replace="", value=np.NAN)
.dropna(axis=1, how="all"))
Looking at the characteristics and activity of individual logon sessions is an effective way of spotting clusters of attacker activity.
The biggest problem is deciding which logon sessions to look at. We may already have some indicators of sessions that we want to examine from earlier sections:
In this section we use clustering to collapse repetitive logons and show details of the distinct logon patterns.
from msticpy.sectools.eventcluster import (
dbcluster_events,
add_process_features,
_string_score,
)
if host_logons is None or host_logons.empty:
    display(Markdown("No host logons recorded. This section cannot be run."))
    raise ValueError("aborted")
# Set up clustering features and run DBScan clustering
logon_features = host_logons.copy()
logon_features["AccountNum"] = host_logons.apply(
lambda x: _string_score(x.Account), axis=1
)
logon_features["TargetUserNum"] = host_logons.apply(
lambda x: _string_score(x.TargetUserName), axis=1
)
logon_features["LogonHour"] = host_logons.apply(lambda x: x.TimeGenerated.hour, axis=1)
# you might need to play around with the max_cluster_distance parameter.
# decreasing this gives more clusters.
(clus_logons, _, _) = dbcluster_events(
data=logon_features,
time_column="TimeGenerated",
cluster_columns=["AccountNum", "LogonType", "TargetUserNum"],
max_cluster_distance=0.0001,
)
display(Markdown(f"Number of input events: {len(host_logons)}"))
display(Markdown(f"Number of clustered events: {len(clus_logons)}"))
display(Markdown("### Relative frequencies by account pattern"))
plt.rcParams["figure.figsize"] = (12, 4)
clus_logons.sort_values("Account").plot.barh(x="Account", y="ClusterSize");
import re
# Build a list of distinct logon patterns from the clustered data
dist_logons = clus_logons.sort_values("TimeGenerated")[
["TargetUserName", "TimeGenerated", "LastEventTime", "LogonType", "ClusterSize"]
]
dist_logons = dist_logons.apply(
lambda x: (
f"{x.TargetUserName}: "
f"(logontype {x.LogonType}) "
f"timerange: {x.TimeGenerated} - {x.LastEventTime} "
f"count: {x.ClusterSize}"
),
axis=1,
)
# Convert to dict, flipping keys/values
dist_logons = {v: k for k, v in dist_logons.to_dict().items()}
def get_selected_logon_cluster(selected_item):
    acct = clus_logons.loc[selected_item]["TargetUserName"]
    logon_type = clus_logons.loc[selected_item]["LogonType"]
    return host_logons.query("TargetUserName == @acct and LogonType == @logon_type")
# Create an Output widget to show the Logon Details
w_output = widgets.Output(layout={"border": "1px solid black"})
def show_logon(idx):
    w_output.clear_output()
    with w_output:
        nbdisplay.display_logon_data(pd.DataFrame(clus_logons.loc[idx]).T)
logon_wgt = nbwidgets.SelectString(
description="Select logon cluster to examine",
item_dict=dist_logons,
action=show_logon,
height="200px",
width="100%",
auto_display=True,
)
display(w_output)
# Display the first item on first view
top_item = next(iter(dist_logons.values()))
with w_output:
    nbdisplay.display_logon_data(pd.DataFrame(clus_logons.loc[top_item]).T)
In this section we look at the types of processes run in each logon session. For each process (and process characteristics such as command line structure) we measure its rarity compared to other processes on the same host. We then calculate the mean rarity of all processes in a logon session and display the results ordered by rarity. One is the highest possible score and would indicate all processes in the session have a unique execution pattern.
Note: The next section retrieves processes for the time period around the logons of the user ID selected in the previous section. If you want to view a broader time boundary, adjust the query times below.
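The arithmetic behind the rarity scoring can be sketched in a few lines. This is an illustration only, using made-up session IDs and cluster sizes rather than data from this notebook:

```python
import pandas as pd

# Hypothetical clustered processes: SubjectLogonId identifies the session,
# ClusterSize is how many similar executions were collapsed into one cluster.
procs = pd.DataFrame(
    {
        "SubjectLogonId": ["0x1", "0x1", "0x2", "0x2"],
        "ClusterSize": [100, 1, 50, 25],
    }
)
# Rarity is the inverse of cluster size: a process seen only once scores 1.0
procs["Rarity"] = 1 / procs["ClusterSize"]

# Mean rarity per session - values near 1.0 mean most processes were unique
session_rarity = procs.groupby("SubjectLogonId")["Rarity"].mean()
print(session_rarity)
# 0x1: (0.01 + 1.0) / 2 = 0.505    0x2: (0.02 + 0.04) / 2 = 0.03
```

Session 0x1 ranks higher because it ran one process that clustered with nothing else.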
# set the origin time to start at the first logon in our set
# and end 2 hours after the last
start_time = host_logons["TimeGenerated"].min()
end_time = host_logons["TimeGenerated"].max()
time_diff = int((end_time - start_time).total_seconds() / (60 * 60) + 2)
proc_query_times = nbwidgets.QueryTime(
units="hours",
origin_time=start_time,
before=1,
after=time_diff + 1,
max_before=20,
max_after=time_diff + 20,
)
proc_query_times.display()
This should be a good guide to which sessions are the more interesting to look at.
Note: Clustering lots (1000s) of events will take a little time to compute.
from msticpy.sectools.eventcluster import dbcluster_events, add_process_features
from collections import Counter
print("Getting process events...", end="")
processes_on_host = qry_prov.WindowsSecurity.list_host_processes(
proc_query_times, host_name=host_entity.HostName
)
print(f"done. {len(processes_on_host)} events")
print("Clustering. Please wait...", end="")
feature_procs = add_process_features(input_frame=processes_on_host, path_separator="\\")
feature_procs["accountNum"] = feature_procs.apply(
lambda x: _string_score(x.Account), axis=1
)
# you might need to play around with the max_cluster_distance parameter.
# decreasing this gives more clusters.
(clus_events, dbcluster, x_data) = dbcluster_events(
data=feature_procs,
cluster_columns=[
"commandlineTokensFull",
"pathScore",
"accountNum",
"isSystemSession",
],
max_cluster_distance=0.0001,
)
print("done")
print("Number of input events:", len(feature_procs))
print("Number of clustered events:", len(clus_events))
# Join the clustered results back to the original process frame
procs_with_cluster = feature_procs.merge(
clus_events[
[
"commandlineTokensFull",
"accountNum",
"pathScore",
"isSystemSession",
"ClusterSize",
]
],
on=["commandlineTokensFull", "accountNum", "pathScore", "isSystemSession"],
)
# Rarity = inverse of cluster size
procs_with_cluster["Rarity"] = 1 / procs_with_cluster["ClusterSize"]
# count the number of processes for each logon ID
lgn_proc_count = (
    pd.concat(
        [
            processes_on_host.groupby("TargetLogonId")["TargetLogonId"].count(),
            processes_on_host.groupby("SubjectLogonId")["SubjectLogonId"].count(),
        ]
    )
    .groupby(level=0)
    .sum()
).to_dict()
# Display the results
md("Sessions ordered by process rarity", 'bold')
md("Higher score indicates higher number of unusual processes")
process_rarity = (procs_with_cluster.groupby(["SubjectUserName", "SubjectLogonId"])
.agg({"Rarity": "mean", "TimeGenerated": "count"})
.rename(columns={"TimeGenerated": "ProcessCount"})
.reset_index())
display(
process_rarity
.sort_values("Rarity", ascending=False)
.style.bar(subset=["Rarity"], color="#d65f5f")
)
# Display process timeline for sessions with rarity scores above the 25th percentile
rare_sess = process_rarity[process_rarity["Rarity"]
> process_rarity["Rarity"].quantile(.25)]
rare_sessions = processes_on_host[(processes_on_host["SubjectLogonId"]
.isin(rare_sess["SubjectLogonId"]))
& (processes_on_host["SubjectUserName"]
.isin(rare_sess["SubjectUserName"]))]
md("Timeline of sessions with higher process rarity", "large")
md("Multiple sessions (y-axis) may be shown for each account.")
md("You will likely need to zoom in to see the individual session processes.")
nbdisplay.display_timeline(
data=rare_sessions,
group_by="SubjectLogonId",
source_columns=["SubjectUserName", "SubjectLogonId", "NewProcessName", "CommandLine"],
legend="right",
yaxis=True
);
def view_logon_sess(logon_id=""):
    global selected_logon
    selected_logon = host_logons[host_logons["TargetLogonId"] == logon_id]
    if all_procs.value:
        sess_procs = processes_on_host.query(
            "TargetLogonId == @logon_id | SubjectLogonId == @logon_id"
        )
    else:
        sess_procs = procs_with_cluster.query("SubjectLogonId == @logon_id")[
            ["NewProcessName", "CommandLine", "SubjectLogonId", "ClusterSize"]
        ].drop_duplicates()
    display(sess_procs)
sessions = list(process_rarity
.sort_values("Rarity", ascending=False)
.apply(lambda x: (f"{x.SubjectLogonId} {x.SubjectUserName} Rarity={x.Rarity}",
x.SubjectLogonId),
axis=1))
all_procs = widgets.Checkbox(
value=False,
description="View All Processes (Show clustered only if not checked)",
**WIDGET_DEFAULTS,
)
display(all_procs)
logon_wgt = nbwidgets.SelectString(
description="Select logon session to examine",
item_dict={label: val for label, val in sessions},
height="300px",
width="100%",
auto_display=True,
action=view_logon_sess,
)
If the previous section did not reveal anything interesting you can opt to browse all logon sessions.
Otherwise, skip to the Check Commandline for IoCs section
To do this you need to first pick an account + logon type (in the following cell) then pick a particular session that you want to view in the subsequent cell. Use the rarity score from the previous graph to guide you.
logon_wgt2 = nbwidgets.SelectString(
description="Select logon cluster to examine",
item_dict=dist_logons,
height="200px",
width="100%",
auto_display=True,
)
all_procs = widgets.Checkbox(
value=False,
description="View All Processes (Clustered only if not checked)",
**WIDGET_DEFAULTS,
)
display(all_procs)
selected_logon_cluster = get_selected_logon_cluster(logon_wgt2.value)
selected_tgt_logon = selected_logon_cluster["TargetUserName"].iat[0]
system_logon = (selected_tgt_logon.lower() == "system"
                or selected_tgt_logon.endswith("$"))
if system_logon:
    display(
        Markdown(
            '<h3><p style="color:red">Warning: the selected '
            "account name appears to be a system account.</p></h3><br>"
            "<i>It is difficult to accurately associate processes "
            "with the specific logon sessions.<br>"
            "Showing clustered events for entire time selection.</i>"
        )
    )
    display(
        clus_events.sort_values("TimeGenerated")[
            [
                "TimeGenerated",
                "LastEventTime",
                "NewProcessName",
                "CommandLine",
                "ClusterSize",
                "commandlineTokensFull",
                "pathScore",
                "isSystemSession",
            ]
        ]
    )
# Display a pick list for logon instances
sel_1 = host_logons["TargetLogonId"].isin(lgn_proc_count)
sel_2 = host_logons["TargetUserName"] == selected_tgt_logon
items = (
host_logons[sel_1 & sel_2]
.sort_values("TimeGenerated")
.apply(
lambda x: (
f"{x.TargetUserName}: "
f"(logontype={x.LogonType}) "
f"(timestamp={x.TimeGenerated}) "
f"logonid={x.TargetLogonId}"
),
axis=1,
)
.values.tolist()
)
if not items:
    items = ["No processes for logon"]
sess_w = widgets.Select(
options=items, description="Select logon instance to examine", **WIDGET_DEFAULTS
)
import re
logon_list_regex = r"""
(?P<acct>[^:]+):\s+
\(logontype=(?P<logon_type>[^)]+)\)\s+
\(timestamp=(?P<time>[^)]+)\)\s+
logonid=(?P<logonid>[0-9a-fx]+)
"""
def get_selected_logon(selected_item):
    acct_match = re.search(logon_list_regex, selected_item, re.VERBOSE)
    if acct_match:
        acct = acct_match["acct"]
        logon_type = int(acct_match["logon_type"])
        time_stamp = pd.to_datetime(acct_match["time"])
        logon_id = acct_match["logonid"]
        return host_logons.query(
            "TargetUserName == @acct and LogonType == @logon_type"
            " and TargetLogonId == @logon_id"
        )
def view_logon_sess(x=""):
    global selected_logon
    selected_logon = get_selected_logon(x)
    if selected_logon is not None:
        logonId = selected_logon["TargetLogonId"].iloc[0]
        if all_procs.value:
            sess_procs = processes_on_host.query(
                "TargetLogonId == @logonId | SubjectLogonId == @logonId"
            )
        else:
            sess_procs = procs_with_cluster.query("SubjectLogonId == @logonId")[
                ["NewProcessName", "CommandLine", "SubjectLogonId", "ClusterSize"]
            ].drop_duplicates()
        display(sess_procs)
widgets.interactive(view_logon_sess, x=sess_w)
This section looks for Indicators of Compromise (IoC) within the data sets passed to it.
The input data for this comes from the session you picked in the following sections:
To change which section is used - go back to the desired section and pick a session or re-run the entire cell.
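To illustrate the kind of matching the extractor performs, here is a hand-rolled sketch using only the stdlib re module. The patterns here are deliberately simplified stand-ins, not msticpy's IoCExtract implementation:

```python
import re

# Simplified illustrative patterns - IoCExtract's real expressions are far
# more robust and cover more IoC types (ipv6, defanged URLs, etc.)
IOC_PATTERNS = {
    "ipv4": re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b"),
    "md5_hash": re.compile(r"\b[a-fA-F0-9]{32}\b"),
    "url": re.compile(r"https?://[^\s\"']+"),
}

def extract_iocs(command_line):
    """Return (ioc_type, observable) pairs found in a command line."""
    found = []
    for ioc_type, pattern in IOC_PATTERNS.items():
        found.extend((ioc_type, match) for match in pattern.findall(command_line))
    return found

cmdline = "powershell Invoke-WebRequest http://10.1.2.3/payload.exe"
print(extract_iocs(cmdline))
# [('ipv4', '10.1.2.3'), ('url', 'http://10.1.2.3/payload.exe')]
```

Note that the same substring can surface as more than one IoC type - here the IP appears both on its own and inside the URL, which is why the notebook de-duplicates on (IoCType, Observable) pairs.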
# Use this to search all process commandlines
# ioc_df = ioc_extractor.extract(
# data=procs_with_cluster,
# columns=["CommandLine"],
# os_family=os_family,
# ioc_types=["ipv4", "ipv6", "dns", "url", "md5_hash", "sha1_hash", "sha256_hash"],
# )
# ioc_df[["IoCType", "Observable"]].drop_duplicates()
selected_tgt_logon = selected_logon["TargetUserName"].iat[0]
system_logon = (selected_tgt_logon.lower() == "system"
or selected_tgt_logon.endswith("$"))
if not system_logon:
    logonId = selected_logon["TargetLogonId"].iloc[0]
    sess_procs = processes_on_host.query(
        "TargetLogonId == @logonId | SubjectLogonId == @logonId"
    )
else:
    sess_procs = clus_events
ioc_extractor = IoCExtract()
os_family = host_entity.OSType if host_entity.OSType else "Windows"
ioc_df = ioc_extractor.extract(
data=sess_procs,
columns=["CommandLine"],
os_family=os_family,
ioc_types=["ipv4", "ipv6", "dns", "url", "md5_hash", "sha1_hash", "sha256_hash"],
)
if len(ioc_df):
    display(Markdown("### IoC patterns found in process set."))
    display(ioc_df[["IoCType", "Observable"]].drop_duplicates())
else:
    display(Markdown("### No IoC patterns found in process set."))
<details>
For simple strings the Base64 decoded output is straightforward. However for nested encodings this can get a little complex and difficult to represent in a tabular format.
Columns
</details>
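As a standalone illustration of the nesting problem (stdlib base64 only - this is not how the msticpy decoder works internally): a doubly encoded string needs two decode passes, so a single input row can fan out into several decoded items.

```python
import base64

# Hypothetical attacker command line, base64-encoded twice
original = b"net user attacker P@ssw0rd /add"
outer = base64.b64encode(base64.b64encode(original))

# The first decode pass only recovers another base64 string...
first_pass = base64.b64decode(outer)
print(first_pass)  # still base64, not readable

# ...so a second pass is needed to reach the original bytes
second_pass = base64.b64decode(first_pass)
print(second_pass.decode())
# net user attacker P@ssw0rd /add
```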
dec_df = base64.unpack_items(data=sess_procs, column="CommandLine")
if dec_df is not None and len(dec_df) > 0:
display(HTML("<h3>Decoded base 64 command lines</h3>"))
display(HTML("Decoded values and hashes of decoded values shown below."))
display(
HTML(
"Warning - some binary patterns may be decodable as unicode strings. "
'In these cases you should ignore the "decoded_string" column '
'and treat the encoded item as a binary - using the "printable_bytes" '
"column or treat the decoded_string as a binary (bytes) value."
)
)
display(
dec_df[
[
"full_decoded_string",
"decoded_string",
"original_string",
"printable_bytes",
"file_hashes",
]
]
)
ioc_dec_df = ioc_extractor.extract(data=dec_df, columns=["full_decoded_string"])
if len(ioc_dec_df):
display(HTML("<h3>IoC patterns found in events with base64 decoded data</h3>"))
display(ioc_dec_df)
ioc_df = pd.concat([ioc_df, ioc_dec_df], ignore_index=True)
else:
print("No base64 encodings found.")
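The core decode-and-hash step that base64.unpack_items performs can be illustrated with the standard library. This is a simplified sketch: the msticpy version also walks nested and multi-part encodings, which this one does not.

```python
import base64
import binascii
import hashlib

def decode_b64_item(encoded):
    """Decode one base64 string: (decoded_text_or_None, printable_bytes, sha256)."""
    try:
        raw = base64.b64decode(encoded, validate=True)
    except (binascii.Error, ValueError):
        return None, None, None
    sha256 = hashlib.sha256(raw).hexdigest()
    printable = " ".join(f"{b:02x}" for b in raw)
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        text = None  # binary payload - use the printable bytes, per the warning above
    return text, printable, sha256

text, printable, sha256 = decode_b64_item("cG93ZXJzaGVsbC5leGU=")
print(text)  # powershell.exe
```

The hash lets you pivot to TI lookups on the decoded payload even when the decoded string itself is binary.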
This section takes the output of the IoC extraction from command lines and submits it to Threat Intelligence services to see whether any of the IoCs are known threats.
Please take a moment to review the Selected
list below and remove any items that you clearly do not want to look up (e.g. myadmintool.ps
is almost certainly a PowerShell script, even though it is also a valid match for a legal DNS domain).
Then reload provider settings:
mylookup = TILookup()
mylookup.reload_provider_settings()
ti_lookup = TILookup()
ti_lookup.provider_status
items = list(ioc_df["Observable"].values)
ioc_ss = nbwidgets.SelectSubset(
items,
default_selected=items
)
iocs_to_check = (ioc_df[ioc_df["Observable"].isin(ioc_ss.selected_items)]
[["IoCType", "Observable"]].drop_duplicates())
ti_lookup.lookup_iocs(data=iocs_to_check, obs_col="Observable", ioc_type_col="IoCType")
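The filter feeding lookup_iocs is plain pandas: keep only the rows whose Observable survived your review in the selection widget. A standalone sketch with made-up data:

```python
import pandas as pd

ioc_df = pd.DataFrame({
    "IoCType": ["ipv4", "dns", "dns"],
    "Observable": ["203.0.113.9", "evil.example.com", "myadmintool.ps"],
})
# Simulate de-selecting the false-positive "dns" match in the widget
selected_items = ["203.0.113.9", "evil.example.com"]

iocs_to_check = (
    ioc_df[ioc_df["Observable"].isin(selected_items)]
    [["IoCType", "Observable"]]
    .drop_duplicates()
)
print(iocs_to_check)
```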
ip_q_times = nbwidgets.QueryTime(
label="Set time bounds for network queries",
units="day",
max_before=28,
before=2,
after=5,
max_after=28,
origin_time=proc_query_times.origin_time
)
ip_q_times.display()
if "AzureNetworkAnalytics_CL" not in table_index:
md_warn("AzureNetworkAnalytics_CL table is not available.")
md("No network flow data available. "
+ "Please skip the remainder of this section.",
"blue, bold")
az_net_comms_df = None
else:
all_host_ips = (
host_entity.private_ips + host_entity.public_ips + [host_entity.IPAddress]
)
host_ips = [i.Address for i in all_host_ips]
az_net_comms_df = qry_prov.Network.list_azure_network_flows_by_ip(
ip_q_times, ip_address_list=host_ips
)
if isinstance(az_net_comms_df, pd.DataFrame) and not az_net_comms_df.empty:
az_net_comms_df['TotalAllowedFlows'] = az_net_comms_df['AllowedOutFlows'] + az_net_comms_df['AllowedInFlows']
nbdisplay.display_timeline(
data=az_net_comms_df,
group_by="L7Protocol",
title="Network Flows by Protocol",
time_column="FlowStartTime",
source_columns=["FlowType", "AllExtIPs", "L7Protocol", "FlowDirection"],
height=300,
legend="right",
yaxis=True
)
nbdisplay.display_timeline(
data=az_net_comms_df,
group_by="FlowDirection",
title="Network Flows by Direction",
time_column="FlowStartTime",
source_columns=["FlowType", "AllExtIPs", "L7Protocol", "FlowDirection"],
height=300,
legend="right",
yaxis=True
)
else:
print("No network data for specified time range.")
if az_net_comms_df is not None and not az_net_comms_df.empty:
flow_plot = nbdisplay.display_timeline_values(
data=az_net_comms_df,
group_by="L7Protocol",
source_columns=["FlowType",
"AllExtIPs",
"L7Protocol",
"FlowDirection",
"TotalAllowedFlows"],
time_column="FlowStartTime",
title="Network flows by Layer 7 Protocol",
y="TotalAllowedFlows",
ref_event=related_alert,
legend="right",
height=500,
kind=["vbar", "circle"]
);
else:
md("No network data for specified time range.")
if az_net_comms_df is not None and not az_net_comms_df.empty:
cm = sns.light_palette("green", as_cmap=True)
cols = [
"VMName",
"VMIPAddress",
"PublicIPs",
"SrcIP",
"DestIP",
"L4Protocol",
"L7Protocol",
"DestPort",
"FlowDirection",
"AllExtIPs",
"TotalAllowedFlows",
]
flow_index = az_net_comms_df[cols].copy()
def get_source_ip(row):
if row.FlowDirection == "O":
return row.VMIPAddress if row.VMIPAddress else row.SrcIP
else:
return row.AllExtIPs if row.AllExtIPs else row.DestIP
def get_dest_ip(row):
if row.FlowDirection == "O":
return row.AllExtIPs if row.AllExtIPs else row.DestIP
else:
return row.VMIPAddress if row.VMIPAddress else row.SrcIP
flow_index["source"] = flow_index.apply(get_source_ip, axis=1)
flow_index["dest"] = flow_index.apply(get_dest_ip, axis=1)
md(f"{len(flow_index)} events in flow index")
# Uncomment to view flow_index results
# with warnings.catch_warnings():
# warnings.simplefilter("ignore")
# display(
# flow_index[
# ["source", "dest", "L7Protocol", "FlowDirection", "TotalAllowedFlows"]
# ]
# .groupby(["source", "dest", "L7Protocol", "FlowDirection"])
# .sum()
# .reset_index()
# .style.bar(subset=["TotalAllowedFlows"], color="#d65f5f")
# )
else:
md("No network data for specified time range.")
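The source/dest normalization above can be sanity-checked on a toy frame: for outbound ("O") flows the VM address is the source, while for inbound flows the external address is.

```python
import pandas as pd

flows = pd.DataFrame({
    "FlowDirection": ["O", "I"],
    "VMIPAddress": ["10.0.0.4", "10.0.0.4"],
    "SrcIP": ["", ""],
    "DestIP": ["", ""],
    "AllExtIPs": ["203.0.113.9", "198.51.100.7"],
})

def get_source_ip(row):
    # Mirrors the logic above: fall back to Src/Dest IP when the
    # preferred column is empty
    if row.FlowDirection == "O":
        return row.VMIPAddress if row.VMIPAddress else row.SrcIP
    return row.AllExtIPs if row.AllExtIPs else row.DestIP

flows["source"] = flows.apply(get_source_ip, axis=1)
print(flows["source"].tolist())  # ['10.0.0.4', '198.51.100.7']
```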
# WHOIS lookup function
from functools import lru_cache
from ipwhois import IPWhois
from ipaddress import ip_address
@lru_cache(maxsize=1024)
def get_whois_info(ip_lookup, show_progress=False):
try:
ip = ip_address(ip_lookup)
except ValueError:
return "Not an IP Address", {}
if ip.is_private:
return "private address", {}
if not ip.is_global:
return "other address", {}
whois = IPWhois(ip)
whois_result = whois.lookup_whois()
if show_progress:
print(".", end="")
return whois_result["asn_description"], whois_result
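The early-exit checks before the WHOIS call use the standard-library ipaddress module, and lru_cache makes repeated lookups of the same address free. A network-free sketch of just the classification step:

```python
from functools import lru_cache
from ipaddress import ip_address

@lru_cache(maxsize=1024)
def classify_ip(ip_str):
    """Return a coarse category without making any network call."""
    try:
        ip = ip_address(ip_str)
    except ValueError:
        return "not an IP address"
    if ip.is_private:
        return "private address"
    if not ip.is_global:
        return "other address"  # e.g. multicast, reserved ranges
    return "global address"     # only these are worth a WHOIS query

print(classify_ip("10.0.0.4"))  # private address
print(classify_ip("8.8.8.8"))   # global address
```

Skipping non-global addresses matters: WHOIS lookups are slow and rate-limited, and private or reserved addresses have no meaningful ASN.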
# Add ASN information from WhoIs
flows_df = (
flow_index[["source", "dest", "L7Protocol", "FlowDirection", "TotalAllowedFlows"]]
.groupby(["source", "dest", "L7Protocol", "FlowDirection"])
.sum()
.reset_index()
)
num_ips = len(flows_df["source"].unique()) + len(flows_df["dest"].unique())
print(f"Performing WhoIs lookups for {num_ips} IPs ", end="")
#flows_df = flows_df.assign(DestASN="", DestASNFull="", SourceASN="", SourceASNFull="")
flows_df["DestASN"] = flows_df.apply(lambda x: get_whois_info(x.dest, True), axis=1)
flows_df["SourceASN"] = flows_df.apply(lambda x: get_whois_info(x.source, True), axis=1)
print("done")
# Split the tuple returned by get_whois_info into separate columns
flows_df["DestASNFull"] = flows_df.apply(lambda x: x.DestASN[1], axis=1)
flows_df["DestASN"] = flows_df.apply(lambda x: x.DestASN[0], axis=1)
flows_df["SourceASNFull"] = flows_df.apply(lambda x: x.SourceASN[1], axis=1)
flows_df["SourceASN"] = flows_df.apply(lambda x: x.SourceASN[0], axis=1)
our_host_asns = [get_whois_info(ip.Address)[0] for ip in host_entity.public_ips]
md(f"Host {host_entity.HostName} ASNs:", "bold")
md(str(our_host_asns))
flow_sum_df = flows_df.groupby(["DestASN", "SourceASN"]).agg(
TotalAllowedFlows=pd.NamedAgg(column="TotalAllowedFlows", aggfunc="sum"),
L7Protocols=pd.NamedAgg(column="L7Protocol", aggfunc=lambda x: x.unique().tolist()),
source_ips=pd.NamedAgg(column="source", aggfunc=lambda x: x.unique().tolist()),
dest_ips=pd.NamedAgg(column="dest", aggfunc=lambda x: x.unique().tolist()),
).reset_index()
flow_sum_df
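The named-aggregation syntax used here (pandas 0.25+) on a small made-up frame, showing how the per-ASN flow totals and protocol lists are produced:

```python
import pandas as pd

flows = pd.DataFrame({
    "DestASN": ["ASN-A", "ASN-A", "ASN-B"],
    "SourceASN": ["local", "local", "local"],
    "L7Protocol": ["http", "https", "ssh"],
    "TotalAllowedFlows": [3, 5, 1],
})

# Each NamedAgg names an output column and pairs an input column
# with an aggregation function
summary = flows.groupby(["DestASN", "SourceASN"]).agg(
    TotalAllowedFlows=pd.NamedAgg(column="TotalAllowedFlows", aggfunc="sum"),
    L7Protocols=pd.NamedAgg(column="L7Protocol",
                            aggfunc=lambda x: x.unique().tolist()),
).reset_index()
print(summary)
```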
Choose from the Selected list the ASNs whose IPs you wish to check. The Source list has been pre-populated with all ASNs found in the network flow summary.
As an example, we've populated the Selected
list with the ASNs that have the lowest number of flows to and from the host, and removed any ASN that matches the ASN of the host being investigated.
Please edit this list, using the flow summary data above as a guide, leaving only the ASNs that you are suspicious about. Typically these are ones with a relatively low TotalAllowedFlows
count and possibly unusual L7Protocols
.
Then reload provider settings:
mylookup = TILookup()
mylookup.reload_provider_settings()
all_asns = list(flow_sum_df["DestASN"].unique()) + list(flow_sum_df["SourceASN"].unique())
all_asns = set(all_asns) - set(["private address"])
# Select the ASNs in the 25th percentile (lowest number of flows)
quant_25pc = flow_sum_df["TotalAllowedFlows"].quantile(q=[0.25]).iat[0]
quant_25pc_df = flow_sum_df[flow_sum_df["TotalAllowedFlows"] <= quant_25pc]
other_asns = list(quant_25pc_df["DestASN"].unique()) + list(quant_25pc_df["SourceASN"].unique())
other_asns = set(other_asns) - set(our_host_asns)
md("Choose IPs from Selected ASNs to look up for Threat Intel.", "bold")
sel_asn = nbwidgets.SelectSubset(source_items=all_asns, default_selected=other_asns)
from itertools import chain
from msticpy.sectools.tiproviders.ti_provider_base import TISeverity
def ti_check_ser_sev(severity, threshold):
threshold = TISeverity.parse(threshold)
return severity.apply(lambda x: TISeverity.parse(x) >= threshold)
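TISeverity is an ordered enumeration, so the check above is a simple ordering comparison against a threshold. The same pattern with a stdlib IntEnum (the names and values here are illustrative, not msticpy's actual scale):

```python
from enum import IntEnum
import pandas as pd

class Severity(IntEnum):
    # Illustrative scale - msticpy's TISeverity defines its own members
    unknown = -1
    information = 0
    warning = 1
    high = 2

def check_sev(severity_series, threshold):
    # Accept either a numeric threshold or a severity name
    threshold = (Severity(threshold) if isinstance(threshold, int)
                 else Severity[threshold])
    return severity_series.apply(lambda x: Severity[x] >= threshold)

sevs = pd.Series(["information", "warning", "high"])
print(check_sev(sevs, 1).tolist())  # [False, True, True]
```

The resulting boolean Series is used directly as a row mask, exactly as in the `ti_results_pos` filter below.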
dest_ips = set(chain.from_iterable(flow_sum_df[flow_sum_df["DestASN"].isin(sel_asn.selected_items)]["dest_ips"]))
src_ips = set(chain.from_iterable(flow_sum_df[flow_sum_df["SourceASN"].isin(sel_asn.selected_items)]["source_ips"]))
selected_ips = dest_ips | src_ips
md(f"{len(selected_ips)} unique IPs in selected ASNs")
# Add the IoCType to save cost of inferring each item
md("Looking up TI...")
selected_ip_dict = {ip: "ipv4" for ip in selected_ips}
ti_results = ti_lookup.lookup_iocs(data=selected_ip_dict)
md(f"{len(ti_results)} results received.")
ti_results_pos = ti_results[ti_check_ser_sev(ti_results["Severity"], 1)]
print(f"{len(ti_results_pos)} positive results found.")
if not ti_results_pos.empty:
    src_pos = flows_df.merge(ti_results_pos, left_on="source", right_on="Ioc")
    dest_pos = flows_df.merge(ti_results_pos, left_on="dest", right_on="Ioc")
    ti_ip_results = pd.concat([src_pos, dest_pos])
    md_warn("Positive Threat Intel results found for the following flows")
    md("Please examine these IP flows using the IP Explorer notebook.", "bold, large")
    display(ti_ip_results)
else:
    # Create an empty frame so later cells that reference ti_ip_results still run
    ti_ip_results = pd.DataFrame(columns=["Ioc"])
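The join back onto the flow records is a standard merge on the IoC value, done twice because a flagged IP can appear as either source or destination. On made-up data:

```python
import pandas as pd

flows_df = pd.DataFrame({
    "source": ["10.0.0.4", "198.51.100.7"],
    "dest": ["203.0.113.9", "10.0.0.4"],
    "L7Protocol": ["http", "ssh"],
})
ti_results_pos = pd.DataFrame({
    "Ioc": ["203.0.113.9"],
    "Severity": ["high"],
})

# Inner merge keeps only flows whose IP matched a positive TI result
src_pos = flows_df.merge(ti_results_pos, left_on="source", right_on="Ioc")
dest_pos = flows_df.merge(ti_results_pos, left_on="dest", right_on="Ioc")
ti_ip_results = pd.concat([src_pos, dest_pos])
print(len(ti_ip_results))  # 1 - only the outbound flow to the flagged IP
```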
def format_ip_entity(row, ip_col):
ip_entity = entities.IpAddress(Address=row[ip_col])
iplocation.lookup_ip(ip_entity=ip_entity)
ip_entity.AdditionalData["protocol"] = row.L7Protocol
if "severity" in row:
ip_entity.AdditionalData["threat severity"] = row["severity"]
if "Details" in row:
ip_entity.AdditionalData["threat details"] = row["Details"]
return ip_entity
# from msticpy.nbtools.foliummap import FoliumMap
folium_map = FoliumMap(zoom_start=4)
if az_net_comms_df is None or az_net_comms_df.empty:
print("No network flow data available.")
else:
# Get the flow records for all flows not in the TI results
selected_out = flows_df[flows_df["DestASN"].isin(sel_asn.selected_items)]
selected_out = selected_out[~selected_out["dest"].isin(ti_ip_results["Ioc"])]
if selected_out.empty:
ips_out = []
else:
ips_out = list(selected_out.apply(lambda x: format_ip_entity(x, "dest"), axis=1))
selected_in = flows_df[flows_df["SourceASN"].isin(sel_asn.selected_items)]
selected_in = selected_in[~selected_in["source"].isin(ti_ip_results["Ioc"])]
if selected_in.empty:
ips_in = []
else:
ips_in = list(selected_in.apply(lambda x: format_ip_entity(x, "source"), axis=1))
ips_threats = list(ti_ip_results.apply(lambda x: format_ip_entity(x, "Ioc"), axis=1))
display(HTML("<h3>External IP Addresses communicating with host</h3>"))
display(HTML("Numbered circles indicate multiple items - click to expand"))
display(HTML("Location markers: <br>Blue = outbound, Purple = inbound, Green = Host, Red = Threats"))
icon_props = {"color": "green"}
for ips in host_entity.public_ips:
ips.AdditionalData["host"] = host_entity.HostName
folium_map.add_ip_cluster(ip_entities=host_entity.public_ips, **icon_props)
icon_props = {"color": "blue"}
folium_map.add_ip_cluster(ip_entities=ips_out, **icon_props)
icon_props = {"color": "purple"}
folium_map.add_ip_cluster(ip_entities=ips_in, **icon_props)
icon_props = {"color": "red"}
folium_map.add_ip_cluster(ip_entities=ips_threats, **icon_props)
folium_map.center_map()
display(folium_map)
print("List of current DataFrames in Notebook")
print("-" * 50)
current_vars = list(locals().keys())
for var_name in current_vars:
if isinstance(locals()[var_name], pd.DataFrame) and not var_name.startswith("_"):
print(var_name)
To save the contents of a pandas DataFrame to an Excel spreadsheet, use the following syntax:
writer = pd.ExcelWriter("myWorksheet.xlsx")
my_data_frame.to_excel(writer, "Sheet1")
writer.save()
You can configure primary and secondary TI providers and any required parameters in the msticpyconfig.yaml
file. This file is read from the current directory, or you can set an environment variable (MSTICPYCONFIG
) pointing to its location.
To configure this file see the ConfigureNotebookEnvironment notebook