The aim of this notebook is to provide an example of sourcing, processing and ingesting security data into a custom Kusto aka Azure Data Explorer (ADE) cluster.
Kusto/ADE is a fast and highly scalable data exploration service for log and telemetry data, hosted in Azure - and is used across Microsoft for analysing huge datasets of this sort.
For this data ingestion notebook only, the following are additional requirements:
> pip install azure-kusto-data azure-kusto-ingest
We use the Open Threat Research Forge Mordor Security Datasets, and the corresponding MSTICPY Mordor Data Provider to retrieve them.
The plan here is to consider Microsoft-Windows-Sysmon/Operational data and create a set of database tables, each holding the events (merged from across different datasets) belonging to a specific event type. For more information on this source data, see Sysinternals Sysmon.
After processing and uploading, we'll end up with a structure that looks like the following:
cluster('msticpykustodemo')
.database('MsticPyKustoDemo')
.table('Event1')
... set of Sysmon Process Creation events (Event Type 1)
.table('Event2')
... set of Sysmon File Creation Time events (Event Type 2)
etc.
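Each table name carries only the numeric Sysmon event ID. As a convenience, a lookup like the one below can label each `EventN` table. This is a hypothetical helper, not part of the notebook; the names follow the Sysinternals Sysmon documentation and cover only the event IDs that appear in this notebook's output.

```python
# Sysmon event IDs seen in this notebook, with their documented event names.
SYSMON_EVENT_NAMES = {
    1: "Process creation",
    2: "A process changed a file creation time",
    3: "Network connection",
    4: "Sysmon service state changed",
    5: "Process terminated",
    6: "Driver loaded",
    7: "Image loaded",
    8: "CreateRemoteThread",
    9: "RawAccessRead",
    10: "ProcessAccess",
    11: "FileCreate",
    12: "RegistryEvent (Object create and delete)",
    13: "RegistryEvent (Value Set)",
    14: "RegistryEvent (Key and Value Rename)",
    15: "FileCreateStreamHash",
    17: "PipeEvent (Pipe Created)",
    18: "PipeEvent (Pipe Connected)",
    19: "WmiEvent (WmiEventFilter activity detected)",
    20: "WmiEvent (WmiEventConsumer activity detected)",
    21: "WmiEvent (WmiEventConsumerToFilter activity detected)",
    22: "DNSEvent (DNS query)",
    23: "FileDelete (File Delete archived)",
    24: "ClipboardChange (New content in the clipboard)",
    26: "FileDeleteDetected (File Delete logged)",
    255: "Error",
}


def describe_table(event_id: int) -> str:
    """Hypothetical helper: table name used in this notebook plus a readable label."""
    name = SYSMON_EVENT_NAMES.get(event_id, "Unknown Sysmon event")
    return f"Event{event_id}: {name}"


for eid in (1, 2):
    print(describe_table(eid))
```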
___See: ./Kusto-Analysis.ipynb for details on data analysis, following on from the prep and ingestion here.___
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.helpers import dataframe_from_result_table
from azure.kusto.ingest import IngestionProperties, QueuedIngestClient
import pandas as pd
import re
from msticpy.data import QueryProvider
You'll need to provide Query and Ingest URIs for your own writable Kusto/ADE cluster. See Quickstart: Create an Azure Data Explorer cluster and database for guidance on how to set one of these up within your Azure subscription.
When producing this example, I provisioned a dev/test cluster with the following configuration:
Region = UK West
Availability zones = (none)
Cluster name = msticpykustodemo
Workload = Dev/test
Size = (none)
Compute = Dev(No SLA)_Standard_D11_v2
Scaling method = Manual scale
Instance count = 1
Streaming ingestion = Off
Enable purge = Off
Auto-Stop cluster = On
System-assigned identity = On
Double encryption = Off
User-assigned Identity = Off
User-assigned identities = (none)
Tenants permissions = My tenant only
___Caution: watch out for costs! Azure Kusto/ADE clusters aren't cheap to run!___
# Replace these with your own values.
KUSTO_CLUSTER_URI = 'https://msticpykustodemo.ukwest.kusto.windows.net'
KUSTO_CLUSTER_INGEST_URI = 'https://ingest-msticpykustodemo.ukwest.kusto.windows.net'
KUSTO_DATABASE = 'MsticPyKustoDemo'
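In the example values above, the ingestion URI is the query URI with `ingest-` prefixed to the host name. Assuming your cluster follows the same pattern (check the URIs shown for your cluster in the Azure portal), a small hypothetical helper can derive one from the other:

```python
from urllib.parse import urlsplit, urlunsplit


def ingest_uri_from_cluster_uri(cluster_uri: str) -> str:
    """Hypothetical helper: derive the ingestion endpoint by prefixing the host with 'ingest-'."""
    parts = urlsplit(cluster_uri)
    if parts.netloc.startswith("ingest-"):
        return cluster_uri  # Already an ingestion URI.
    return urlunsplit(
        (parts.scheme, "ingest-" + parts.netloc, parts.path, parts.query, parts.fragment)
    )


print(ingest_uri_from_cluster_uri("https://msticpykustodemo.ukwest.kusto.windows.net"))
```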
Here we use the MSTICPY Mordor Query Provider to retrieve a number of example source datasets.
We then split these up to create a series of Pandas DataFrames, each containing a single event type, for events from all sources.
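The notebook collects records into per-EventID lists. If the records were already held in one merged DataFrame, pandas `groupby` would give the same per-event-type split. A toy sketch, with made-up rows standing in for Sysmon records:

```python
import pandas as pd

# Toy rows standing in for Sysmon records merged from several datasets.
rows = [
    {"EventID": 1, "Image": "cmd.exe", "DestinationIp": None},
    {"EventID": 3, "Image": "svchost.exe", "DestinationIp": "10.0.0.5"},
    {"EventID": 1, "Image": "powershell.exe", "DestinationIp": None},
]
merged = pd.DataFrame(rows)

# One DataFrame per event type, keyed by EventID (row order within a group is preserved).
per_event = {
    event_id: group.reset_index(drop=True)
    for event_id, group in merged.groupby("EventID")
}

for event_id, frame in per_event.items():
    print(event_id, len(frame))
```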
mdr_data = QueryProvider("Mordor", use_cached=True, save_folder='./mordor', silent=True)
mdr_data.connect()

# Matches ISO-8601-style timestamp strings, e.g. '2020-10-21T18:02:05.123Z'
# or '2020-10-21 18:02:05+00:00'.
TIMESTAMP_RE = re.compile(r'^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(\.\d+)?([+-]\d{2}:?\d{2}|Z)?$')

events = {}  # EventID -> list of normalised event records (dicts)
for dataset in mdr_data.list_queries():
    if not dataset.startswith('atomic.windows'):
        continue
    print('[+] Retrieving Dataset', f'"{dataset}"')
    try:
        df = getattr(mdr_data, dataset)(silent=True, use_cached=True)
        # Some datasets (e.g. network captures) don't yield a DataFrame.
        if not isinstance(df, pd.DataFrame):
            continue
        for _, r in df.iterrows():
            row_entry = r.to_dict()
            # For demo purposes, we'll consider only Sysmon event logs.
            if row_entry.get('Channel') != 'Microsoft-Windows-Sysmon/Operational':
                continue
            # Every entry should have a @timestamp:datetime field.
            if '@timestamp' not in row_entry:
                if 'TimeCreated' not in row_entry:
                    continue
                row_entry['@timestamp'] = row_entry.pop('TimeCreated')
            # Try to parse timestamp-like strings into timezone-aware datetimes.
            for k, v in row_entry.items():
                if isinstance(v, str) and TIMESTAMP_RE.match(v):
                    row_entry[k] = pd.to_datetime(v, utc=True)
            # Normalise field names: remove a leading '@', capitalise.
            for k in list(row_entry.keys()):
                k_fixed = k[1:] if k.startswith('@') else k
                k_fixed = k_fixed[:1].upper() + k_fixed[1:]
                if k != k_fixed:
                    row_entry[k_fixed] = row_entry.pop(k)
            events.setdefault(row_entry['EventID'], []).append(row_entry)
    except Exception as ex:
        print('[!] Retrieving', f'"{dataset}"', 'Failed [', str(ex), ']')
Retrieving Mitre data...
Retrieving Mordor data...
Downloading Mordor metadata: 100%|██████████| 96/96 [00:00<00:00, 2154.64 files/s]
[+] Retrieving Dataset "atomic.windows.collection.host.msf_record_mic"
[+] Retrieving Dataset "atomic.windows.credential_access.host.cmd_lsass_memory_dumpert_syscalls"
[+] Retrieving Dataset "atomic.windows.credential_access.host.cmd_psexec_lsa_secrets_dump"
[+] Retrieving Dataset "atomic.windows.credential_access.host.cmd_sam_copy_esentutl"
[+] Retrieving Dataset "atomic.windows.credential_access.host.covenant_dcsync_dcerpc_drsuapi_DsGetNCChanges"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_dcsync_dcerpc_drsuapi_DsGetNCChanges"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_mimikatz_backupkeys_dcerpc_smb_lsarpc"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_mimikatz_extract_keys"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_mimikatz_logonpasswords"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_mimikatz_lsadump_patch"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_mimikatz_sam_access"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_over_pth_patch_lsass"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_powerdump_sam_access"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_shell_reg_dump_sam"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_shell_rubeus_asktgt_createnetonly"
[+] Retrieving Dataset "atomic.windows.credential_access.host.empire_shell_rubeus_asktgt_ptt"
[+] Retrieving Dataset "atomic.windows.credential_access.host.psh_input_capture_promptforcreds"
[+] Retrieving Dataset "atomic.windows.credential_access.host.psh_lsass_memory_dump_comsvcs"
[+] Retrieving Dataset "atomic.windows.credential_access.host.psh_windows_vault_web_credentials"
[+] Retrieving Dataset "atomic.windows.credential_access.host.rdp_interactive_taskmanager_lsass_dump"
[+] Retrieving Dataset "atomic.windows.credential_access.network.covenant_dcsync_dcerpc_drsuapi_DsGetNCChanges"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.credential_access.network.empire_dcsync_dcerpc_drsuapi_DsGetNCChanges"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.credential_access.network.empire_mimikatz_backupkeys_dcerpc_smb_lsarpc"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.credential_access.network.empire_shell_rubeus_asktgt_createnetonly"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.credential_access.network.empire_shell_rubeus_asktgt_ptt"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.cmd_bitsadmin_download_psh_script"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.cmd_mshta_javascript_getobject_sct"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.cmd_mshta_vbscript_execute_psh"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.cmd_netsh_fw_mod_open_ports"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.cmd_process_herpaderping_snippingtool"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.covenant_installutil"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.covenant_lolbin_wuauclt_createremotethread"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.empire_dllinjection_LoadLibrary_CreateRemoteThread"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.empire_launcher_sct_regsvr32"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.empire_monologue_netntlm_downgrade"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.empire_powerview_ldap_ntsecuritydescriptor"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.empire_psinject_PEinjection"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.psh_cmstp_execution_bypassuac"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.psh_control_panel_execution"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.psh_hh_local_html_payload"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.psh_mavinject_dll_notepad"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.psh_mshta_html_application_execution"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.psh_register_cimprovider_execute_dll"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.purplesharp_pe_injection_createremotethread"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.wmic_remote_xsl_jscript"
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.empire_enable_rdp.tar"
[!] Retrieving "atomic.windows.defense_evasion.host.empire_enable_rdp.tar" Failed [ File type not supported https://raw.githubusercontent.com/OTRF/Security-Datasets/master/datasets/atomic/windows/defense_evasion/host/empire_enable_rdp.tar.gz ]
[+] Retrieving Dataset "atomic.windows.defense_evasion.host.empire_wdigest_downgrade.tar"
[!] Retrieving "atomic.windows.defense_evasion.host.empire_wdigest_downgrade.tar" Failed [ File type not supported https://raw.githubusercontent.com/OTRF/Security-Datasets/master/datasets/atomic/windows/defense_evasion/host/empire_wdigest_downgrade.tar.gz ]
[+] Retrieving Dataset "atomic.windows.defense_evasion.network.empire_powerview_ldap_ntsecuritydescriptor"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.discovery.host.cmd_discover_iexplorer_version_registry"
[+] Retrieving Dataset "atomic.windows.discovery.host.cmd_seatbelt_group_user"
[+] Retrieving Dataset "atomic.windows.discovery.host.covenant_getdomaingroup_ldap_searchrequest_domain_admins"
[+] Retrieving Dataset "atomic.windows.discovery.host.empire_find_localadmin_smb_svcctl_OpenSCManager"
[+] Retrieving Dataset "atomic.windows.discovery.host.empire_getsession_dcerpc_smb_srvsvc_NetSessEnum"
[+] Retrieving Dataset "atomic.windows.discovery.host.empire_shell_net_local_users"
[+] Retrieving Dataset "atomic.windows.discovery.host.empire_shell_net_localgroup_administrators"
[+] Retrieving Dataset "atomic.windows.discovery.host.empire_shell_rpc_samr_smb_group_domain_admins_standard_user"
[+] Retrieving Dataset "atomic.windows.discovery.host.empire_shell_samr_EnumDomainUsers"
[+] Retrieving Dataset "atomic.windows.discovery.network.covenant_getdomaingroup_ldap_searchrequest_domain_admins"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.discovery.network.empire_getsession_dcerpc_smb_srvsvc_NetSessEnum"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.discovery.network.empire_shell_rpc_samr_smb_group_domain_admins_standard_user"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.discovery.network.empire_shell_samr_EnumDomainUsers"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.execution.host.cmd_sharpview_pcre_net"
[+] Retrieving Dataset "atomic.windows.execution.host.empire_launcher_vbs"
[+] Retrieving Dataset "atomic.windows.execution.host.psh_powershell_httplistener"
[+] Retrieving Dataset "atomic.windows.execution.host.psh_python_webserver"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.aadinternals_export_adfsdatabaseconfig_remotely"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_copy_smb_CreateRequest"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_dcom_executeexcel4macro_allowed"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_dcom_iertutil_dll_hijack"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_dcom_registerxll"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_psremoting_command"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_sc_query_dcerpc_smb_svcctl"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_sharpsc_create_dcerpc_smb_svcctl"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_sharpsc_query_dcerpc_smb_svcctl"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_sharpsc_start_dcerpc_smb_svcctl"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_sharpsc_stop_dcerpc_smb_svcctl"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_sharpwmi_create_dcerpc_wmi"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_wmi_remote_event_subscription_ActiveScriptEventConsumers"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.covenant_wmi_wbemcomn_dll_hijack"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.empire_dcom_shellwindows_stager"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.empire_msbuild_dcerpc_wmi_smb"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.empire_psexec_dcerpc_tcp_svcctl"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.empire_psremoting_stager"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.empire_shell_dcerpc_smb_service_dll_hijack"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.empire_smbexec_dcerpc_smb_svcctl"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.empire_wmi_dcerpc_wmi_IWbemServices_ExecMethod"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.empire_wmic_add_user_backdoor"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.mimikatz_CVE-2020-1472_Unauthenticated_NetrServerAuthenticate2"
[!] Retrieving "atomic.windows.lateral_movement.host.mimikatz_CVE-2020-1472_Unauthenticated_NetrServerAuthenticate2" Failed [ QueryContainer object has no attribute windows.lateral_movement.host.mimikatz_CVE-2020-1472_Unauthenticated_NetrServerAuthenticate2 ]
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.purplesharp_ad_playbook_I"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.schtask_create"
[+] Retrieving Dataset "atomic.windows.lateral_movement.host.schtask_modification"
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.aadinternals_export_adfsdatabaseconfig_remotely"
Cannot process files of type .pcapng
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.covenant_dcom_executeexcel4macro_allowed"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.covenant_dcom_iertutil_dll_hijack"
Cannot process files of type .cap
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.covenant_dcom_registerxll"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.covenant_psremoting_command"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.covenant_sc_query_dcerpc_smb_svcctl"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.covenant_sharpsc_create_dcerpc_smb_svcctl"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.covenant_sharpsc_query_dcerpc_smb_svcctl"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.covenant_sharpsc_start_dcerpc_smb_svcctl"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.covenant_sharpsc_stop_dcerpc_smb_svcctl"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.covenant_sharpwmi_create_dcerpc_wmi"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.covenant_wmi_remote_event_subscription_ActiveScriptEventConsumers"
Cannot process files of type .cap
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.covenant_wmi_wbemcomn_dll_hijack"
Cannot process files of type .cap
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.empire_dcom_shellwindows_stager"
MsticpyUserError - Mordor download error
Could not extract zip file for https://raw.githubusercontent.com/OTRF/Security-Datasets/master/datasets/atomic/windows/lateral_movement/network/empire_dcom_shellwindows_stager.zip.
[!] Retrieving "atomic.windows.lateral_movement.network.empire_dcom_shellwindows_stager" Failed [ ('Mordor download error', 'Could not extract zip file for https://raw.githubusercontent.com/OTRF/Security-Datasets/master/datasets/atomic/windows/lateral_movement/network/empire_dcom_shellwindows_stager.zip.', 'File does not exist or is corrupt.', 'https://msticpy.readthedocs.io/en/latest/data_acquisition/MordorData.html') ]
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.empire_msbuild_dcerpc_wmi_smb"
Cannot process files of type .cap
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.empire_psexec_dcerpc_tcp_svcctl"
Cannot process files of type .cap
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.empire_psremoting_stager"
Cannot process files of type .cap
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.empire_shell_dcerpc_smb_service_dll_hijack"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.empire_smbexec_dcerpc_smb_svcctl"
Cannot process files of type .cap
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.empire_wmi_dcerpc_wmi_IWbemServices_ExecMethod"
Cannot process files of type .cap
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.mimikatz_CVE-2020-1472_Unauthenticated_NetrServerAuthenticate2"
[!] Retrieving "atomic.windows.lateral_movement.network.mimikatz_CVE-2020-1472_Unauthenticated_NetrServerAuthenticate2" Failed [ QueryContainer object has no attribute windows.lateral_movement.network.mimikatz_CVE-2020-1472_Unauthenticated_NetrServerAuthenticate2 ]
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.purplesharp_ad_playbook_I"
Cannot process files of type .cap
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.schtask_create"
Cannot process files of type .pcapng
[+] Retrieving Dataset "atomic.windows.lateral_movement.network.schtask_modification"
Cannot process files of type .pcapng
[+] Retrieving Dataset "atomic.windows.other.aptsimulator_cobaltstrike"
[+] Retrieving Dataset "atomic.windows.persistence.host.cmd_userinitmprlogonscript_batch"
[+] Retrieving Dataset "atomic.windows.persistence.host.empire_persistence_registry_modification_run_keys_elevated_user"
[+] Retrieving Dataset "atomic.windows.persistence.host.empire_persistence_registry_modification_run_keys_standard_user"
[+] Retrieving Dataset "atomic.windows.persistence.host.empire_schtasks_creation_execution_elevated_user"
[+] Retrieving Dataset "atomic.windows.persistence.host.empire_schtasks_creation_standard_user"
[+] Retrieving Dataset "atomic.windows.persistence.host.empire_wmi_local_event_subscriptions_elevated_user"
[+] Retrieving Dataset "atomic.windows.persistence.host.proxylogon_ssrf_rce_poc"
[+] Retrieving Dataset "atomic.windows.privilege_escalation.host.cmd_service_mod_fax"
[+] Retrieving Dataset "atomic.windows.privilege_escalation.host.empire_uac_shellapi_fodhelper"
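The per-record clean-up in the retrieval loop above (timestamp parsing and field-name normalisation) can be pulled out into small helpers and checked in isolation. A sketch with hypothetical helper names; this version of the timestamp regex accepts a trailing `Z` as well as `+hh:mm` or `+hhmm` offsets:

```python
import re

import pandas as pd

# ISO-8601-like timestamps: date, 'T' or space, time, optional fractional
# seconds, optional 'Z' or +hh:mm / +hhmm offset.
TIMESTAMP_RE = re.compile(
    r"^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(\.\d+)?([+-]\d{2}:?\d{2}|Z)?$"
)


def normalise_key(key: str) -> str:
    """Strip a leading '@' and upper-case the first character."""
    if key.startswith("@"):
        key = key[1:]
    return key[:1].upper() + key[1:]


def normalise_record(record: dict) -> dict:
    """Return a copy with normalised keys and timestamp strings parsed to UTC datetimes."""
    out = {}
    for key, value in record.items():
        if isinstance(value, str) and TIMESTAMP_RE.match(value):
            value = pd.to_datetime(value, utc=True)
        out[normalise_key(key)] = value
    return out


rec = normalise_record({"@timestamp": "2020-10-21T18:02:05.123Z", "hostname": "WKST01"})
print(rec)
```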
for event_id in sorted(events.keys()):
    df = pd.DataFrame(events[event_id])
    df.sort_values('Timestamp', inplace=True)
    df.reset_index(drop=True, inplace=True)
    # Because the source dataframes (and hence rows for this dataframe) contained many event types,
    # they have many irrelevant columns for this single event type.
    # Here we drop any column that is entirely empty, treating both nulls and '-' placeholders as empty.
    null_columns = [
        col for col in df.columns
        if not (df[col].notna() & (df[col] != '-')).any()
    ]
    df.drop(columns=null_columns, inplace=True)
    events[event_id] = df
    print('[+] EventID: ', event_id, ', #Records:', df.shape[0], ', #Columns: ', df.shape[1], sep='')
[+] EventID: 1, #Records:1240, #Columns: 56
[+] EventID: 2, #Records:532, #Columns: 39
[+] EventID: 3, #Records:2592, #Columns: 51
[+] EventID: 4, #Records:3, #Columns: 33
[+] EventID: 5, #Records:1078, #Columns: 38
[+] EventID: 6, #Records:1, #Columns: 35
[+] EventID: 7, #Records:57139, #Columns: 49
[+] EventID: 8, #Records:350, #Columns: 44
[+] EventID: 9, #Records:1838, #Columns: 37
[+] EventID: 10, #Records:103207, #Columns: 46
[+] EventID: 11, #Records:5256, #Columns: 40
[+] EventID: 12, #Records:114904, #Columns: 40
[+] EventID: 13, #Records:50340, #Columns: 42
[+] EventID: 14, #Records:1, #Columns: 34
[+] EventID: 15, #Records:76, #Columns: 38
[+] EventID: 17, #Records:256, #Columns: 38
[+] EventID: 18, #Records:858, #Columns: 40
[+] EventID: 19, #Records:3, #Columns: 37
[+] EventID: 20, #Records:3, #Columns: 37
[+] EventID: 21, #Records:3, #Columns: 36
[+] EventID: 22, #Records:544, #Columns: 39
[+] EventID: 23, #Records:1584, #Columns: 41
[+] EventID: 24, #Records:17, #Columns: 39
[+] EventID: 26, #Records:23, #Columns: 19
[+] EventID: 255, #Records:6, #Columns: 32
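The column-by-column emptiness test above can also be written as a single vectorised mask over the whole DataFrame. A sketch using a hypothetical `drop_empty_columns` helper that, like the notebook, treats nulls and `'-'` as empty:

```python
import numpy as np
import pandas as pd


def drop_empty_columns(df: pd.DataFrame, placeholders=("-",)) -> pd.DataFrame:
    """Drop columns whose values are all null or a placeholder such as '-'."""
    # True where a cell holds a real value: not null and not a placeholder.
    has_value = df.notna() & ~df.isin(list(placeholders))
    # Keep only columns with at least one real value.
    return df.loc[:, has_value.any()]


frame = pd.DataFrame({
    "Image": ["cmd.exe", "explorer.exe"],
    "QueryName": [None, np.nan],
    "RuleName": ["-", "-"],
})
print(list(drop_empty_columns(frame).columns))
```

Passing extra placeholders (for example an empty string) widens what counts as empty without touching the rest of the logic.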
First, we need to authenticate to Kusto/ADE. By default, we use Azure CLI ("az") authentication, which requires that you first run "az login" from a shell. See Kusto connection strings for more details and alternative options.
Next, we list the existing database tables and drop any whose names clash with the tables we are about to create.
Following that, we create a new table for each event type, with a schema derived from the DataFrame's column types (pandas dtypes mapped to Kusto types), then upload the DataFrame containing our records.
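The management commands sent to Kusto/ADE in the next cell are plain strings, so they can be built and inspected without a cluster. The sketch below (hypothetical helper names) derives a `.create table` schema from a DataFrame's dtypes using pandas' dtype-checking functions, which also cover `bool` columns and datetime resolutions other than `ns`. It keeps the notebook's float-to-`decimal` choice and bracket-quotes column names (`['name']`, KQL's quoting syntax for identifiers) in case any contain special characters; it assumes no column name contains a single quote.

```python
import pandas as pd
from pandas.api import types as ptypes


def kusto_type(dtype) -> str:
    """Map a pandas dtype to a Kusto column type (floats -> decimal, as in the notebook)."""
    if ptypes.is_datetime64_any_dtype(dtype):
        return "datetime"
    if ptypes.is_bool_dtype(dtype):
        return "bool"
    if ptypes.is_integer_dtype(dtype):
        return "long"
    if ptypes.is_float_dtype(dtype):
        return "decimal"
    return "string"  # object/string columns and anything unrecognised


def drop_tables_command(table_names) -> str:
    """Build a '.drop tables' management command for the given table names."""
    return f".drop tables ({', '.join(table_names)})"


def create_table_command(table_name: str, df: pd.DataFrame) -> str:
    """Build a '.create table' command whose schema mirrors df's column dtypes."""
    columns = ", ".join(
        f"['{col}']:{kusto_type(dtype)}" for col, dtype in df.dtypes.items()
    )
    return f".create table {table_name} ({columns})"


sample = pd.DataFrame({
    "Timestamp": pd.to_datetime(["2020-10-21T18:02:05Z"], utc=True),
    "EventID": [1],
    "Image": ["cmd.exe"],
})
print(drop_tables_command(["Event1", "Event3"]))
print(create_table_command("Event1", sample))
```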
# Use Azure CLI authentication: requires that you run "az login".
kusto_client = KustoClient(KustoConnectionStringBuilder.with_az_cli_authentication(KUSTO_CLUSTER_URI))
kusto_ingest_client = QueuedIngestClient(KustoConnectionStringBuilder.with_az_cli_authentication(KUSTO_CLUSTER_INGEST_URI))

# Query existing tables.
db_tables = dataframe_from_result_table(kusto_client.execute_mgmt(KUSTO_DATABASE, '.show tables').primary_results[0])
existing_tables = set(db_tables['TableName'])
ingest_tables = [f'Event{k}' for k in events.keys()]

# !!! Caution !!!
# Drop any existing tables that match our new table names.
drop_tables = [tbl for tbl in ingest_tables if tbl in existing_tables]
if drop_tables:
    kusto_client.execute_mgmt(KUSTO_DATABASE, f'.drop tables ({",".join(drop_tables)})')

# pandas dtype -> Kusto column type; unmapped dtypes fall back to their pandas name.
pandas_kusto_type_mappings = {
    'datetime64[ns, UTC]': 'datetime',
    'object': 'string',
    'float64': 'decimal',
    'int64': 'long'
}

for event_key in events.keys():
    tbl_name = f'Event{event_key}'
    print('[+] Ingesting "', tbl_name, '"...', sep='')
    # Create the table with a schema derived from the DataFrame's column dtypes.
    event_columns = events[event_key].dtypes.to_dict()
    event_columns = ", ".join(f'{k}:{pandas_kusto_type_mappings.get(str(v), str(v))}' for k, v in event_columns.items())
    kusto_client.execute_mgmt(KUSTO_DATABASE, f'.create table {tbl_name} ({event_columns})')
    # Queue the DataFrame for ingestion; QUEUED means accepted, not yet loaded.
    ingest_result = kusto_ingest_client.ingest_from_dataframe(
        events[event_key],
        ingestion_properties=IngestionProperties(
            database=KUSTO_DATABASE,
            table=tbl_name
        )
    )
    print(' ', ingest_result.status)
[+] Ingesting "Event13"...
  IngestionStatus.QUEUED
[+] Ingesting "Event12"...
  IngestionStatus.QUEUED
[+] Ingesting "Event10"...
  IngestionStatus.QUEUED
[+] Ingesting "Event9"...
  IngestionStatus.QUEUED
[+] Ingesting "Event3"...
  IngestionStatus.QUEUED
[+] Ingesting "Event7"...
  IngestionStatus.QUEUED
[+] Ingesting "Event18"...
  IngestionStatus.QUEUED
[+] Ingesting "Event1"...
  IngestionStatus.QUEUED
[+] Ingesting "Event23"...
  IngestionStatus.QUEUED
[+] Ingesting "Event11"...
  IngestionStatus.QUEUED
[+] Ingesting "Event2"...
  IngestionStatus.QUEUED
[+] Ingesting "Event5"...
  IngestionStatus.QUEUED
[+] Ingesting "Event22"...
  IngestionStatus.QUEUED
[+] Ingesting "Event17"...
  IngestionStatus.QUEUED
[+] Ingesting "Event24"...
  IngestionStatus.QUEUED
[+] Ingesting "Event8"...
  IngestionStatus.QUEUED
[+] Ingesting "Event26"...
  IngestionStatus.QUEUED
[+] Ingesting "Event15"...
  IngestionStatus.QUEUED
[+] Ingesting "Event19"...
  IngestionStatus.QUEUED
[+] Ingesting "Event20"...
  IngestionStatus.QUEUED
[+] Ingesting "Event21"...
  IngestionStatus.QUEUED
[+] Ingesting "Event14"...
  IngestionStatus.QUEUED
[+] Ingesting "Event255"...
  IngestionStatus.QUEUED
[+] Ingesting "Event4"...
  IngestionStatus.QUEUED
[+] Ingesting "Event6"...
  IngestionStatus.QUEUED
You should soon be able to query and interact with the data in Kusto; see Kusto-Analysis.ipynb for details.
Kusto may take a few minutes to actually perform the ingestion, so the data won't be available to query immediately!
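Because queued ingestion completes asynchronously, one simple way to tell when a table is ready is to poll its row count. A sketch under these assumptions: `wait_for_rows` is a hypothetical helper; `client` is a `KustoClient` (or anything with the same `execute(database, query)` method); and the first primary result's first row can be indexed by column name, as `azure-kusto-data` result rows allow. The KQL `count` operator returns a single `Count` column.

```python
import time


def wait_for_rows(client, database: str, table: str, timeout_s: float = 600,
                  poll_s: float = 30, sleep=time.sleep) -> int:
    """Poll '<table> | count' until it reports rows, or raise after timeout_s seconds."""
    deadline = time.monotonic() + timeout_s
    while True:
        response = client.execute(database, f"{table} | count")
        # Assumed shape: first primary result -> first row -> 'Count' column.
        count = response.primary_results[0][0]["Count"]
        if count > 0:
            return count
        if time.monotonic() >= deadline:
            raise TimeoutError(f"No rows in {table} after {timeout_s} s")
        sleep(poll_s)
```

For example, `wait_for_rows(kusto_client, KUSTO_DATABASE, "Event1")` would block until `Event1` has data. The injectable `sleep` parameter is only there so the loop can be exercised without real waiting.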