#!/usr/bin/env python
# coding: utf-8

# # Extracting data from MISP using PyMISP

# ## Recovering the API KEY
# - Go to `Global Actions` then `My Profile`
# - Access the `/users/view/me` URL

# In[491]:


import os

from pymisp import PyMISP
import urllib3

# Silence urllib3 warnings; certificate verification is switched off below.
urllib3.disable_warnings()

# Connection settings. The MISP_URL / MISP_KEY environment variables take
# precedence so the script can target another instance without editing (or
# committing) credentials; the fallbacks are the values originally hard-coded.
misp_url = os.environ.get('MISP_URL', 'https://localhost:8443/')
# NOTE(review): presumably a throwaway key for a local training instance -
# confirm it is not a real credential before sharing this file.
misp_key = os.environ.get('MISP_KEY', 'GqfuZo444EFlylND0XaKZsEXgWgkPgguUZ6KVRuq')
# Should PyMISP verify the MISP certificate
misp_verifycert = False

misp = PyMISP(misp_url, misp_key, misp_verifycert)


# In[492]:


import datetime
from pprint import pprint
import base64
import subprocess


# ## Retrieving an Event

# In[493]:


# An Event is fetched here once by UUID and once by numeric ID.
# `pythonify=True` is used for the first call and `pythonify=False` for the
# second; the printed type shows what the non-pythonified call returns.
r1 = misp.get_event('7907c4a9-a15c-4c60-a1b4-1d214cf8cf41', pythonify=True)
print(r1)

r2 = misp.get_event(2, pythonify=False)
print(type(r2))


# ## Searching the Event index

# In[494]:


r = misp.search_index(pythonify=True)
# Assumes the index holds at least two Events (second entry is printed).
print(r[1].uuid)


# #### Only published Events

# In[495]:


r = misp.search_index(published=True, pythonify=True)
print(r)
# print(r[0].to_dict())


# #### Playing with time

# **Multiple types of timestamps for Events**
# - `timestamp`: Timestamp of the **last modification** of the Event or its content (includes Attributes, Objects, Tags, ...)
# - `publish_timestamp`: Timestamp of the **last publication** of the Event
#     - To generate a report, you usually want to use `publish_timestamp`
#
# **Multiple types of dates for Events**
# - `date_from`: Only events having a more recent date will be returned
# - `date_to`: Only events having an older date will be returned
# - Both can be used at once to specify a time window
#

# In[496]:


# Two equivalent ways of expressing "the last 30 days" are shown; the second
# assignment overrides the first, so the datetime form is the one sent.
# Using string literal
sinceLastMonth = '30d'
# Using Python's datetime
sinceLastMonth = datetime.date.today() - datetime.timedelta(days=30)

r = misp.search_index(published=True, publish_timestamp=sinceLastMonth, pythonify=True)
print(r)


# #### Data returned
# - Searching the index will only return high-level information about the Event and its attached context
#
# - Can be useful for:
#     - Statistics about the number of created Events
#     - Statistics about Organisations creating Events over time
#     - Statistics about distribution level usage
# - And, **if Events are correctly contextualized**
#     - Statistics about **type of incident**
#     - Adversary tactics and techniques with **MITRE ATT&CK** usage
#     - Malware family

# In[497]:


# Summarise the first Event of the previous search. The guards avoid an
# IndexError when the search returned nothing or the Event carries no tag.
if r:
    event = r[0].to_dict()
    event_properties = event.keys()
    print('# Event properties')
    print(list(event_properties))

    event_tags = event.get('EventTag', [])
    print('\n # Event Tags ({0})'.format(len(event_tags)))
    if event_tags:
        pprint(event_tags[0])

    event_clusters = event.get('GalaxyCluster', [])
    print('\n # Event Clusters ({0})'.format(len(event_clusters)))
else:
    print('No Event returned by the index search')


