This notebook covers how to stream Zeek data using Kafka as a message queue. The setup takes a bit of work, but the result is a robust way to stream data from Zeek.
To set some context, our long-term plan is to build out a streaming data pipeline, and this notebook will help you get started on that path. After completing this notebook, you can look at the next steps by viewing our notebooks that use Spark on Zeek output.
So our streaming pipeline looks conceptually like this.
Things you'll need:
The weblinks above do a pretty good job of getting you set up with Zeek, Kafka, and the Kafka plugin. If you already have these things set up, then you're good to go. If not, take some time and get both up and running. If you're a bit wacky (like me) and want to set these things up on a Mac, you might check out my notes here: Zeek/Kafka Mac Setup
Okay, now that Zeek with the Kafka plugin is set up, let's do just a bit of testing to make sure it's all AOK before we get into making a Kafka consumer in Python.
Test the Zeek Kafka Plugin
Make sure the Kafka plugin is ready to go by running the following command on your Zeek instance:
$ zeek -N Apache::Kafka
Apache::Kafka - Writes logs to Kafka (dynamic, version 0.3.0)
Activate the Kafka Plugin
There's a good explanation of all the options here (https://github.com/apache/metron-bro-plugin-kafka). In my case, I needed a different load command when 'activating' the Kafka plugin in my local.zeek configuration file. Here's what I added to the 'standard' site/local.zeek file:
@load Apache/Kafka
redef Kafka::topic_name = "";
redef Kafka::send_all_active_logs = T;
redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "localhost:9092"
);
The first line (@load Apache/Kafka) took me a while to figure out.
The rest is, at least for me, the best setup:
By putting in a blank topic name, all output topics are labeled with the name of their log file. For instance, output that goes to dns.log is mapped to the 'dns' Kafka topic, http.log to the 'http' topic, and so on. This was exactly what I wanted.
$ zeek -i en0 <path to>/local.zeek
or
$ zeekctl deploy
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic dns
After a second or two, you should start seeing DNS requests/replies coming out. Hit Ctrl-C after you see some.
{"ts":1503513688.232274,"uid":"CdA64S2Z6Xh555","id.orig_h":"192.168.1.7","id.orig_p":58528,"id.resp_h":"192.168.1.1","id.resp_p":53,"proto":"udp","trans_id":43933,"rtt":0.02226,"query":"brian.wylie.is.awesome.tk","qclass":1,"qclass_name":"C_INTERNET","qtype":1,"qtype_name":"A","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["17.188.137.55","17.188.142.54","17.188.138.55","17.188.141.184","17.188.129.50","17.188.128.178","17.188.129.178","17.188.141.56"],"TTLs":[25.0,25.0,25.0,25.0,25.0,25.0,25.0,25.0],"rejected":false}
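Each message the plugin writes is plain JSON, so before wiring up a full consumer it's worth sanity-checking field access on a sample record. This is just a sketch using a trimmed copy of the dns.log record shown above; the field names come from Zeek's standard dns.log schema:

```python
import json

# A trimmed version of the dns.log record shown above
raw = b'{"ts":1503513688.232274,"query":"brian.wylie.is.awesome.tk","qtype_name":"A","rcode_name":"NOERROR"}'

# This is exactly what our consumer's value_deserializer will do per message
record = json.loads(raw.decode('utf-8'))
print(record['query'], record['qtype_name'], record['rcode_name'])
```

The same `json.loads(x.decode('utf-8'))` expression shows up below as the consumer's `value_deserializer`.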
# Okay so now that the setup is done lets put together a bit of code to
# process the Kafka 'topics' that are now being streamed from our Zeek instance
# First we create a Kafka Consumer
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer('dns', bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))
# Now let's process our Kafka messages
for message in consumer:
    print(message.value)
# Note: This will just loop forever, but here's an
# example of the types of output you'll see
{'ts': 1570120289.692109, 'uid': 'CAdnHRVdI94Upoej7', 'id.orig_h': '192.168.1.7', '...
{'ts': 1570120295.655344, 'uid': 'Ctcv6F2bLT8fB9GOUb', 'id.orig_h': '192.168.1.5', ...
{'ts': 1570120295.663177, 'uid': 'CLrohRNbVWuBecKud', 'id.orig_h': '192.168.1.2', '...
{'ts': 1570120295.765735, 'uid': 'CxhnkA3sMdZcQJ6vf7', 'id.orig_h': '192.168.1.7', '...
{'ts': 1570120295.765745, 'uid': 'CEPF9E4a9WeM1cFlSk', 'id.orig_h': 'fe80::4b8:c380:5a7...
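Since the loop above runs forever, it can be handy to bound it when experimenting. One way (a sketch, not part of the original notebook) is `itertools.islice`, which works on any iterator, including a KafkaConsumer; the `take` helper here is a hypothetical name:

```python
from itertools import islice

def take(stream, n):
    """Pull at most n messages from any iterator (e.g. a KafkaConsumer)."""
    return list(islice(stream, n))

# Behaves the same on a plain iterable as it would on the consumer
sample = take(iter([{'query': 'a.tk'}, {'query': 'b.com'}, {'query': 'c.ru'}]), 2)
print(sample)  # first two messages only
```

Alternatively, kafka-python's KafkaConsumer accepts a `consumer_timeout_ms` argument, which makes the iterator stop after the broker has been quiet for that long.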
Okay, so now we can actually do something useful with our new streaming data. In this case, we're going to use some results from our 'Risky Domains' notebook, which computed a risky set of TLDs.
from pprint import pprint
import tldextract
from zat.utils import vt_query
# Create a VirusTotal Query Class
vtq = vt_query.VTQuery()
risky_tlds = set(['info', 'tk', 'xyz', 'online', 'club', 'ru', 'website',
                  'in', 'ws', 'top', 'site', 'work', 'biz', 'name', 'tech'])
Using public VT API Key: Please set apikey=<your key> when creating this class
# Now let's process our Kafka 'dns' messages
for message in consumer:
    dns_message = message.value

    # Pull out the TLD
    query = dns_message['query']
    tld = tldextract.extract(query).suffix

    # Check if the TLD is in the risky group
    if tld in risky_tlds:
        # Make a VirusTotal query with the full domain
        results = vtq.query_url(query)
        if results.get('positives'):
            print('\nOMG the Network is on Fire!!!')
            pprint(results)
OMG the Network is on Fire!!!
{'filescan_id': None,
 'positives': 2,
 'query': 'uni10.tk',
 'scan_date': '2019-05-29 09:03:43',
 'scan_results': [('clean site', 59),
                  ('unrated site', 8),
                  ('malware site', 1),
                  ('suspicious site', 1),
                  ('malicious site', 1)],
 'total': 70,
 'url': 'http://uni10.tk/'}
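The TLD check in the loop above is easy to test on its own, without a Kafka broker or a VirusTotal key. Here's a minimal sketch with a hypothetical `is_risky` helper; it uses a plain suffix split instead of tldextract, so unlike the code above it will not handle multi-part suffixes like 'co.uk' correctly:

```python
# Same risky TLD set as the notebook cell above
RISKY_TLDS = {'info', 'tk', 'xyz', 'online', 'club', 'ru', 'website',
              'in', 'ws', 'top', 'site', 'work', 'biz', 'name', 'tech'}

def is_risky(query):
    """Flag a DNS query whose last label is in the risky TLD set."""
    tld = query.rstrip('.').rsplit('.', 1)[-1].lower()
    return tld in RISKY_TLDS

print(is_risky('uni10.tk'))     # True  (the .tk domain VT flagged above)
print(is_risky('example.com'))  # False (not in the risky set)
```

In the real loop, a True here is what gates the (rate-limited) VirusTotal query.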
Recall that our long-term plan is to build out a streaming data pipeline, and this notebook has helped you get started on that path. After completing this notebook, you can look at the next steps by viewing our notebooks that use Spark on Zeek output.
Well, that's it for this notebook. We set up Zeek with the Kafka plugin and showed a simple example of how you might process the streaming data coming from Kafka.
The company was formed so that its developers could follow their passion for Python, streaming data pipelines, and having fun with data analysis. We also think cows are cool and should be superheroes, or at least carry around rayguns and burner phones. Visit SuperCowPowers