In this notebook we'll show how easy it is to load up really big Zeek logs using the classes within the Zeek Analysis Tools (ZAT). We'll also show converting a Zeek log into a Parquet file in one line of code.
# Third Party Imports
import pyspark
from pyspark.sql import SparkSession
# Local imports
import zat
from zat import log_to_sparkdf
# Good to print out versions of stuff
print('ZAT: {:s}'.format(zat.__version__))
print('PySpark: {:s}'.format(pyspark.__version__))
ZAT: 0.3.7
PySpark: 2.4.4
Here we're spinning up a local Spark session with 4 parallel executors. This might seem a bit silly since we're probably running this on a laptop, but there are a couple of important observations: we actually put the laptop's cores to work in parallel, and the exact same code runs unchanged against a full distributed cluster.
# Spin up a local Spark Session (with 4 executors)
spark = SparkSession.builder.master('local[4]').appName('my_awesome').getOrCreate()
# Use the ZAT class to load our log file into a Spark dataframe (2 lines of code!)
spark_it = log_to_sparkdf.LogToSparkDF(spark)
spark_df = spark_it.create_dataframe('/Users/briford/data/bro/http.log')
Spark will read in and partition the data out to our workers. Our DataFrame (backed by an RDD) will have some number of partitions that are divided up among the worker pool. Each worker operates on only a subset of the data, and Spark manages the 'magic' of how that work gets run, aggregated, and presented back.
Image Credit: Jacek Laskowski, please see his excellent book - Mastering Apache Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark
spark_df.rdd.getNumPartitions()
11
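The partition count is chosen by Spark, but you can control it yourself if it doesn't match your core count. A minimal sketch (the count of 8 here is an arbitrary choice for illustration):
# Redistribute the data across 8 partitions (repartition triggers a full shuffle;
# coalesce() can reduce the partition count without one)
spark_df = spark_df.repartition(8)
print(spark_df.rdd.getNumPartitions())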
Here we're going to demonstrate just a few simple Spark operations, but obviously you now have the full power of the Death Star in your hands.
# Get information about the Spark DataFrame
num_rows = spark_df.count()
print("Number of Rows: {:d}".format(num_rows))
columns = spark_df.columns
print("Columns: {:s}".format(','.join(columns)))
Number of Rows: 2048442
Columns: ts,uid,id_orig_h,id_orig_p,id_resp_h,id_resp_p,trans_depth,method,host,uri,referrer,user_agent,request_body_len,response_body_len,status_code,status_msg,info_code,info_msg,filename,tags,username,password,proxied,orig_fuids,orig_mime_types,resp_fuids,resp_mime_types
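The DataFrame also carries the schema that was inferred from the Zeek log; printSchema() will print each column along with its type:
# Show the column names along with their inferred types
spark_df.printSchema()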
spark_df.select(['id_orig_h', 'host', 'uri', 'status_code', 'user_agent']).show(5)
+--------------+---------------+--------------+-----------+--------------------+
|     id_orig_h|           host|           uri|status_code|          user_agent|
+--------------+---------------+--------------+-----------+--------------------+
|192.168.202.79|192.168.229.251|/DEASLog02.nsf|        404|Mozilla/5.0 (comp...|
|192.168.202.79|192.168.229.251|/DEASLog03.nsf|        404|Mozilla/5.0 (comp...|
|192.168.202.79|192.168.229.251|/DEASLog04.nsf|        404|Mozilla/5.0 (comp...|
|192.168.202.79|192.168.229.251|/DEASLog05.nsf|        404|Mozilla/5.0 (comp...|
|192.168.202.79|192.168.229.251|  /DEASLog.nsf|        404|Mozilla/5.0 (comp...|
+--------------+---------------+--------------+-----------+--------------------+
only showing top 5 rows
spark_df.groupby('method','status_code').count().sort('count', ascending=False).show()
+-------+-----------+-------+
| method|status_code|  count|
+-------+-----------+-------+
|   HEAD|        404|1294022|
|    GET|        404| 429283|
|   POST|        200| 125638|
|    GET|        200|  88631|
|   POST|          0|  32918|
|    GET|        400|  29152|
|    GET|        303|  10858|
|    GET|        403|   8530|
|   POST|        404|   4277|
|    GET|        304|   3851|
|    GET|        302|   3250|
|    GET|          0|   2906|
|    GET|        401|   2159|
|OPTIONS|        200|   1897|
|   POST|        302|   1226|
|   HEAD|        503|   1010|
|   POST|        206|    869|
|    GET|        301|    642|
|   HEAD|          0|    606|
|    GET|        503|    550|
+-------+-----------+-------+
only showing top 20 rows
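As one more quick sketch of what you can do, here's a filter that pulls out just the 404 responses so we can eyeball which hosts and URIs were being probed (the columns are the ones listed above):
# Filter down to the 404 responses and peek at the targeted hosts/URIs
spark_df.filter(spark_df.status_code == 404).select(['host', 'uri', 'user_agent']).show(5)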
Apache Parquet is a columnar storage format focused on performance. Parquet data is often used within the Hadoop ecosystem, and converting your Zeek log to a Parquet file is one line of code!
# DataFrames can be saved as Parquet files, maintaining the schema information.
spark_df.write.parquet('http.parquet')
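One caveat worth noting: write.parquet() will raise an error if the output path already exists, so if you re-run this cell you'll want to ask for an overwrite, along these lines:
# Overwrite the Parquet file if it already exists (handy when re-running the cell)
spark_df.write.mode('overwrite').parquet('http.parquet')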
# Have Spark read in the Parquet File
spark_df = spark.read.parquet("http.parquet")
The query below was executed on 4 workers. The data contains over 2 million HTTP requests/responses, and the query completed in a fraction of a second running on my Mac laptop :)
%time spark_df.groupby('method','status_code').count().sort('count', ascending=False).show()
+-------+-----------+-------+
| method|status_code|  count|
+-------+-----------+-------+
|   HEAD|        404|1294022|
|    GET|        404| 429283|
|   POST|        200| 125638|
|    GET|        200|  88631|
|   POST|          0|  32918|
|    GET|        400|  29152|
|    GET|        303|  10858|
|    GET|        403|   8530|
|   POST|        404|   4277|
|    GET|        304|   3851|
|    GET|        302|   3250|
|    GET|          0|   2906|
|    GET|        401|   2159|
|OPTIONS|        200|   1897|
|   POST|        302|   1226|
|   HEAD|        503|   1010|
|   POST|        206|    869|
|    GET|        301|    642|
|   HEAD|          0|    606|
|    GET|        503|    550|
+-------+-----------+-------+
only showing top 20 rows
CPU times: user 3.05 ms, sys: 1.41 ms, total: 4.46 ms
Wall time: 369 ms
Spark has a powerful SQL engine as well as a machine learning library. So now that we've got the data loaded into a Spark DataFrame, we're going to use Spark SQL commands to do some investigation and clustering using Spark MLlib. For that deeper dive we're going to move to another notebook :)
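As a quick preview of the SQL side, here's a minimal sketch: register the DataFrame as a temporary view (the name http_log is just our choice) and query it directly:
# Register the DataFrame as a temp view so we can hit it with SQL
spark_df.createOrReplaceTempView('http_log')
spark.sql("""SELECT method, status_code, COUNT(*) AS count
             FROM http_log
             GROUP BY method, status_code
             ORDER BY count DESC""").show(5)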
Well that's it for this notebook. We went from a Zeek log to a high-performance Parquet file and then did some digging with high-speed, parallel SQL and groupby operations.
If you liked this notebook please visit the ZAT project for more notebooks and examples.
The company was formed so that its developers could follow their passion for Python, streaming data pipelines, and having fun with data analysis. We also think cows are cool and should be superheroes, or at least carry around rayguns and burner phones. Visit SuperCowPowers