# #### Useful parameters
#
# - `attribute` (Optional[str]) *Filter events on attribute's value*
# - `published` (Optional[bool])
# - `hasproposal` (Optional[bool])
# - `eventid` (Optional[str, int])
# - `tags` (Optional[str, List[str]])
# - `date_from` (Optional[datetime, date, int, str, float, None])
# - `date_to` (Optional[datetime, date, int, str, float, None])
# - `eventinfo` (Optional[str])
# - `threatlevel` (Optional[str, int])
# - `analysis` (Optional[str, int])
# - `distribution` (Optional[str, int])
# - `sharinggroup` (Optional[str, int])
# - `org` (Optional[str, List[[str, int]])
# - `timestamp` (Optional[datetime, date, int, str, float, None, List[[datetime, date, int, str, float, None], [datetime, date, int, str, float, None]]])
#     - timestamp=(datetime.today() - timedelta(days=1))
#     - timestamp=['14d', '7d']
#     - timestamp=int(datetime.today().timestamp())
# - `publish_timestamp` (Optional[datetime, date, int, str, float, None, List[[datetime, date, int, str, float, None], [datetime, date, int, str, float, None]]])

# ## Retrieving data with RestSearch
#
# The `RestSearch` endpoint can be used on multiple scopes. It has more filtering parameters and is generally flexible.
#
# Supported scopes (also called Controllers): `events`, `attributes`, `objects`
#
# ### `/events/restSearch` VS `/attributes/restSearch`
#
# - Both endpoints support most of the parameters
# - They differ in the data returned
#     - `/events/restSearch` returns the whole Event with its child elements (Attributes, Objects, Proposals, ..)
#     - `/attributes/restSearch` returns all attributes

# #### Getting only metadata: Do not include child elements (such as Attributes, ...)
# In[498]:


r = misp.search(controller='events', metadata=True, pythonify=True)
print(r)


# ### Searching Attributes with RestSearch

# #### Searching for values

# In[499]:


r1 = misp.search(controller='attributes', value='8.8.8.8', pythonify=True)
print('Simple value:', r1)

r2 = misp.search(controller='attributes', value=['8.8.8.8', '5.4.2.1'], pythonify=True)
print('List of values:', r2)

r3 = misp.search(controller='attributes', value=['https://www.github.com/%'], pythonify=True)
print('Wildcard:', r3)


# #### Searching for types

# In[500]:


r1 = misp.search(controller='attributes', type_attribute='first-name', pythonify=True)
print(r1)

r2 = misp.search(controller='attributes', type_attribute=['malware-sample', 'attachment'], pythonify=True)
print(r2)


# #### Searching for tags

# In[501]:


r1 = misp.search(controller='attributes', tags='tlp:red', pythonify=True)
print('Simple tag:', len(r1))
# Assumes at least one Attribute matched.
print('\tFirst Attribute', r1[0].Tag)

r2 = misp.search(controller='attributes', tags=['PAP:RED', 'tlp:red'], pythonify=True)
print('List of tags:', len(r2))
# Assumes at least three Attributes matched.
print('\tThird Attribute', r2[2].Tag)


# In[502]:


r3 = misp.search(controller='attributes', tags=['misp-galaxy:target-information=%'], pythonify=True)
print('Wildcard:', len(r3))
print('\tTags of all Attributes:', [attr.Tag for attr in r3])
print()
# The base64 payload is an open question printed for the reader.
print(base64.b64decode('T3BlbiBxdWVzdGlvbjogV2h5IGRvIHdlIGhhdmUgQXR0cmlidXRlcyBkZXNwaXRlIHRoZW0gbm90IGhhdmluZyB0aGUgY29ycmVjdCB0YWcgYXR0YWNoZWQ/Cg==').decode())


# In[503]:


TARGET_INFORMATION_PREFIX = 'misp-galaxy:target-information='


