Import Packages
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, countDistinct, count, when, sum, col
from pyspark.sql.types import IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import OneHotEncoder, StringIndexer, MinMaxScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
import warnings
warnings.filterwarnings('ignore')
Load data from AWS
# Create spark session
spark = (SparkSession
.builder
.appName("Sparkify")
.getOrCreate())
# Read in full sparkify dataset
event_data = "s3n://dsnd-sparkify/sparkify_event_data.json"
events = spark.read.json(event_data)
We will define Churn as Cancellation Confirmation events. We could also treat Downgrade events as Churn, but instead we use Downgrade events as an additional feature to predict Cancellation Confirmation events (Churn).
Create a column named Churn as the label indicating whether the user has churned (a minimal sketch of this flag appears after this list)
Build the 7 features needed to construct the model
Remove several less useful columns to speed up the operations
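As a reference for the label definition, here is a minimal hypothetical sketch of the churn flag on its own. The page pivot in step 1 produces the same information implicitly, since the Cancellation Confirmation count column is renamed to Churn in step 2.
# Hypothetical sketch: flag users with at least one Cancellation Confirmation event
churn_flags = (events
               .withColumn("churn_event",
                           when(col("page") == "Cancellation Confirmation", 1).otherwise(0))
               .groupby("userId")
               .agg(sum("churn_event").alias("churn_events"))
               .withColumn("Churn", (col("churn_events") > 0).cast("int"))
               .select("userId", "Churn"))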
events = events.drop('firstName', 'lastName', 'auth', 'gender', 'song','artist',
'status', 'method', 'location', 'registration', 'itemInSession')
1. Pivot the page column to obtain per-user counts of each activity, then remove the less significant features
events_pivot = events.groupby(["userId"]).pivot("page").count().fillna(0)
# drop unnecessary columns
events_pivot = events_pivot.drop('About', 'Cancel', 'Login', 'Submit Registration', 'Register', 'Save Settings')
2. Add total length of songs played
# filter the events log to NextSong events only
events_songs = events.filter(events.page == 'NextSong')
# total song length played per user
total_length = events_songs.groupby(events_songs.userId).agg(sum('length'))
# join onto events_pivot; rename the pivoted Cancellation Confirmation column
# to Churn (the label) and sum(length) to total_length
events_pivot = (events_pivot.join(total_length, on = 'userId', how = 'left')
                .withColumnRenamed("Cancellation Confirmation", "Churn")
                .withColumnRenamed("sum(length)", "total_length"))
3. Add days active
convert = 1000*60*60*24 # conversion factor from milliseconds to days
# Find minimum/maximum timestamp for each user
min_timestmp = events.select(["userId", "ts"]).groupby("userId").min("ts")
max_timestmp = events.select(["userId", "ts"]).groupby("userId").max("ts")
# Find days active of each user
daysActive = min_timestmp.join(max_timestmp, on="userId")
daysActive = (daysActive.withColumn("days_active",
(col("max(ts)")-col("min(ts)")) / convert))
daysActive = daysActive.select(["userId", "days_active"])
# join events pivot
events_pivot = events_pivot.join(daysActive, on = 'userId', how = 'left')
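The same feature can be computed in a single aggregation instead of two groupbys plus a join; an equivalent sketch:
from pyspark.sql.functions import min as f_min, max as f_max
# Equivalent one-pass variant: span between first and last event per user
daysActive_alt = (events.groupby("userId")
                  .agg(((f_max("ts") - f_min("ts")) / convert).alias("days_active")))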
4. Add number of sessions
numSessions = (events.select(["userId", "sessionId"])
.distinct()
.groupby("userId")
.count()
.withColumnRenamed("count", "num_sessions"))
# join events pivot
events_pivot = events_pivot.join(numSessions, on = 'userId', how = 'left')
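countDistinct, imported at the top but otherwise unused, expresses the same count more directly; an equivalent sketch:
# Equivalent variant using countDistinct
numSessions_alt = (events.groupby("userId")
                   .agg(countDistinct("sessionId").alias("num_sessions")))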
5. Add days as paid user
# Find minimum/maximum timestamp of each user as a paid user
paid_min_ts = events.filter(events.level == 'paid').groupby("userId").min("ts")
paid_max_ts = events.filter(events.level == 'paid').groupby("userId").max("ts")
# Find days as a paid user for each user
daysPaid = paid_min_ts.join(paid_max_ts, on="userId")
daysPaid = (daysPaid.withColumn("days_paid",
(col("max(ts)")-col("min(ts)")) / convert))
daysPaid = daysPaid.select(["userId", "days_paid"])
# join events pivot
events_pivot = events_pivot.join(daysPaid, on = 'userId', how='left')
6. Add days as a free user
# Find minimum/maximum timestamp of each user as a free user
free_min_ts = events.filter(events.level == 'free').groupby("userId").min("ts")
free_max_ts = events.filter(events.level == 'free').groupby("userId").max("ts")
# Find days as a free user for each user
daysFree = free_min_ts.join(free_max_ts, on="userId")
daysFree = (daysFree.withColumn("days_free",
(col("max(ts)")-col("min(ts)")) / convert))
daysFree = daysFree.select(["userId", "days_free"])
# join events pivot
events_pivot = events_pivot.join(daysFree, on = 'userId', how='left')
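Steps 5 and 6 follow the same pattern, and both spans can be computed in one pass with conditional aggregation: rows that don't match the when() condition become null and are ignored by min/max. An equivalent sketch:
from pyspark.sql.functions import min as f_min, max as f_max
# Equivalent one-pass variant: paid and free spans via conditional aggregation
level_spans = (events.groupby("userId")
               .agg(((f_max(when(col("level") == "paid", col("ts"))) -
                      f_min(when(col("level") == "paid", col("ts")))) / convert).alias("days_paid"),
                    ((f_max(when(col("level") == "free", col("ts"))) -
                      f_min(when(col("level") == "free", col("ts")))) / convert).alias("days_free")))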
7. Add user access agent
# find each user's access agent and one-hot encode it
userAgents = events.select(['userId', 'userAgent']).distinct()
userAgents = userAgents.fillna('Unknown')
# build string indexer
stringIndexer = StringIndexer(inputCol="userAgent", outputCol="userAgentIndex")
model = stringIndexer.fit(userAgents)
userAgents = model.transform(userAgents)
# one hot encode userAgent column
encoder = OneHotEncoder(inputCol="userAgentIndex", outputCol="userAgentVec")
userAgents = encoder.transform(userAgents).select(['userId', 'userAgentVec'])
# join events pivot
events_pivot = events_pivot.join(userAgents, on = 'userId', how ='left')
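Calling transform directly on OneHotEncoder matches the Spark 2.x API, where it was a pure Transformer. In Spark 3.x, OneHotEncoder is an estimator and must be fit first; the 3.x equivalent of the encoding step above would be:
# Spark 3.x variant: OneHotEncoder must be fit before it can transform
encoder = OneHotEncoder(inputCol="userAgentIndex", outputCol="userAgentVec")
userAgents = (encoder.fit(userAgents)
              .transform(userAgents)
              .select(['userId', 'userAgentVec']))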
8. Fill all empty values as 0
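# note: fillna(0) fills only numeric columns; the userAgentVec vector column is unaffected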
events_pivot = events_pivot.fillna(0)
Split the full dataset into train, test, and validation sets, and test out three machine learning algorithms: logistic regression, random forest, and gradient boosting (a comparison sketch follows the split below).
Gradient boosting had the largest F1-score on held-out data, so we will proceed with this algorithm and build a pipeline around it.
# Split data into train and test set
events_pivot = events_pivot.withColumnRenamed('Churn', 'label')
training, test = events_pivot.randomSplit([0.9, 0.1])
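The three-way comparison itself is not shown in this notebook; what follows is a minimal hypothetical sketch of how the three imported classifiers could be scored on a validation split carved out of the training set (names such as train_sub and assembler_cmp are illustrative):
# Hypothetical comparison sketch: score each candidate on a validation split
train_sub, validation = training.randomSplit([0.8, 0.2], seed=42)
assembler_cmp = VectorAssembler(inputCols=training.drop('label', 'userId').schema.names,
                                outputCol="Features")
scaler_cmp = MinMaxScaler(inputCol="Features", outputCol="ScaledFeatures")
evaluator_cmp = MulticlassClassificationEvaluator(metricName="f1")
for name, clf in [("logistic regression", LogisticRegression(featuresCol="ScaledFeatures")),
                  ("random forest", RandomForestClassifier(featuresCol="ScaledFeatures")),
                  ("gradient boosting", GBTClassifier(featuresCol="ScaledFeatures"))]:
    model = Pipeline(stages=[assembler_cmp, scaler_cmp, clf]).fit(train_sub)
    print(name, evaluator_cmp.evaluate(model.transform(validation)))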
Build machine learning pipeline
# Create vector from feature data
feature_names = events_pivot.drop('label', 'userId').schema.names
vec_assembler = VectorAssembler(inputCols = feature_names, outputCol = "Features")
# Scale each column
scaler = MinMaxScaler(inputCol="Features", outputCol="ScaledFeatures")
# build classifier
gbt = GBTClassifier(featuresCol="ScaledFeatures", labelCol="label")
# Construct pipeline
pipeline_gbt = Pipeline(stages=[vec_assembler, scaler, gbt])
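CrossValidator and ParamGridBuilder are imported at the top but never used below; a hypothetical tuning sketch around this pipeline (grid values are illustrative):
# Hypothetical tuning sketch over the gradient boosting pipeline
param_grid = (ParamGridBuilder()
              .addGrid(gbt.maxDepth, [3, 5])
              .addGrid(gbt.maxIter, [20, 50])
              .build())
cv = CrossValidator(estimator=pipeline_gbt,
                    estimatorParamMaps=param_grid,
                    evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                    numFolds=3)
# cv_model = cv.fit(training)  # considerably more expensive than a single fit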
Fit gradient boosting model
gbt_model = pipeline_gbt.fit(training)
def modelEvaluations(model, metric, data):
    """Evaluate a machine learning model's performance.
    Input:
        model - pipeline object
        metric - the metric of the evaluations
        data - the data being evaluated
    Output:
        [score, confusion matrix]
    """
    # generate predictions
    evaluator = MulticlassClassificationEvaluator(metricName = metric)
    predictions = model.transform(data)
    # calculate score
    score = evaluator.evaluate(predictions)
    # tabulate label vs. prediction counts as a confusion matrix
    confusion_matrix = (predictions.groupby("label")
                        .pivot("prediction")
                        .count())
    return [score, confusion_matrix]
f1_best, conf_mtx_best = modelEvaluations(gbt_model, 'f1', test)
print('The F1 score for the gradient boosting model:', f1_best)
conf_mtx_best.show()
The F1 score for the gradient boosting model: 0.8896163691822966
+-----+----+---+
|label| 0.0|1.0|
+-----+----+---+
|    0|1612| 70|
|    1| 163|344|
+-----+----+---+