Bro to Kafka to Spark

This notebook covers how to stream Bro data into Spark using Kafka as a message queue. The setup takes a bit of work, but the result is a scalable, robust way to process and analyze streaming data from Bro.

For a super EASY way to get started with Spark (local data without Kafka) you can view this notebook:

Getting Everything Set Up

Get a nice cup of coffee and some snacks; we estimate 45-60 minutes to set up a local Spark server and configure Bro with the Kafka plugin.

Install local Spark Server

For this notebook we're going to use a local Spark server; for a real system you'd obviously want to set up a cluster to run Spark.

$ pip install pyspark

Yep, that's it. The latest version of PySpark will set up a local server with a simple pip install.

Setting up Bro with the Kafka Plugin

This is the most complicated part of the setup; once you've set up the Bro Kafka plugin you're basically done.

Install Kafka

$ brew install kafka
$ brew install librdkafka
$ brew install zookeeper

Note: For Ubuntu 16.04 instructions see: https://hevo.io/blog/how-to-set-up-kafka-on-ubuntu-16-04/

Add Kafka plugin to Bro

Note: This will be much easier when the Kafka Plugin is a Bro 'Package' (coming soon :)

Get and Compile Bro (you have to do this for now)

$ git clone --recursive https://github.com/bro/bro.git
$ cd bro 
$ ./configure
$ make install

Get the Kafka Bro plugin

$ git clone https://github.com/apache/metron-bro-plugin-kafka
$ cd metron-bro-plugin-kafka
$ ./configure --bro-dist=$BRO_SRC_PATH   # BRO_SRC_PATH = path to the bro source tree cloned above
$ make install

Test the Bro Kafka Plugin

$ bro -N Bro::Kafka
> Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)

Set up the plugin in local.bro

Okay, so the config below will output each log to a different Kafka topic: the dns.log events will be sent to the 'dns' topic, the http.log events will be sent to the 'http' topic, and so on. If you'd like to send all the events to one topic or use other configurations, please see https://github.com/apache/metron-bro-plugin-kafka

@load Bro/Kafka/logs-to-kafka.bro
redef Kafka::topic_name = "";
redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG, SMTP::LOG);
redef Kafka::kafka_conf = table(["metadata.broker.list"] = "localhost:9092");

Start Kafka

$ zkServer start
$ kafka-server-start <path to>/server.properties

Run Bro

$ bro -i en0 <path to>/local.bro
or 
$ broctl deploy

Verify messages are in the queue

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic dns

After a second or two you should start seeing DNS requests/replies coming out; hit Ctrl-C after you've seen a few.

{"ts":1503513688.232274,"uid":"CdA64S2Z6Xh555","id.orig_h":"192.168.1.7","id.orig_p":58528,"id.resp_h":"192.168.1.1","id.resp_p":53,"proto":"udp","trans_id":43933,"rtt":0.02226,"query":"brian.wylie.is.awesome.tk","qclass":1,"qclass_name":"C_INTERNET","qtype":1,"qtype_name":"A","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["17.188.137.55","17.188.142.54","17.188.138.55","17.188.141.184","17.188.129.50","17.188.128.178","17.188.129.178","17.188.141.56"],"TTLs":[25.0,25.0,25.0,25.0,25.0,25.0,25.0,25.0],"rejected":false}

If you made it this far, you are done!

Structured Streaming in Spark

Structured Streaming is the new hotness with Spark. Michael Armbrust from DataBricks gave a great talk at Spark Summit 2017 on Structured Streaming:

There's also a good example on the DataBricks blog:

In [1]:
import pyspark
from pyspark.sql import SparkSession

# Always good to print out versions of libraries
print('PySpark: {:s}'.format(pyspark.__version__))
PySpark: 2.2.0

Spark It!

Spin up Spark with 4 Parallel Executors

Here we're spinning up a local Spark server with 4 parallel executors. This might seem a bit silly since we're probably running this on a laptop, but there are a couple of important observations:

  • If you have 4/8 cores, use them!
  • It's the exact same code logic as if we were running on a distributed cluster.
  • We run the same code on DataBricks (www.databricks.com), which is awesome BTW.
In [76]:
# Spin up a local Spark Session
# Please note: the config line is an important bit,
# readStream.format('kafka') won't work without it
spark = SparkSession.builder.master('local[4]').appName('my_awesome')\
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0')\
        .getOrCreate()

Sidebar: Apache Arrow is going to be Awesome

For all kinds of reasons (multi-core pipelines, cross-language storage, and more), Apache Arrow will improve and enable flexible, performant data analysis and machine learning pipelines.

In [77]:
# Optimize the conversion to Pandas
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
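
As a quick illustration (ours, not from the original notebook), the call this setting is meant to speed up is the Spark-to-Pandas conversion:

# toPandas() is the conversion Arrow accelerates (in newer Spark versions)
pdf = spark.range(1000).toPandas()
print(pdf.shape)  # (1000, 1): a single 'id' column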

Streaming data pipeline

Our streaming data pipeline looks conceptually like this:

  • Kafka Plugin for Bro
  • Publish (provides a nice decoupled architecture)
  • Pull/Subscribe to whatever feed you want (http, dns, conn, x509...)
  • ETL (Extract Transform Load) on the raw message data (parsed data with types)
  • Perform Filtering/Aggregation
  • Data Analysis and Machine Learning
In [56]:
# SUBSCRIBE: Set up the connection to the Kafka stream
raw_data = spark.readStream.format('kafka') \
  .option('kafka.bootstrap.servers', 'localhost:9092') \
  .option('subscribe', 'dns') \
  .option('startingOffsets', 'latest') \
  .load()
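
Sidebar (our addition): the Spark Kafka source exposes a fixed set of columns (key, value, topic, partition, offset, timestamp, timestampType), and the Bro JSON arrives in the binary value column, which is why the next cell casts it to a string. You can confirm this with:

# Peek at the raw Kafka source columns; the Bro JSON is in the binary 'value' column
raw_data.printSchema()
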
In [57]:
# ETL: Hardcoded Schema for DNS records (do this better later)
from pyspark.sql.types import StructType, StringType, BooleanType, IntegerType
from pyspark.sql.functions import from_json, to_json, col, struct

dns_schema = StructType() \
    .add('ts', StringType()) \
    .add('uid', StringType()) \
    .add('id.orig_h', StringType()) \
    .add('id.orig_p', IntegerType()) \
    .add('id.resp_h', StringType()) \
    .add('id.resp_p', IntegerType()) \
    .add('proto', StringType()) \
    .add('trans_id', IntegerType()) \
    .add('query', StringType()) \
    .add('qclass', IntegerType()) \
    .add('qclass_name', StringType()) \
    .add('qtype', IntegerType()) \
    .add('qtype_name', StringType()) \
    .add('rcode', IntegerType()) \
    .add('rcode_name', StringType()) \
    .add('AA', BooleanType()) \
    .add('TC', BooleanType()) \
    .add('RD', BooleanType()) \
    .add('RA', BooleanType()) \
    .add('Z', IntegerType()) \
    .add('answers', StringType()) \
    .add('TTLs', StringType()) \
    .add('rejected', BooleanType())
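
One possible way to 'do this better later' (a sketch of ours, not necessarily the notebook's eventual approach) is to build the schema from a field-to-type mapping instead of a long chain of add() calls:

# Sketch: build the DNS schema from a mapping (only a few fields shown;
# extend the dict with the remaining fields from the cell above)
dns_fields = {
    'ts': StringType(),
    'uid': StringType(),
    'query': StringType(),
    'qtype_name': StringType(),
    'rcode_name': StringType(),
    'rejected': BooleanType(),
}
dns_schema_alt = StructType()
for field_name, field_type in dns_fields.items():
    dns_schema_alt = dns_schema_alt.add(field_name, field_type)
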
In [58]:
# ETL: Convert raw data into parsed and proper typed data
parsed_data = raw_data \
  .select(from_json(col("value").cast("string"), dns_schema).alias('data')) \
  .select('data.*')
In [59]:
# FILTER/AGGREGATE: In this case a simple groupby operation
group_data = parsed_data.groupBy('`id.orig_h`', 'qtype_name').count()
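
The cell above only covers the aggregation half; if you also want to filter, a hedged variation (the qtype values are just an example) could look like the following. Note the backticks around id.orig_h, which are needed because the column name contains a dot.

# Hypothetical variation: keep only A/AAAA queries before aggregating
filtered_data = parsed_data.filter(col('qtype_name').isin('A', 'AAAA'))
filtered_counts = filtered_data.groupBy('`id.orig_h`', 'qtype_name').count()
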
In [60]:
# At any point in the pipeline you can see what you're getting out
group_data.printSchema()
root
 |-- id.orig_h: string (nullable = true)
 |-- qtype_name: string (nullable = true)
 |-- count: long (nullable = false)

Streaming pipeline output to an in-memory table

Now, for demonstration and discussion purposes, we're going to pull the end of the pipeline back into memory to inspect the output. A couple of things to note explicitly here:

  • Writing a stream to memory is dangerous and should be done only on small data. Since this is aggregated output we know it's going to be small.

  • The queryName param used below will be the name of the in-memory table.

In [61]:
# Take the end of our pipeline and pull it into memory
dns_count_memory_table = group_data.writeStream.format('memory') \
  .queryName('dns_counts') \
  .outputMode('complete') \
  .start()
In [62]:
dns_count_memory_table
Out[62]:
<pyspark.sql.streaming.StreamingQuery at 0x106615e10>
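
Since the memory sink is only safe for small, aggregated output, here's a hedged sketch of a file sink for the unaggregated stream (the paths are placeholders, pick your own):

# Sketch: stream the parsed records to Parquet files instead of memory.
# The Parquet sink uses append mode, so we write the unaggregated parsed_data.
parquet_query = parsed_data.writeStream.format('parquet') \
    .option('path', '/tmp/bro_dns_parquet') \
    .option('checkpointLocation', '/tmp/bro_dns_checkpoint') \
    .start()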

Streaming Query/Table: Looking Deeper

Note: The in-memory table above is dynamic, so as the streaming data pipeline continues to process data the table contents will change. Below we run the same query twice; as more data streams in, the results change.

In [65]:
# Create a Pandas Dataframe by querying the in memory table and converting
dns_counts_df = spark.sql("select * from dns_counts").toPandas()
print('DNS Query Counts = {:d}'.format(len(dns_counts_df)))
dns_counts_df.sort_values(ascending=False, by='count')
DNS Query Counts = 2
Out[65]:
        id.orig_h qtype_name  count
0  172.16.133.136          A      1
1  172.16.133.136       AAAA      1

Same Query with Updated Results

Now we run the same query as above and since the streaming pipeline continues to process new incoming data the in-memory table will dynamically update.

In [78]:
# Create a Pandas Dataframe by querying the in memory table and converting
dns_counts_df = spark.sql("select * from dns_counts").toPandas()
print('DNS Query Counts = {:d}'.format(len(dns_counts_df)))
dns_counts_df.sort_values(ascending=False, by='count')
DNS Query Counts = 9
Out[78]:
        id.orig_h qtype_name  count
4  172.16.133.136          A     13
3  172.16.176.184        PTR      6
1  172.16.220.140        PTR      4
6    192.17.98.96       None      4
2   172.16.159.32        PTR      2
7  172.16.133.136        SRV      2
0   172.16.128.51        PTR      1
5  172.16.133.136       AAAA      1
8  172.16.134.234        PTR      1
In [55]:
# We should stop our streaming pipeline when we're done
dns_count_memory_table.stop()

Wrap Up

Well, that's it for this notebook. We know it ended before we got to the exciting part of the streaming data pipeline; here we showed everything in the pipeline up to aggregation. In future notebooks we'll dive into the deep end of our pipeline and cover the data analysis and machine learning aspects of Spark.

If you liked this notebook, please visit the bat project for more notebooks and examples.