def _target_information_tags(attributes):
    """Return, for each Attribute, the target-information tag names of its Event.

    The result is a list with one entry per Attribute, each entry being the
    list of tag names of the parent Event that start with
    ``TARGET_INFORMATION_PREFIX``. Every parent Event is fetched only once,
    even when several Attributes share it, so the number of API calls is
    bounded by the number of distinct Events.
    """
    tags_by_event_id = {}
    tags_per_attribute = []
    for attr in attributes:
        if attr.event_id not in tags_by_event_id:
            parent_event = misp.get_event(attr.event_id, pythonify=True)
            tags_by_event_id[attr.event_id] = [
                tag.name for tag in parent_event.Tag
                if tag.name.startswith(TARGET_INFORMATION_PREFIX)
            ]
        tags_per_attribute.append(tags_by_event_id[attr.event_id])
    return tags_per_attribute


allEventTags = _target_information_tags(r3)
allUniqueEventTag = {tag for tags in allEventTags for tag in tags}
print('All unique Event tags:', allUniqueEventTag)


# In[504]:


r4 = misp.search(
    controller='attributes',
    tags=['misp-galaxy:target-information=%', '!misp-galaxy:target-information="Luxembourg"'],
    pythonify=True)
print('Negation:', len(r4))

# Showing unique Event tags
allEventTags = _target_information_tags(r4)
allUniqueEventTag = {tag for tags in allEventTags for tag in tags}
print('All unique Event tags:', allUniqueEventTag)


# **Want to also have the Event tags included**?

# In[505]:


r5 = misp.search(
    controller='attributes',
    tags='misp-galaxy:target-information=%',
    pythonify=True)
print('Tags of first attibute:', [tag.name for tag in r5[0].Tag])

r6 = misp.search(
    controller='attributes',
    tags='misp-galaxy:target-information=%',
    includeEventTags=True,
    pythonify=True)
print('Tags of first attibute:', [tag.name for tag in r6[0].Tag])


# **Complex query**

# In[506]:


# The two tags combined below, first with OR and then with AND.
wanted_tags = ['tlp:amber', 'adversary:infrastructure-type="c2"']

complex_query = misp.build_complex_query(or_parameters=wanted_tags)
r7 = misp.search(
    controller='attributes',
    tags=complex_query,
    includeEventTags=True,
    pythonify=True)
print('Or:', len(r7))
# Only the first five Attributes are shown for the OR query.
pprint([
    [tag.name for tag in attr.Tag if tag.name in wanted_tags]
    for attr in r7[:5]
])
print()

complex_query = misp.build_complex_query(and_parameters=wanted_tags)
r8 = misp.search(
    controller='attributes',
    tags=complex_query,
    includeEventTags=True,
    pythonify=True)
print('And:', len(r8))
pprint([
    [tag.name for tag in attr.Tag if tag.name in wanted_tags]
    for attr in r8
])


# #### Searching on GalaxyCluster metadata

# In[507]:


body = {
    'galaxy.member-of': 'NATO',
    'galaxy.official-languages': 'French',
}

events = misp.direct_call('/events/restSearch', body)
print('Events: ', len(events))
pprint([
    [tag['name'] for tag in event['Event']['Tag'] if tag['name'].startswith('misp-galaxy:target-information')]
    for event in events
])


# - **Note 1**: The `galaxy.*` instructions are not supported by PyMISP
# - **Note 2**: The `galaxy.*` instructions are **AND**ed and are applied to the same cluster
#     - Cannot combine from different clusters
#     - Combining `Galaxy.official-languages` and `Galaxy.synonyms` would likely give no result

# #### Searching on creator Organisation metadata

# In[508]:


all_orgs = misp.organisations()
print('Organisation nationality:', {org['Organisation']['name']: org['Organisation']['nationality'] for org in all_orgs})

body = {
    'org.nationality': ['Luxembourg'],
    'org.sector': ['financial'],
}

events = misp.direct_call('/events/restSearch', body)
print('Events: ', len(events))
print('Org for each Event:', [event['Event']['Orgc']['name'] for event in events])


# - **Note 1**: The `org.*` instructions are not supported by PyMISP

# #### ReturnFormat

# **CSV**

# In[509]:


r1 = misp.search(
    controller='attributes',
    type_attribute=['ip-src', 'ip-dst'],
    return_format='csv')
