Notebook Version: 1.1
Python Version: Python 3.6 (including Python 3.6 - AzureML)
Data Sources Required: SecurityAlerts
This Notebook assists analysts in triaging alerts within Microsoft Sentinel by enriching them with Threat Intelligence and OSINT data. The purpose is to allow analysts to quickly triage a large number of alerts and identify those to focus investigation on.
How to use:
Run the cells in this Notebook in order. At various points in the Notebook flow you will be prompted to enter or select options relevant to the scope of your triage.
This Notebook presumes you have Microsoft Sentinel Workspace settings and Threat Intelligence providers configured in a config file. If you do not have this in place, please refer to https://msticpy.readthedocs.io/en/latest/getting_started/msticpyconfig.html# and https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb to set this up.
The next cell:
This should complete without errors. If you encounter errors or warnings look at the following two notebooks:
If you are running in the Microsoft Sentinel Notebooks environment (Azure Notebooks or Azure ML) you can run live versions of these notebooks:
You may also need to do some additional configuration to successfully use functions such as Threat Intelligence service lookup and Geo IP lookup.
There are more details about this in the ConfiguringNotebookEnvironment notebook and in these documents:
from pathlib import Path
from IPython.display import display, HTML
REQ_PYTHON_VER=(3, 6)
REQ_MSTICPY_VER=(1, 0, 0)
display(HTML("<h3>Starting Notebook setup...</h3>"))
# If not using Azure Notebooks, install msticpy with
# %pip install msticpy
from msticpy.nbtools import nbinit
extra_imports = [
    "whois",
    "datetime,,dt",
    "msticpy.nbtools.foliummap, get_center_ip_entities",
    "msticpy.nbtools.observationlist, Observations",
    "msticpy.nbtools.observationlist, Observation",
    "msticpy.sectools.ip_utils, convert_to_ip_entities",
]
nbinit.init_notebook(
    namespace=globals(),
    additional_packages=["IPWhois", "tldextract", "python-whois"],
    extra_imports=extra_imports,
)
pd.options.mode.chained_assignment = None
Initialize TI and Observation list
# Initialize observations and TI modules
summary = Observations()
ti = TILookup()
print('Observation summary and TILookup loaded.')
This cell collects Workspace details contained in your msticpyconfig.yaml file and uses them to authenticate.
# See if we have a Microsoft Sentinel Workspace defined in our config file.
# If not, let the user specify Workspace and Tenant IDs
ws_config = WorkspaceConfig()
if not ws_config.config_loaded:
    ws_config.prompt_for_ws()
qry_prov = QueryProvider(data_environment="AzureSentinel")
print("done")
# Authenticate to Microsoft Sentinel workspace
qry_prov.connect(ws_config)
Enter some information relevant to your triage work. This will be stored as part of this Notebook for future reference and recall. Please also select which Threat Intelligence providers to use for enrichment. Note that you need to have authentication details configured for each provider in order for it to operate. You can select one or more providers, or select "All" to use all available providers.
#Collect details for triage record
md("Enter Name:")
name = widgets.Text()
display(name)
md("Enter Ticket ID: ")
ticket = widgets.Text()
display(ticket)
md("Enter Description:")
description = widgets.Textarea()
display(description)
# Get list of configured TI providers and filter out non TI enrichments
ti_provs = [x for x in ti.configured_providers if x not in ("OPR", "Tor")]
if not ti_provs:
    raise Exception("""You do not have any Threat Intelligence providers configured.
Please refer to https://msticpy.readthedocs.io/en/latest/data_acquisition/TIProviders.html on how to configure them.""")
md("Select TI providers to use for enrichment.")
sel_ti_provs = widgets.SelectMultiple(
    options=ti_provs + ["All"],
    value=['All'],
    description='TI providers:',
    disabled=False
)
display(sel_ti_provs)
Adjust the time slider to select the timeframe for which you wish to triage alerts.
# Set list of TI providers to use and record this in our summary record alongside triage details entered above.
if "All" in sel_ti_provs.value:
    ti_prov_use = ti_provs
else:
    ti_prov_use = list(sel_ti_provs.value)
invest_summary = Observation(
    caption="Investigation Details",
    data={
        "Analyst": name.value,
        "Ticket": ticket.value,
        "Investigation Description": description.value,
        "Investigation Date": dt.datetime.now(),
        "TI Providers": ti_prov_use,
    },
)
summary.add_observation(invest_summary)
# Widget to select time window in
query_times = nbwidgets.QueryTime(units='day',
max_before=30, max_after=1, before=3)
query_times.display()
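To make the slider settings concrete, the sketch below shows how a "days before" and "days after" offset translate into a start and end time around an origin time. It is an illustration only: the `time_window` helper is hypothetical and does not reproduce the internals or defaults of `nbwidgets.QueryTime`, and the origin time is fixed so that the output is reproducible.

```python
from datetime import datetime, timedelta, timezone


def time_window(origin, before_days, after_days=0):
    """Return (start, end) datetimes surrounding origin (hypothetical helper)."""
    start = origin - timedelta(days=before_days)
    end = origin + timedelta(days=after_days)
    return start, end


# Fixed origin time, chosen only so the example is reproducible
origin = datetime(2024, 1, 10, 12, 0, tzinfo=timezone.utc)
start, end = time_window(origin, before_days=3, after_days=1)
print(start.isoformat())  # 2024-01-07T12:00:00+00:00
print(end.isoformat())    # 2024-01-11T12:00:00+00:00
```

In the Notebook itself the selected boundaries are read from `query_times.start` and `query_times.end`.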
You can choose to select a subset of alerts based on provider in order to narrow your triage scope. You can also select "All" to return security alerts from all providers. Once a provider is selected you can additionally filter by Alert Name in order to focus on a specific alert type.
# Collect alerts based on the scope set above
alerts = qry_prov.SecurityAlert.list_alerts(query_times)
alerts_summ = Observation(caption="Alerts", data={"Data" : alerts, "Times": query_times })
summary.add_observation(alerts_summ)
# display summary of alerts retrieved
md("Alert summary", "large")
display(alerts.groupby("ProviderName")[["AlertName"]]
.count()
.reset_index()
.rename(columns={"AlertName": "Alerts"})
)
def update_alert_names(_):
    # Refresh the Alert Names dropdown whenever the selected provider changes
    selected_alert_type = sel_alerts.value
    if sel_prov.value != "All":
        alert_names = alerts[alerts['ProviderName']==sel_prov.value]['AlertName'].unique()
    else:
        alert_names = alerts['AlertName'].unique()
    alert_names = np.append(alert_names, ["All"])
    sel_alerts.options = alert_names
    # Keep the previous alert name selected if it is still a valid option
    if selected_alert_type in alert_names:
        sel_alerts.value = selected_alert_type
    else:
        sel_alerts.value = "All"

if alerts.empty:
    md(f"No alerts in this Workspace between {query_times.start} and {query_times.end}", "bold")
else:
    w_layout = list_layout = widgets.Layout(width="400px")
    # Select Provider to filter by
    providers = alerts['ProviderName'].unique()
    providers = np.append(providers, ["All"])
    sel_prov = widgets.Dropdown(
        options=providers,
        description='Providers:',
        disabled=False,
        layout=w_layout,
    )
    sel_prov.observe(update_alert_names, names="value")
    alert_names = alerts[alerts['ProviderName']==sel_prov.value]['AlertName'].unique()
    alert_names = np.append(alert_names, ["All"])
    sel_alerts = widgets.Dropdown(
        options=alert_names,
        description='Alert Names:',
        disabled=False,
        value = "All",
        layout=w_layout,
    )
    md("Select provider and/or Alert type to triage", "large")
    display(widgets.VBox([sel_prov, sel_alerts]))
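The provider and alert name selections are applied as simple DataFrame filters, with "All" acting as a sentinel that disables a filter. The following is a minimal sketch of that logic on made-up data; the `filter_alerts` helper and the sample rows are hypothetical, and only the `ProviderName` and `AlertName` column names are taken from this Notebook.

```python
import pandas as pd

# Hypothetical sample data using the two columns the filters rely on
sample_alerts = pd.DataFrame({
    "ProviderName": ["ProviderA", "ProviderB", "ProviderB", "ProviderC"],
    "AlertName": ["Suspicious script", "Brute force", "Brute force", "Unusual sign-in"],
})


def filter_alerts(df, provider="All", alert_name="All"):
    """Return rows matching provider and alert name; "All" disables a filter."""
    mask = pd.Series(True, index=df.index)
    if provider != "All":
        mask &= df["ProviderName"] == provider
    if alert_name != "All":
        mask &= df["AlertName"] == alert_name
    # Return a copy so later column assignments do not touch the source frame
    return df[mask].copy()


print(len(filter_alerts(sample_alerts)))                        # 4
print(len(filter_alerts(sample_alerts, provider="ProviderB")))  # 2
print(len(filter_alerts(sample_alerts, "ProviderB", "Unusual sign-in")))  # 0
```

Returning a copy keeps any enrichment columns added later separate from the originally retrieved alerts.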
Once alerts are collected, we can enrich them by looking up their associated entities in Threat Intelligence. The TI Risk column in the table below is an aggregation of results from the selected TI providers: each alert is assigned the highest severity reported for any of its entities.
import json
from tqdm.notebook import tqdm
# Filter alerts based on AlertName and Provider
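if sel_prov.value == "All":
    sent_alerts = alerts
else:
    sent_alerts = alerts[alerts['ProviderName'] == sel_prov.value]
if sel_alerts.value == "All":
    selected_alert_type = sent_alerts
else:
    selected_alert_type = sent_alerts[sent_alerts['AlertName'] == sel_alerts.value]
# Work on a copy so the column assignments below do not modify `alerts`
# and the cell can be re-run safely.
selected_alert_type = selected_alert_type.copy()

def entity_load(entity):
    # Parse the Entities JSON string; return None for missing or invalid values
    try:
        return json.loads(entity)
    except (json.JSONDecodeError, TypeError):
        return None

selected_alert_type['Entities'] = selected_alert_type['Entities'].apply(entity_load)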
# Look up each entity in TI and aggregate results into an overall severity based on the highest indicator severity.
def lookup(row):
    sev = []
    if row['Entities'] is not None:
        for entity in row['Entities']:
            try:
                if entity["Type"] in ('ip', 'ipaddress'):
                    resp = ti.lookup_ioc(observable=entity["Address"], providers=ti_prov_use)
                elif entity["Type"] == 'url':
                    resp = ti.lookup_ioc(observable=entity["Url"], providers=ti_prov_use)
                else:
                    resp = None
                if resp:
                    for response in resp[1]:
                        sev.append(response[1].severity)
            except KeyError:
                # Skip entities that lack the expected Type, Address or Url key
                pass
    if 'high' in sev:
        severity = "High"
    elif 'warning' in sev:
        severity = "Warning"
    elif 'information' in sev:
        severity = "Information"
    else:
        severity = "None"
    return severity
# Highlight cells based on Threat Intelligence results.
def color_cells(val):
    if isinstance(val, str):
        if val.lower() == "high":
            color = 'Red'
        elif val.lower() == 'warning':
            color = 'Orange'
        elif val.lower() == 'information':
            color = 'Green'
        else:
            color = 'none'
    else:
        color = 'none'
    return 'background-color: %s' % color
tqdm.pandas(desc="Lookup progress")
selected_alert_type['TI Risk'] = selected_alert_type.progress_apply(lookup, axis=1)
display(selected_alert_type[['StartTimeUtc','AlertName','Severity','TI Risk', 'Description']]
.sort_values(by=['StartTimeUtc']).style.applymap(color_cells).hide_index())
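The "highest severity wins" aggregation used for the TI Risk column can be looked at in isolation. The following is a minimal sketch, assuming the provider results have already been reduced to severity strings (`high`, `warning`, `information`); the `aggregate_severity` helper and the `SEVERITY_ORDER` tuple are hypothetical names and are not part of msticpy.

```python
# Hypothetical helper, not part of msticpy.
SEVERITY_ORDER = ("high", "warning", "information")  # most to least severe


def aggregate_severity(severities):
    """Return the label of the most severe value present, or "None"."""
    seen = {str(s).lower() for s in severities}
    for level in SEVERITY_ORDER:
        if level in seen:
            return level.capitalize()
    return "None"


print(aggregate_severity(["information", "high", "warning"]))  # High
print(aggregate_severity(["information"]))                     # Information
print(aggregate_severity([]))                                  # None
```

Keeping the ordering in a single tuple means a new severity level only needs to be added in one place.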
We can drill down into a specific alert by selecting it from the list below. This will return additional details on the alert as well as details of any threat intelligence matches.
from msticpy.sectools.ip_utils import convert_to_ip_entities
from msticpy.nbtools.foliummap import FoliumMap, get_center_ip_entities
#Display full alert details when selected
def show_full_alert(selected_alert):
    # ti_data is declared global so the TI results are visible to the
    # summary code that runs after the alert selection widget.
    global security_alert, ti_data
    ti_data = None
    output = []
    security_alert = SecurityAlert(
        rel_alert_select.selected_alert)
    output.append(nbdisplay.format_alert(security_alert, show_entities=True))
    ioc_list = []
    if security_alert['Entities'] is not None:
        for entity in security_alert['Entities']:
            if entity['Type'] == 'ipaddress' or entity['Type'] == 'ip':
                ioc_list.append(entity['Address'])
            elif entity["Type"] == 'url':
                ioc_list.append(entity['Url'])
        if len(ioc_list) > 0:
            ti_data = ti.lookup_iocs(data=ioc_list, providers=ti_prov_use)
            output.append(ti_data[['Ioc','IocType','Provider','Result','Severity','Details']].reset_index().style.applymap(color_cells).hide_index())
            ti_ips = ti_data[ti_data['IocType'] == 'ipv4']
            # If we have IP entities try and plot these on a map
            if not ti_ips.empty:
                ip_ents = [convert_to_ip_entities(i)[0] for i in ti_ips['Ioc'].unique()]
                center = get_center_ip_entities(ip_ents)
                ip_map = FoliumMap(location=center, zoom_start=4)
                ip_map.add_ip_cluster(ip_ents, color='red')
                output.append(ip_map)
            else:
                output.append("")
        else:
            output.append("No IoCs")
    else:
        output.append("No Entities with IoCs")
    return output
# Show selected alert when selected
if isinstance(alerts, pd.DataFrame) and not alerts.empty:
    ti_data = None
    md('Click on alert to view details.', "large")
    rel_alert_select = nbwidgets.SelectAlert(alerts=selected_alert_type,
                                             action=show_full_alert)
    rel_alert_select.display()
    # Add alert details to summary.
    if ti_data is not None:
        alert_details = Observation(caption="Alert Details", data={"Alert":security_alert, "TI":ti_data})
    else:
        alert_details = Observation(caption="Alert Details", data=security_alert)
    summary.add_observation(alert_details)
else:
    md('No alerts found.')
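The drill-down cell collects IP address and URL observables from the selected alert's entities before sending them to the TI providers. The sketch below shows that extraction step on its own, using only the standard library. The `extract_iocs` helper and the sample entities are hypothetical; the `Type`, `Address` and `Url` keys are the ones used by the cells above, and real alerts carry many more fields per entity.

```python
import json

# Hypothetical sample of an alert's Entities field (a JSON string)
SAMPLE_ENTITIES = json.dumps([
    {"Type": "ip", "Address": "203.0.113.10"},
    {"Type": "url", "Url": "http://example.com/login"},
    {"Type": "host", "HostName": "workstation01"},
    {"Type": "ip"},  # malformed entity: no Address key
])


def extract_iocs(entities_json):
    """Return the IP and URL observables found in an Entities JSON string."""
    try:
        entities = json.loads(entities_json)
    except (json.JSONDecodeError, TypeError):
        return []
    if not isinstance(entities, list):
        return []
    iocs = []
    for entity in entities:
        if not isinstance(entity, dict):
            continue
        ent_type = str(entity.get("Type", "")).lower()
        if ent_type in ("ip", "ipaddress") and "Address" in entity:
            iocs.append(entity["Address"])
        elif ent_type == "url" and "Url" in entity:
            iocs.append(entity["Url"])
    return iocs


print(extract_iocs(SAMPLE_ENTITIES))
# ['203.0.113.10', 'http://example.com/login']
print(extract_iocs(None))  # []
```

Checking for the key before reading it plays the same role as the `except KeyError` handling in the TI lookup cell.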
The cell below displays a timeline of the alerts you are triaging, with the selected alert highlighted in order to provide context on the alert.
# Display timeline of all alerts grouped by the TI risk score of them
selected_alert = Observation(caption="Alert Details", data=rel_alert_select.selected_alert)
summary.add_observation(selected_alert)
if len(selected_alert_type) == 1:
    md("Only one alert in selected alert provider/type - can't display timeline.")
else:
    nbdisplay.display_timeline(
        data=selected_alert_type, time_column="StartTimeUtc",
        group_by="TI Risk", source_columns=["AlertName"],
        alert=rel_alert_select.selected_alert, title="Alerts over time grouped by TI risk score")
Now that we have selected an alert of interest and triaged its key details, we need to identify the next investigative steps. The cell below identifies and extracts key entities from the selected alert. It enriches them using OSINT and, based on their type, recommends an additional Notebook to run for further investigation, drawn from the Notebooks available at https://github.com/Azure/Azure-Sentinel-Notebooks/ or via the Microsoft Sentinel portal.
from ipwhois import IPWhois
import whois
from ipaddress import ip_address
import tldextract
# Based on the extracted entity enrich it with OSINT
def enhance(row):
    if row['Type'] == "ipaddress":
        return whois_desc(row['Entity'])
    elif row['Type'] == "host":
        return host_sum(row['Entity'])
    elif row['Type'] == "url":
        return whois_url(row['Entity'])
# If entity is a hostname, get key details of the host.
def host_sum(host):
    hb_q = f"Heartbeat | where TimeGenerated > datetime({query_times.start}) and TimeGenerated < datetime({query_times.end}) | where Computer == '{host}' | take 1"
    hb = qry_prov.exec_query(hb_q)
    if not hb.empty:
        hb_str = f"{host} - {hb['ComputerIP'][0]} - {hb['OSType'][0]} - {hb['ComputerEnvironment'][0]}"
    else:
        hb_str = "No host heartbeat"
    return hb_str
# If entity is IP address work out what type of address it is and if a public IP address get ASN name.
def whois_desc(ip_lookup, progress=False):
    try:
        ip = ip_address(ip_lookup)
    except ValueError:
        return "Not an IP Address"
    if ip.is_private:
        return "Private address space"
    if not ip.is_global:
        return "Other address space"
    ip_whois = IPWhois(ip)
    whois_result = ip_whois.lookup_whois()
    return whois_result["asn_description"]
# If entity is a URL get the name of the organisation that registered the domain.
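def whois_url(url):
    # Read the fields by name rather than by tuple unpacking, which depends
    # on the tldextract version in use.
    ext = tldextract.extract(url)
    wis = whois.whois(f"{ext.domain}.{ext.suffix}")
    # Return None rather than raising if the WHOIS record has no org field
    return wis.get('org')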
# Based on the entity type suggest a Notebook for future investigation.
def notebook_suggestor(row):
    # Fall back to a generic suggestion for entity types with no dedicated Notebook
    return notebooks.get(row['Type'], "Write your own Notebook")
notebooks = {"ipaddress" : "Entity Explorer - IP Address",
"host" : "Entity Explorer - Linux Host/Windows Host",
"account" : "Entity Explorer - Account",
"url" : "Entity Explorer - Domain and URL"}
md('Entities for further investigation:', 'bold')
ents = security_alert.get_all_entities()
if not ents.empty:
    ents['Notebook'] = ents.apply(notebook_suggestor, axis=1)
    ents['Enrichment'] = ents.apply(enhance, axis=1)
    display(ents.style.hide_index())
    # Save entity details into our summary.
    entities = Observation(caption="Entities for further investigation", data=ents)
    summary.add_observation(entities)
else:
    md('No entities found in this alert')
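The address-type checks that decide whether an IP entity is worth a WHOIS lookup rely only on the standard library `ipaddress` module, so they can be tried without any network access. The following sketch mirrors the order of the checks in `whois_desc`; the `classify_ip` helper and the "Public address" label are hypothetical and stand in for the point at which the Notebook would perform the ASN lookup.

```python
from ipaddress import ip_address


def classify_ip(value):
    """Label an address using the same check order as whois_desc."""
    try:
        ip = ip_address(value)
    except ValueError:
        return "Not an IP Address"
    if ip.is_private:
        return "Private address space"
    if not ip.is_global:
        return "Other address space"
    # Only addresses reaching this point would be sent to a WHOIS lookup
    return "Public address"


for candidate in ("10.1.2.3", "100.64.0.1", "8.8.8.8", "not-an-ip"):
    print(candidate, "->", classify_ip(candidate))
```

Here `100.64.0.1` falls in the shared address space `100.64.0.0/10`, which is neither private nor global, so it is reported as "Other address space".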
#Uncomment the line below to see a summary of this Notebook's output
#summary.display_observations()