The benefit of working with PySpark and Jupyter is that we don't need to alter our normal way of operating with data.
PySpark's DataFrames provide much of the same functionality as pandas DataFrames, and we can easily interweave our standard Python.
We're able to read data directly from places like S3, using notation similar to what we used in Hive and with pandas.
input_bucket = 's3://compdbms-spring-2021-jk'
input_path = '/ratebeer/*.json'
# This creates a dataframe from the provided bucket
df = spark.read.json(input_bucket + input_path)
df.show()
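Because we can mix in regular Python, nothing stops us from pulling a small sample back to the driver as a pandas DataFrame for quick inspection. This is just a sketch; keep the limit small, since toPandas() collects the rows to the driver.
# Collect a small sample to the driver as a pandas DataFrame for familiar inspection
sample_pdf = df.limit(5).toPandas()
sample_pdf.head()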
To get a better view of the structure, or schema, of our data, we can print out the information directly:
df.printSchema()
df.columns
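The schema is also available programmatically, which can be handy if we want to filter columns by type. A small sketch:
# dtypes returns (column name, type string) pairs for quick programmatic checks
df.dtypes
# The full schema object lets us pick out, say, only the string-typed columns
[f.name for f in df.schema.fields if f.dataType.simpleString() == 'string']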
Unlike pandas, our process for identifying columns for processing is a bit different. Typically we will select the columns we want to work with.
Notice how this actually runs a job, since a value is being requested:
df.select('beer/beerId').distinct().count()
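To make that lazy/eager distinction concrete, here is a small sketch splitting the same chain apart: the transformations return immediately, and only the action at the end triggers a job.
# Transformations (select, distinct) are lazy -- this line returns immediately
beer_ids = df.select('beer/beerId').distinct()
# count() is an action, so this is the point where Spark actually runs a job
beer_ids.count()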
We have a number of columns that are effectively ordinal (style, appearance, etc.) but are stored as strings formatted as x/y ratings. We could work with these through SQL, but creating those fields in Spark gives us access to more powerful computations (like ML).
Notice: while this asks for a lot of work to happen, it returns immediately because no values are actually requested.
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as F
cols = ["review/appearance", "review/aroma", "review/overall", "review/palate", "review/taste",
        "beer/ABV", "beer/beerId", "beer/brewerId", "review/time"]
for col in cols:
    # Column expression: take the numerator of the "x/y" string and cast it to a double
    get_val = F.split(df[col], '/').getItem(0).cast(DoubleType())
    print(col)
    col_name = col.split('/')[1]
    df = df.withColumn(col_name, get_val)
df.columns
df.select('aroma').show(10)
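For comparison with the SQL route mentioned above, here is a rough sketch of the same extraction for a single column using a temporary view (the view name ratebeer_view is just for illustration):
# Register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("ratebeer_view")
# Same idea as the loop above: grab the numerator of the x/y string and cast it
spark.sql("""
    SELECT CAST(split(`review/aroma`, '/')[0] AS DOUBLE) AS aroma_sql
    FROM ratebeer_view
""").show(10)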
While there are a number of ways to perform machine learning within Spark, one of the easiest is to hook into Spark's built-in ML library (pyspark.ml). We'll walk through a simple linear regression example to show how this works.
inputs_outputs = ["appearance", "aroma", "overall", "palate", "taste", "ABV", "time"]
features = ["appearance", "aroma", "palate", "taste", "ABV", "time"]
label = "overall"
First, let's make sure there are no nulls.
from pyspark.sql.functions import col, count, when
df.select([count(when(col(c).isNull(), c)).alias(c) for c in features]).show()
Given that there are nulls, we'll need to clean things up before setting up our dataset.
lr_df = df.select(inputs_outputs).dropna(how='any')
feats = df.select(features)
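As a quick sanity check (reusing the null-count pattern from above), we can confirm the drop worked and see how many rows it cost us:
# Every column should now report zero nulls
lr_df.select([count(when(col(c).isNull(), c)).alias(c) for c in features]).show()
# Compare row counts before and after dropping incomplete rows
print(df.count(), lr_df.count())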
Second, we'll assemble our data into a format that works more easily with Spark's ML library: a single vector column of features.
from pyspark.ml.feature import VectorAssembler
# Let's assemble our features into a single vector column using VectorAssembler
assembler = VectorAssembler(
    inputCols=feats.columns,
    outputCol="features")
output = assembler.transform(lr_df).select('features', 'overall')
train, test = output.randomSplit([0.75, 0.25])
train.show(10)
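If we want the split to be reproducible between runs, randomSplit also accepts a seed; a minor variation on the call above:
# Passing a seed makes the 75/25 split deterministic across runs
train, test = output.randomSplit([0.75, 0.25], seed=42)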
Next, we'll need to configure our model.
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8,
                      featuresCol='features', labelCol='overall')
# Fit the model
linear_model = lr.fit(train)
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(linear_model.coefficients))
print("Intercept: %s" % str(linear_model.intercept))
# Summarize the model over the training set and print out some metrics
train_summary = linear_model.summary
print("numIterations: %d" % train_summary.totalIterations)
print("objectiveHistory: %s" % str(train_summary.objectiveHistory))
train_summary.residuals.show()
print("RMSE: %f" % train_summary.rootMeanSquaredError)
print("r2: %f" % train_summary.r2)
With a trained model, we can evaluate its performance against our holdout dataset by calling transform on the *test* data. From there we can run a regression evaluation using a RegressionEvaluator object.
Note: the reason Spark has its own versions of these objects is that many algorithms have to be rewritten to take full advantage of Spark's distributed programming paradigm.
from pyspark.ml.evaluation import RegressionEvaluator
predictions = linear_model.transform(test)
pred_evaluator = RegressionEvaluator(predictionCol="prediction",
                                     labelCol="overall", metricName="r2")
print("R Squared (R2) on test data = %g" % pred_evaluator.evaluate(predictions))