from pyspark.sql import SparkSession
# Create the Spark session for this job, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('CF')
    .getOrCreate()
)
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
# data source:
# https://github.com/yennanliu/movie_recommendation
# https://grouplens.org/datasets/movielens/
# https://github.com/khanhnamle1994/movielens
# Load the ratings CSV: the first row is treated as the header and column
# types are inferred by Spark instead of being declared up front.
data = spark.read.csv(
    "movie_ratings.csv",
    header=True,
    inferSchema=True,
)
# Print the inferred column names and types.
data.printSchema()
root |-- userId: integer (nullable = true) |-- movieId: integer (nullable = true) |-- rating: double (nullable = true) |-- timestamp: integer (nullable = true)
# Preview the first rows of the ratings table.
data.show()
+------+-------+------+----------+ |userId|movieId|rating| timestamp| +------+-------+------+----------+ | 1| 31| 2.5|1260759144| | 1| 1029| 3.0|1260759179| | 1| 1061| 3.0|1260759182| | 1| 1129| 2.0|1260759185| | 1| 1172| 4.0|1260759205| | 1| 1263| 2.0|1260759151| | 1| 1287| 2.0|1260759187| | 1| 1293| 2.0|1260759148| | 1| 1339| 3.5|1260759125| | 1| 1343| 2.0|1260759131| | 1| 1371| 2.5|1260759135| | 1| 1405| 1.0|1260759203| | 1| 1953| 4.0|1260759191| | 1| 2105| 4.0|1260759139| | 1| 2150| 3.0|1260759194| | 1| 2193| 2.0|1260759198| | 1| 2294| 2.0|1260759108| | 1| 2455| 2.5|1260759113| | 1| 2968| 1.0|1260759200| | 1| 3671| 3.0|1260759117| +------+-------+------+----------+ only showing top 20 rows
# Summary statistics (count, mean, stddev, min, max) for every column.
data.describe().show()
+-------+------------------+------------------+------------------+--------------------+ |summary| userId| movieId| rating| timestamp| +-------+------------------+------------------+------------------+--------------------+ | count| 100004| 100004| 100004| 100004| | mean| 347.0113095476181|12548.664363425463| 3.543608255669773|1.1296390869392424E9| | stddev|195.16383797819535|26369.198968815268|1.0580641091070326|1.9168582602710962E8| | min| 1| 1| 0.5| 789652009| | max| 671| 163949| 5.0| 1476640644| +-------+------------------+------------------+------------------+--------------------+
# Randomly partition the ratings: roughly 80% for training, 20% for testing.
# No seed is given, so the partition differs from run to run.
training, test = data.randomSplit(weights=[0.8, 0.2])
# Configure the ALS recommender and its hyperparameters.
# coldStartStrategy='drop' discards predictions for users/movies that the
# random split left out of the training data. The default ('nan') keeps
# those rows with a NaN prediction, and a single NaN is enough to turn the
# RMSE computed on the test predictions into NaN.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy='drop',
)
# Fit the ALS model on the training split.
model = als.fit(dataset=training)
# Score the held-out split; the result carries an added `prediction` column.
# (The variable name is misspelled but kept, since later statements use it.)
predicitons = model.transform(dataset=test)
# Preview the scored test rows.
predicitons.show()
+------+-------+------+----------+----------+ |userId|movieId|rating| timestamp|prediction| +------+-------+------+----------+----------+ | 452| 463| 2.0| 976424451| 2.4552588| | 85| 471| 3.0| 837512312| 3.9172719| | 588| 471| 3.0| 842298526| 4.7625732| | 460| 471| 5.0|1072836030| 3.8125675| | 274| 471| 5.0|1074104142| 3.6691563| | 292| 471| 3.5|1140049920| 4.0752306| | 15| 471| 3.0|1166586067| 2.311449| | 73| 471| 4.0|1296460183| 3.3499885| | 354| 471| 5.0| 846062674| 4.579715| | 529| 471| 4.0| 965497394| 3.1544423| | 184| 471| 5.0| 833525100| 4.5493975| | 311| 471| 0.5|1062015819| 2.6232295| | 521| 471| 3.5|1370072127| 4.019308| | 547| 496| 3.0| 974778561| 2.5938766| | 463| 1088| 3.0|1050499697| 3.0081568| | 52| 1088| 4.0|1231766626| 4.288722| | 500| 1088| 4.0|1229098924| 2.4964237| | 387| 1088| 4.0| 974790964| 2.1745355| | 514| 1088| 3.0| 853896732| 3.0606182| | 160| 1088| 4.0| 974258881| 4.6870093| +------+-------+------+----------+----------+ only showing top 20 rows
# Measure prediction quality as the root-mean-square error between the
# actual `rating` column and the model's `prediction` column.
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='rating', metricName='rmse')
rmse = evaluator.evaluate(predicitons)
# Emit the label and the value on separate lines.
print('RMSE', rmse, sep='\n')
RMSE nan
# Peek at the held-out test split.
test.show()
+------+-------+------+----------+ |userId|movieId|rating| timestamp| +------+-------+------+----------+ | 1| 1129| 2.0|1260759185| | 1| 1287| 2.0|1260759187| | 1| 1339| 3.5|1260759125| | 1| 1343| 2.0|1260759131| | 1| 2294| 2.0|1260759108| | 2| 10| 4.0| 835355493| | 2| 161| 3.0| 835355493| | 2| 186| 3.0| 835355664| | 2| 208| 3.0| 835355511| | 2| 292| 3.0| 835355492| | 2| 300| 3.0| 835355532| | 2| 339| 3.0| 835355492| | 2| 367| 3.0| 835355619| | 2| 457| 3.0| 835355511| | 2| 468| 4.0| 835355790| | 2| 474| 2.0| 835355828| | 2| 515| 4.0| 835355817| | 2| 550| 3.0| 835356109| | 2| 587| 3.0| 835355779| | 3| 736| 3.5|1298932787| +------+-------+------+----------+ only showing top 20 rows
# Pick one user (id 11) out of the test split, keeping only the two id
# columns needed to score (user, movie) pairs.
single_user = test.where(test.userId == 11).select('userId', 'movieId')
# The (user, movie) pairs from the test split for this user.
single_user.show()
+------+-------+ |userId|movieId| +------+-------+ | 11| 70| | 11| 1027| | 11| 1201| | 11| 1408| | 11| 2042| | 11| 3424| | 11| 71211| | 11| 77455| | 11| 81158| | 11| 81562| | 11| 96079| | 11| 96861| +------+-------+
# Predict this user's rating for each of the selected movies.
recommendations = model.transform(dataset=single_user)
# Display the predicted ratings (not yet sorted).
recommendations.show()
+------+-------+----------+ |userId|movieId|prediction| +------+-------+----------+ | 11| 1201| 5.064004| | 11| 71211| 2.056238| | 11| 2042| 2.8747272| | 11| 96079| 5.072752| | 11| 81562| 5.3010736| | 11| 81158| 1.9231318| | 11| 96861| 1.9171791| | 11| 70| 3.744793| | 11| 1027| 4.1614656| | 11| 1408| 4.0286865| | 11| 3424| 3.1237607| | 11| 77455| 4.8629804| +------+-------+----------+
# Rank the user's movies from highest to lowest predicted rating.
recommendations.orderBy(recommendations['prediction'].desc()).show()
+------+-------+----------+ |userId|movieId|prediction| +------+-------+----------+ | 11| 81562| 5.3010736| | 11| 96079| 5.072752| | 11| 1201| 5.064004| | 11| 77455| 4.8629804| | 11| 1027| 4.1614656| | 11| 1408| 4.0286865| | 11| 70| 3.744793| | 11| 3424| 3.1237607| | 11| 2042| 2.8747272| | 11| 71211| 2.056238| | 11| 81158| 1.9231318| | 11| 96861| 1.9171791| +------+-------+----------+
# end of course (CF) : 15.56
# next : 16 : NLP