In this notebook we will pull Zeek data into Spark and then do some analysis and clustering. The first step is to convert your Zeek log data into a Parquet file. For instructions on how to do this (just a few lines of Python code using the ZAT package), please see this notebook:
Apache Parquet is a columnar storage format focused on performance. Reading Parquet data is fast and efficient; in this notebook we will use it specifically for loading data into Spark.
# Third Party Imports
import pyspark
from pyspark.sql import SparkSession
# Local imports
import zat
# Good to print out versions of stuff
print('ZAT: {:s}'.format(zat.__version__))
print('PySpark: {:s}'.format(pyspark.__version__))
ZAT: 0.3.7
PySpark: 2.4.4
Here we're spinning up a local Spark session with 4 parallel executors. This might seem a bit silly since we're probably running this on a laptop, but there are a couple of important observations:
# Spin up a local Spark Session (with 4 executors)
spark = SparkSession.builder.master("local[4]").appName('my_awesome').getOrCreate()
# Have Spark read in the Parquet File
spark_df = spark.read.parquet('/Users/briford/data/bro/dns.parquet')
We should always inspect our data when it comes in. Look at both the data values and the data types to make sure you're getting exactly what you should be.
# Get information about the Spark DataFrame
num_rows = spark_df.count()
print("Number of Rows: {:d}".format(num_rows))
columns = spark_df.columns
print("Columns: {:s}".format(','.join(columns)))
Number of Rows: 427935
Columns: ts,uid,id_orig_h,id_orig_p,id_resp_h,id_resp_p,proto,trans_id,query,qclass,qclass_name,qtype,qtype_name,rcode,rcode_name,AA,TC,RD,RA,Z,answers,TTLs,rejected
spark_df.groupby('qtype_name','proto').count().sort('count', ascending=False).show()
+----------+-----+------+
|qtype_name|proto| count|
+----------+-----+------+
|         A|  udp|212473|
|        NB|  udp| 77199|
|      AAAA|  udp| 54519|
|       PTR|  udp| 52991|
|       TXT|  udp| 12644|
|       SRV|  udp| 12268|
|         -|  udp|  3472|
|         *|  udp|   882|
|      AXFR|  tcp|   440|
|       SOA|  udp|   346|
|       TXT|  tcp|   226|
|         -|  tcp|   176|
|        MX|  udp|   169|
|        NS|  udp|    43|
|     HINFO|  udp|    30|
|     NAPTR|  udp|    27|
|       PTR|  tcp|    26|
|         A|  tcp|     4|
+----------+-----+------+
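The groupby/count aggregation Spark runs here is conceptually just counting (qtype_name, proto) pairs and sorting by count. A plain-Python sketch of the same idea, using a few hypothetical records in place of the Zeek rows:

```python
from collections import Counter

# A few hypothetical (qtype_name, proto) pairs standing in for Zeek DNS rows
records = [
    ('A', 'udp'), ('A', 'udp'), ('AAAA', 'udp'),
    ('PTR', 'udp'), ('A', 'udp'), ('AXFR', 'tcp'),
]

# Count each pair and list them by descending count -- the same shape of
# result that groupby('qtype_name','proto').count().sort(...) produces
pair_counts = Counter(records)
for (qtype, proto), count in pair_counts.most_common():
    print(qtype, proto, count)
```

The difference, of course, is that Spark distributes this aggregation across executors instead of counting in a single process.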
Spark has a powerful SQL engine as well as a Machine Learning library. So now that we've loaded our Zeek data, we're going to use Spark SQL commands to investigate the data, including clustering with MLlib.
# Plotting defaults
%matplotlib inline
import matplotlib.pyplot as plt
from zat.utils import plot_utils
plot_utils.plot_defaults()
# Add a column with the string length of the DNS query
from pyspark.sql.functions import col, length
# Create a new dataframe that includes two new columns
spark_df = spark_df.withColumn('query_length', length(col('query')))
spark_df = spark_df.withColumn('answer_length', length(col('answers')))
# Show histogram of the DNS query lengths
bins, counts = spark_df.select('query_length').rdd.flatMap(lambda x: x).histogram(50)
# This is a bit awkward but I believe this is the correct way to do it
plt.hist(bins[:-1], bins=bins, weights=counts, log=True)
plt.grid(True)
plt.xlabel('DNS Query Lengths')
plt.ylabel('Counts')
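The "awkward" plt.hist call above works because passing the left bin edges as the data, with the precomputed counts as weights, reproduces the original histogram exactly. A small numpy check of that equivalence, using hypothetical query lengths:

```python
import numpy as np

# Hypothetical query lengths standing in for the Spark column
lengths = np.array([5, 7, 7, 12, 14, 14, 14, 30, 42, 63])

# Precompute a histogram, like Spark's rdd.histogram() does
counts, bins = np.histogram(lengths, bins=5)

# Re-binning the left edges with the counts as weights yields the
# identical histogram -- which is exactly what
# plt.hist(bins[:-1], bins=bins, weights=counts) relies on
recounts, rebins = np.histogram(bins[:-1], bins=bins, weights=counts)
print(np.array_equal(counts, recounts))  # → True
```

Each left edge `bins[i]` falls into bin `i`, so its weight `counts[i]` lands in the right place.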
# Show histogram of the DNS answer lengths
bins, counts = spark_df.select('answer_length').rdd.flatMap(lambda x: x).histogram(50)
# This is a bit awkward but I believe this is the correct way to do it
plt.hist(bins[:-1], bins=bins, weights=counts, log=True)
plt.grid(True)
plt.xlabel('DNS Answer Lengths')
plt.ylabel('Counts')
Note: This bit of cleanup code is no longer needed as the ZAT log_to_sparkdf now takes care of these things for us. :)
There are two bits of cleanup that we MUST do:
Note: Yes, you can use backticks when selecting the column names, BUT some of the pipeline operations below will FAIL internally if the column names have a '.' in them.
A Spark pipeline is a way to combine a sequence of complex algorithms and transformations into a single workflow. Once a pipeline is created, Spark can optimize the whole pipeline when it's executed.
Below our pipeline consists of the following stages:
For more information on the details of Categorical Type to One Hot Encoding see our SCP Labs Encoding Dangers notebook.
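What StringIndexer plus OneHotEncoder do to a categorical column can be sketched in plain Python (hypothetical values; Spark's StringIndexer assigns indices by descending label frequency, and its OneHotEncoder actually drops the last category by default and emits sparse vectors -- dense vectors shown here for clarity):

```python
from collections import Counter

protos = ['udp', 'udp', 'tcp', 'udp', 'icmp']

# StringIndexer: most frequent label gets index 0, and so on
labels = [value for value, _ in Counter(protos).most_common()]
index = {label: i for i, label in enumerate(labels)}

# OneHotEncoder: index -> binary indicator vector
def one_hot(value):
    vec = [0] * len(labels)
    vec[index[value]] = 1
    return vec

print(index)           # → {'udp': 0, 'tcp': 1, 'icmp': 2}
print(one_hot('tcp'))  # → [0, 1, 0]
```

The VectorAssembler stages then simply concatenate these one-hot vectors with the scaled numeric columns into a single feature vector per row.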
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
cat_columns = ['qtype_name', 'proto']
num_columns = ['query_length', 'answer_length']
features = cat_columns + num_columns
stages = []
# String Indexer + One Hot Encoder (for categorical columns)
for cat_col in cat_columns:
    string_indexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + '_index')
    encoder = OneHotEncoder(inputCol=cat_col + '_index', outputCol=cat_col + '_onehot')
    stages += [string_indexer, encoder]
# Run StandardScaler on all the numerical features
num_vector = VectorAssembler(inputCols=num_columns, outputCol='num_features')
norm = StandardScaler(inputCol='num_features', outputCol='num_features_norm')
stages += [num_vector, norm]
# Assemble the categorical (one hot vectors) and numeric columns together
assembler_inputs = [c + "_onehot" for c in cat_columns] + ['num_features_norm']
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='features')
stages += [assembler]
# Run the pipeline
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(spark_df)
spark_df = pipelineModel.transform(spark_df)
spark_df.select('features').show(truncate=False)
+------------------------------------------------------------------+
|features                                                          |
+------------------------------------------------------------------+
|(16,[0,13,14,15],[1.0,1.0,2.280420188456751,0.07560960809619262]) |
|(16,[0,13,14,15],[1.0,1.0,1.1858184979975104,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.4594689206123206,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,3.0101546487629114,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.3682521130740506,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,3.0101546487629114,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.4594689206123206,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,2.280420188456751,0.07560960809619262]) |
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[0,13,14,15],[1.0,1.0,1.9155529583036708,0.07560960809619262])|
|(16,[3,13,14,15],[1.0,1.0,2.462853803533291,0.07560960809619262]) |
|(16,[2,13,14,15],[1.0,1.0,1.5506857281505906,0.07560960809619262])|
+------------------------------------------------------------------+
only showing top 20 rows
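The features column prints in Spark's sparse-vector notation: (size, [indices], [values]), where any index not listed is zero. Decoding one row by hand (values taken from the first row of the output above):

```python
def sparse_to_dense(size, indices, values):
    """Expand Spark's (size, [indices], [values]) notation to a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# First row above: qtype_name one-hot slot 0, proto one-hot slot 13,
# then the two standard-scaled numeric features at slots 14 and 15
dense = sparse_to_dense(16, [0, 13, 14, 15],
                        [1.0, 1.0, 2.280420188456751, 0.07560960809619262])
print(dense[0], dense[13], dense[14])
```

Sparse storage matters here because the one-hot portion of each 16-element vector has only two non-zero entries.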
from pyspark.ml.clustering import KMeans
# Train a k-means model.
kmeans = KMeans().setK(40)
model = kmeans.fit(spark_df)
# Evaluate clustering by computing Within Set Sum of Squared Errors.
wssse = model.computeCost(spark_df)
print("Within Set Sum of Squared Errors = " + str(wssse))
Within Set Sum of Squared Errors = 6885.514765318761
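Within Set Sum of Squared Errors is just the sum, over every point, of its squared distance to its assigned cluster center. A small worked version with hypothetical 1-D points and two centers:

```python
def wssse(points, centers, assignments):
    """Sum of squared distances from each point to its assigned center."""
    return sum((p - centers[c]) ** 2 for p, c in zip(points, assignments))

points      = [1.0, 2.0, 10.0, 11.0]
centers     = [1.5, 10.5]   # hypothetical cluster centers
assignments = [0, 0, 1, 1]  # each point's cluster index

# Each point is 0.5 from its center: 4 * 0.5**2 = 1.0
print(wssse(points, centers, assignments))  # → 1.0
```

Lower WSSSE means tighter clusters, but it always decreases as K grows, so it's a relative measure for comparing runs rather than an absolute score.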
# Let's look at some of the clustering results
transformed = model.transform(spark_df).select(features + ['prediction'])
transformed.groupby(cat_columns + ['prediction']).count().sort('prediction').show(50)
+----------+-----+----------+-----+
|qtype_name|proto|prediction|count|
+----------+-----+----------+-----+
|       TXT|  tcp|         0|  135|
|       TXT|  udp|         0|11149|
|       PTR|  udp|         1|11809|
|      AXFR|  tcp|         2|   87|
|      AXFR|  tcp|         3|   78|
|         A|  udp|         4|28609|
|       PTR|  udp|         5|40370|
|       PTR|  tcp|         5|   25|
|         A|  udp|         6|55667|
|        NB|  udp|         7|11374|
|        NB|  udp|         8|20059|
|      AAAA|  udp|         9|    2|
|         -|  udp|         9|  180|
|         -|  tcp|         9|    5|
|      AAAA|  udp|        10| 9241|
|         *|  udp|        11|  144|
|      AAAA|  udp|        11|   71|
|       SRV|  udp|        11|10419|
|     NAPTR|  udp|        12|   27|
|         A|  udp|        12|25369|
|        MX|  udp|        12|  163|
|        NB|  udp|        13|15787|
|      AAAA|  udp|        14| 6062|
|      AXFR|  tcp|        15|   68|
|         -|  udp|        15|   37|
|       PTR|  udp|        15|   48|
|         *|  udp|        16|   52|
|         A|  udp|        16| 6059|
|       PTR|  udp|        17|  100|
|       TXT|  udp|        17|   12|
|       PTR|  tcp|        17|    1|
|      AXFR|  tcp|        18|  107|
|       SOA|  udp|        19|   31|
|        NB|  udp|        19| 1920|
|        MX|  udp|        20|    6|
|      AXFR|  tcp|        20|   24|
|         *|  udp|        20|  652|
|     HINFO|  udp|        20|   30|
|      AAAA|  udp|        21|14022|
|      AAAA|  udp|        22| 3764|
|         -|  tcp|        23|  163|
|         -|  udp|        23| 3255|
|       SRV|  udp|        24|  727|
|         A|  udp|        25|  614|
|         *|  udp|        25|   31|
|      AAAA|  udp|        25|   38|
|         A|  udp|        26| 5429|
|       TXT|  udp|        27| 1445|
|         A|  udp|        28|13479|
|         A|  udp|        29|50330|
+----------+-----+----------+-----+
only showing top 50 rows
We can see that there's some natural grouping/clustering around the different qtype_names and protocols, but we also see that many of the query types/protocols land in several clusters... so let's take a closer look at the 'TXT' queries. (Note: replace 'TXT' with any other type and feel free to explore the other 'sub-clusters'.)
# Let's look at the 'TXT' query_name clusters
txt_queries = transformed.where(transformed['qtype_name'] == 'TXT').groupby(features + ['prediction']).\
count().sort('prediction')
txt_queries.show(50)
+----------+-----+------------+-------------+----------+-----+
|qtype_name|proto|query_length|answer_length|prediction|count|
+----------+-----+------------+-------------+----------+-----+
|       TXT|  udp|          12|            1|         0|  488|
|       TXT|  udp|           9|            1|         0|   21|
|       TXT|  udp|          12|           12|         0|    1|
|       TXT|  tcp|          12|            5|         0|  106|
|       TXT|  udp|          23|            1|         0|   12|
|       TXT|  udp|          22|            1|         0|   62|
|       TXT|  udp|          14|           17|         0|    1|
|       TXT|  tcp|          12|            1|         0|   24|
|       TXT|  udp|          12|            5|         0|  214|
|       TXT|  udp|          14|            1|         0|10305|
|       TXT|  udp|          13|            1|         0|   31|
|       TXT|  udp|          24|            1|         0|    1|
|       TXT|  tcp|          12|           12|         0|    5|
|       TXT|  udp|          13|            6|         0|   13|
|       TXT|  udp|          29|           32|        17|    6|
|       TXT|  udp|          36|           33|        17|    6|
|       TXT|  udp|          36|            1|        27|    9|
|       TXT|  udp|          33|            1|        27|    2|
|       TXT|  udp|          36|           22|        27|    2|
|       TXT|  udp|          40|            1|        27|  347|
|       TXT|  udp|          39|            1|        27|   98|
|       TXT|  udp|          41|            1|        27|  436|
|       TXT|  udp|          35|            1|        27|   16|
|       TXT|  udp|          42|            1|        27|  532|
|       TXT|  udp|          34|            1|        27|    3|
|       TXT|  udp|          72|            1|        34|    4|
|       TXT|  udp|          64|            1|        34|    2|
|       TXT|  udp|          67|            1|        34|    2|
|       TXT|  udp|          82|           11|        34|    6|
|       TXT|  udp|          83|            1|        34|    2|
|       TXT|  udp|          12|           33|        37|   22|
|       TXT|  tcp|          12|           33|        37|   91|
+----------+-----+------------+-------------+----------+-----+
# The groupby/count produces a very small amount of data that
# we can easily pull down and plot from our local client
import pandas as pd
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# Convert to Pandas (make sure it's small)
txt_df = txt_queries.toPandas()
# Now use dataframe group by cluster
cluster_groups = txt_df.groupby('prediction')
# Plot the Machine Learning results
choices = ['red', 'green', 'blue', 'black', 'orange', 'purple', 'brown',
'pink', 'lightblue', 'grey', 'yellow']
colors = {value: choices[index] for index, value in enumerate(txt_df['prediction'].unique())}
fig, ax = plt.subplots()
for key, group in cluster_groups:
    group.plot(ax=ax, kind='scatter', x='query_length', y='answer_length', alpha=0.5, s=250,
               label='Cluster: {:d}'.format(key), color=colors[key])
So we gave the clustering algorithm both categorical and numerical features, and it seems to have done a reasonable job using both. We can see that the categorical types are clustered, and then within each categorical cluster we have a set of 'sub-clusters' based on the numerical values.
Well, that's it for this notebook. We pulled in Zeek log data from a Parquet file, did some digging with high-speed, parallel SQL operations, and finally clustered our data to organize the results.
If you liked this notebook please visit the ZAT project for more notebooks and examples.
The company was formed so that its developers could follow their passion for Python, streaming data pipelines, and having fun with data analysis. We also think cows are cool and should be superheroes, or at least carry around rayguns and burner phones. Visit SuperCowPowers