Bro to Parquet to Spark

Apache Parquet is a columnar storage format focused on performance. Parquet data is often used within the Hadoop ecosystem and we will specifically be using it for loading data into both Pandas and Spark.



Bleeding Edge Warning:

You know you're on the bleeding edge when you link PRs that are still open/in-progess. There are two open issues with saving Parquet Files right now.

For Spark timestamps, the BAT Parquet writer used below will output INT96 timestamps for now (we'll change over later when ARROW-1499 is complete).

For the TimeDelta support we'll just have to wait until that gets pushed into the main branch and released.

In [1]:
# Third Party Imports
import pyspark
from pyspark.sql import SparkSession
import pyarrow

# Local imports
import bat
from bat.log_to_parquet import log_to_parquet

# Good to print out versions of stuff
print('BAT: {:s}'.format(bat.__version__))
print('PySpark: {:s}'.format(pyspark.__version__))
print('PyArrow: {:s}'.format(pyarrow.__version__))
BAT: 0.2.7
PySpark: 2.2.0
PyArrow: 0.6.0

Bro log to Parquet File

Here we're loading in a Bro HTTP log with ~2 million rows to demonstrate the functionality and do some simple spark processing on the data.

  • log_to_parquet is iterative so it can handle large files
  • 'row_group_size' defaults to 1 Million rows but can be set manually
In [2]:
# Create a Parquet file from a Bro Log with a super nice BAT method.
log_to_parquet('/Users/briford/data/bro/sec_repo/http.log', 'http.parquet')
Successfully monitoring /Users/briford/data/bro/sec_repo/http.log...
Writing 0 rows...
Writing 1000000 rows...
Writing 2000000 rows...
Writing 2048441 rows...
Parquet File Complete

Parquet files are compressed

Here we see the first benefit of Parquet which stores data with compressed columnar format. There are several compression options available (including uncompressed).

Original http.log = 1.3 GB

http.parquet = 106 MB

Spark It!

Spin up Spark with 4 Parallel Executors

Here we're spinning up a local spark server with 4 parallel executors, although this might seem a bit silly since we're probably running this on a laptop, there are a couple of important observations:

  • If you have 4/8 cores use them!
  • It's the exact same code logic as if we were running on a distributed cluster.
  • We run the same code on DataBricks ( which is awesome BTW.
In [2]:
# Spin up a local Spark Session (with 4 executors)
spark = SparkSession.builder.master('local[4]').appName('my_awesome').getOrCreate()

Parquet files are fast

We see from the below timer output that the Parquet file only takes a few seconds to read into Spark.

In [3]:
# Have Spark read in the Parquet File
%time spark_df ="http.parquet")
CPU times: user 2.74 ms, sys: 1.36 ms, total: 4.1 ms
Wall time: 2.07 s

Parquet files are Parallel

We see that, in this case, the number of data partitions in our dataframe(rdd) equals the number of executors/workers. If we had 8 workers there would be 8 partitions (at least, often there are more partitions based on how big the data is and how the files were writen, etc).

Image Credit: Jacek Laskowski, please see his excellent book - Mastering Apache Spark

In [4]:

Lets look at our data

We should always inspect out data when it comes in. Look at both the data values and the data types to make sure you're getting exactly what you should be.

In [11]:
# Get information about the Spark DataFrame
num_rows = spark_df.count()
print("Number of Rows: {:d}".format(num_rows))
columns = spark_df.columns
print("Columns: {:s}".format(','.join(columns)))
Number of Rows: 2048442
Columns: filename,host,id.orig_h,id.orig_p,id.resp_h,id.resp_p,info_code,info_msg,method,orig_fuids,orig_mime_types,password,proxied,referrer,request_body_len,resp_fuids,resp_mime_types,response_body_len,status_code,status_msg,tags,trans_depth,uid,uri,user_agent,username,ts
In [6]:['`id.orig_h`', 'host', 'uri', 'status_code', 'user_agent']).show(5)
|     id.orig_h|           host|           uri|status_code|          user_agent|
|||/DEASLog02.nsf|        404|Mozilla/5.0 (comp...|
|||/DEASLog03.nsf|        404|Mozilla/5.0 (comp...|
|||/DEASLog04.nsf|        404|Mozilla/5.0 (comp...|
|||/DEASLog05.nsf|        404|Mozilla/5.0 (comp...|
|||  /DEASLog.nsf|        404|Mozilla/5.0 (comp...|
only showing top 5 rows

Did we mention fast?

The query below was executed on 4 workers. The data contains over 2 million HTTP requests/responses and the time to complete was less than 1 second. All this code is running on a 2016 Mac Laptop :)

In [8]:
%time spark_df.groupby('method','status_code').count().sort('count', ascending=False).show()
| method|status_code|  count|
|   HEAD|        404|1294022|
|    GET|        404| 429361|
|   POST|        200| 125638|
|    GET|        200|  88636|
|   POST|          0|  32918|
|    GET|        400|  29152|
|    GET|        303|  10858|
|    GET|        403|   8530|
|   POST|        404|   4277|
|    GET|        304|   3851|
|    GET|        302|   3250|
|    GET|          0|   2823|
|    GET|        401|   2159|
|OPTIONS|        200|   1897|
|   POST|        302|   1226|
|   HEAD|        503|   1010|
|   POST|        206|    869|
|    GET|        301|    642|
|   HEAD|          0|    606|
|    GET|        503|    550|
only showing top 20 rows

CPU times: user 4.01 ms, sys: 1.53 ms, total: 5.54 ms
Wall time: 848 ms

Data looks good, lets take a deeper dive

Spark has a powerful SQL engine as well as a Machine Learning library. So now that we've got the data loaded into Parquet we're going to utilize the Spark SQL commands to do some investigation and clustering using the Spark MLLib. For this deeper dive we're going to go to another notebook :)

Spark Clustering Notebook

Wrap Up

Well that's it for this notebook, we went from a Bro log to a high performance Parquet file and then did some digging with high speed, parallel SQL and groupby operations.

If you liked this notebook please visit the BAT project for more notebooks and examples.