print(r1)


# **Aggregated context** with `context-markdown`, `context` and `attack`

# In[510]:


# Get the context of Events that were created by organisations from the financial sector
body = {
    'returnFormat': 'context-markdown',
    'org.sector': ['financial'],
}

r2 = misp.direct_call('/events/restSearch', body)
print(r2)


# In[511]:


# Get the context of Events that had the threat actor APT-29 attached
body = {
    'returnFormat': 'context',
    'tags': ['misp-galaxy:threat-actor="APT 29"'],
    'staticHtml': 1,  # If you want a JS-free HTML
}

r2 = misp.direct_call('/events/restSearch', body)
# Explicit encoding so the report is written the same way on every platform
# instead of depending on the locale's default encoding.
with open('/tmp/attackOutput.html', 'w', encoding='utf-8') as f:
    f.write(r2)
# subprocess.run(['google-chrome', '--incognito', '/tmp/attackOutput.html'])


# #### Be careful with the amount of data you ask for, use `pagination` if needed
#
# - `limit`: Specify the amount of data to be returned
# - `page`: Specify the start of the rolling window. Is **not** zero-indexed
#
# If the size of the returned data is larger than the memory envelope you might get a different behavior based on your MISP setting:
# - Nothing returned. Allowed memory by PHP process exhausted
# - Data returned but slow. MISP will concatenate the returned data in a temporary file on disk
#     - This behavior is only applicable for `/*/restSearch` endpoints

# In[ ]:


r1 = misp.search(controller='attributes', pythonify=True)
print('Amount of Attributes', len(r1))

r2 = misp.search(
    controller='attributes',
    page=1,
    limit=5,
    pythonify=True)
print('Amount of paginated Attributes', len(r2))


# ## Searching for Sightings

# In[513]:


body = {
    'last': '7d'
}

sightings = misp.direct_call('/sightings/restSearch', body)
pprint(sightings)


# ## Plotting data

# #### Sightings over time

# In[512]:


import pandas as pd
import matplotlib.pyplot as plt


# In[514]:


# Converting our data to a pandas DataFrame
sighting_rearranged = [sighting['Sighting'] for sighting in sightings]
df = pd.DataFrame.from_dict(sighting_rearranged)
# NOTE(review): `date_sighting` presumably arrives as a numeric string in the
# JSON answer - confirm. Casting to a numeric dtype first keeps the epoch
# interpretation explicit; passing strings together with `unit` is deprecated
# in recent pandas releases.
df["date_sighting"] = pd.to_datetime(pd.to_numeric(df["date_sighting"]), unit='s')
df['one'] = 1
# Bare expression kept for notebook display; it has no effect when run as a script.
df


# In[ ]:


print('Min and Max:', df['date_sighting'].min(), df['date_sighting'].max())
print('Time delta:', df['date_sighting'].max() - df['date_sighting'].min())
print('Unique Event IDs:', df.event_id.unique())


# In[515]:


# Grouping by Attribute ID
value_count = df['attribute_id'].value_counts()
print(value_count)
# Each chart gets its own figure so successive plots do not share axes when
# the file is executed as a plain script.
_, ax = plt.subplots()
value_count.plot(kind='bar', rot=45, ax=ax)


# In[516]:


# Grouping by weekday (0-indexed)
amount_per_weekday = df['date_sighting'].dt.weekday.value_counts()
print(amount_per_weekday)
_, ax = plt.subplots()
amount_per_weekday.plot(kind='bar', rot=0, ax=ax)


# In[517]:


# NOTE: despite its name, this series is grouped by hour of the day
# (`dt.hour`), summing the helper column `one` to count sightings per hour.
amount_per_weekday_for_each_attribute = df.groupby([df['date_sighting'].dt.hour])['one'].sum()
print(amount_per_weekday_for_each_attribute)
_, ax = plt.subplots()
amount_per_weekday_for_each_attribute.plot(kind='bar', rot=0, ax=ax)

# Display the figures when running outside a notebook.
plt.show()