# Section 11 : Linear Regression
# course 11.35
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
spark = SparkSession.builder.appName('linear_regression').getOrCreate()
# load the data
# https://github.com/yennanliu/analysis/blob/master/SPARK_/sample_linear_regression_data.txt
training = spark.read.format('libsvm').load('sample_linear_regression_data.txt')
# take a look on the original data
!head -5 sample_linear_regression_data.txt
-9.490009878824548 1:0.4551273600657362 2:0.36644694351969087 3:-0.38256108933468047 4:-0.4458430198517267 5:0.33109790358914726 6:0.8067445293443565 7:-0.2624341731773887 8:-0.44850386111659524 9:-0.07269284838169332 10:0.5658035575800715 0.2577820163584905 1:0.8386555657374337 2:-0.1270180511534269 3:0.499812362510895 4:-0.22686625128130267 5:-0.6452430441812433 6:0.18869982177936828 7:-0.5804648622673358 8:0.651931743775642 9:-0.6555641246242951 10:0.17485476357259122 -4.438869807456516 1:0.5025608135349202 2:0.14208069682973434 3:0.16004976900412138 4:0.505019897181302 5:-0.9371635223468384 6:-0.2841601610457427 7:0.6355938616712786 8:-0.1646249064941625 9:0.9480713629917628 10:0.42681251564645817 -19.782762789614537 1:-0.0388509668871313 2:-0.4166870051763918 3:0.8997202693189332 4:0.6409836467726933 5:0.273289095712564 6:-0.26175701211620517 7:-0.2794902492677298 8:-0.1306778297187794 9:-0.08536581111046115 10:-0.05462315824828923 -7.966593841555266 1:-0.06195495876886281 2:0.6546448480299902 3:-0.6979368909424835 4:0.6677324708883314 5:-0.07938725467767771 6:-0.43885601665437957 7:-0.608071585153688 8:-0.6414531182501653 9:0.7313735926547045 10:-0.026818676347611925
training
DataFrame[label: double, features: vector]
# HERE YOU CAN SEE THE DATASET IS WITH "LABEL" AND "FEATURES" COLUMNS
# WHICH IS THE DEFAULT SCHEMA THAT SPARK ML-LIB CAN RUN MODELING WITH
##### label: double, features: vector #####
training.show()
+-------------------+--------------------+ | label| features| +-------------------+--------------------+ | -9.490009878824548|(10,[0,1,2,3,4,5,...| | 0.2577820163584905|(10,[0,1,2,3,4,5,...| | -4.438869807456516|(10,[0,1,2,3,4,5,...| |-19.782762789614537|(10,[0,1,2,3,4,5,...| | -7.966593841555266|(10,[0,1,2,3,4,5,...| | -7.896274316726144|(10,[0,1,2,3,4,5,...| | -8.464803554195287|(10,[0,1,2,3,4,5,...| | 2.1214592666251364|(10,[0,1,2,3,4,5,...| | 1.0720117616524107|(10,[0,1,2,3,4,5,...| |-13.772441561702871|(10,[0,1,2,3,4,5,...| | -5.082010756207233|(10,[0,1,2,3,4,5,...| | 7.887786536531237|(10,[0,1,2,3,4,5,...| | 14.323146365332388|(10,[0,1,2,3,4,5,...| |-20.057482615789212|(10,[0,1,2,3,4,5,...| |-0.8995693247765151|(10,[0,1,2,3,4,5,...| | -19.16829262296376|(10,[0,1,2,3,4,5,...| | 5.601801561245534|(10,[0,1,2,3,4,5,...| |-3.2256352187273354|(10,[0,1,2,3,4,5,...| | 1.5299675726687754|(10,[0,1,2,3,4,5,...| | -0.250102447941961|(10,[0,1,2,3,4,5,...| +-------------------+--------------------+ only showing top 20 rows
# create the model
lr = LinearRegression(featuresCol='features',
labelCol= 'label',
predictionCol = 'prediciton')
# train the model
lrModel = lr.fit(training)
# print output
print ('coefficients : ', lrModel.coefficients)
print ('intercept : ', lrModel.intercept)
coefficients : [0.0073350710225801715,0.8313757584337543,-0.8095307954684084,2.441191686884721,0.5191713795290003,1.1534591903547016,-0.2989124112808717,-0.5128514186201779,-0.619712827067017,0.6956151804322931] intercept : 0.14228558260358093
# use train cummary
training_summary = lrModel.summary
print ('r2 : ', training_summary.r2)
print ('rootMeanSquaredError : ', training_summary.rootMeanSquaredError)
r2 : 0.027839179518600154 rootMeanSquaredError : 10.16309157133015
all_data = spark.read.format('libsvm').load('sample_linear_regression_data.txt')
# random split
train_data, test_data = all_data.randomSplit([.7,.3])
print ( 'all data count : ', all_data.count())
print ( 'train data count : ', train_data.count())
print ( 'test data count : ', test_data.count())
all data count : 501 train data count : 365 test data count : 136
# train again only on train data
correct_model = lr.fit(training)
# test on test data
test_results = correct_model.evaluate(test_data)
# print the results (test data)
# use train cummary
print ('r2 : ', test_results.r2)
print ('rootMeanSquaredError : ', test_results.rootMeanSquaredError)
r2 : 0.08455715143231024 rootMeanSquaredError : 9.691326744797616
unlabeled_data = test_data.select('features')
# have some test data to run the followign prdiciton
unlabeled_data.show()
+--------------------+ | features| +--------------------+ |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| |(10,[0,1,2,3,4,5,...| +--------------------+ only showing top 20 rows
# MAKE PREDICTION
# NOTE : IN SPARK ML-LIB, PREDICT IS RUN BY "transform" COMMAND
predictions = correct_model.transform(unlabeled_data)
# show prediction
predictions.show()
+--------------------+--------------------+ | features| prediciton| +--------------------+--------------------+ |(10,[0,1,2,3,4,5,...| -3.5124943764463135| |(10,[0,1,2,3,4,5,...| -3.147868811718382| |(10,[0,1,2,3,4,5,...| -2.499423280435292| |(10,[0,1,2,3,4,5,...| 1.7010353768556734| |(10,[0,1,2,3,4,5,...| -0.5388564818088987| |(10,[0,1,2,3,4,5,...| -1.475284763550391| |(10,[0,1,2,3,4,5,...| -0.7489108841213971| |(10,[0,1,2,3,4,5,...| -2.508322852836744| |(10,[0,1,2,3,4,5,...| -0.976510689078842| |(10,[0,1,2,3,4,5,...| -0.9566138722165072| |(10,[0,1,2,3,4,5,...| 3.7236186142728274| |(10,[0,1,2,3,4,5,...| 1.2421598960943985| |(10,[0,1,2,3,4,5,...| -0.7195663865895121| |(10,[0,1,2,3,4,5,...| -1.780965034607929| |(10,[0,1,2,3,4,5,...|-0.06740884917840151| |(10,[0,1,2,3,4,5,...| 2.746996971787099| |(10,[0,1,2,3,4,5,...| 0.5789191740943999| |(10,[0,1,2,3,4,5,...| -1.2048075065353916| |(10,[0,1,2,3,4,5,...| -0.6964026254414395| |(10,[0,1,2,3,4,5,...| -3.0756131143558623| +--------------------+--------------------+ only showing top 20 rows
# end of 11.35
# next : 11.36
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
data = spark.read.csv('boston.csv', inferSchema=True, header=True)
data.columns
['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT', 'price']
data.printSchema()
root |-- CRIM: double (nullable = true) |-- ZN: double (nullable = true) |-- INDUS: double (nullable = true) |-- CHAS: double (nullable = true) |-- NOX: double (nullable = true) |-- RM: double (nullable = true) |-- AGE: double (nullable = true) |-- DIS: double (nullable = true) |-- RAD: double (nullable = true) |-- TAX: double (nullable = true) |-- PTRATIO: double (nullable = true) |-- B: double (nullable = true) |-- LSTAT: double (nullable = true) |-- price: double (nullable = true)
data.show(2)
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+-----+-----+-----+ | CRIM| ZN|INDUS|CHAS| NOX| RM| AGE| DIS|RAD| TAX|PTRATIO| B|LSTAT|price| +-------+----+-----+----+-----+-----+----+------+---+-----+-------+-----+-----+-----+ |0.00632|18.0| 2.31| 0.0|0.538|6.575|65.2| 4.09|1.0|296.0| 15.3|396.9| 4.98| 24.0| |0.02731| 0.0| 7.07| 0.0|0.469|6.421|78.9|4.9671|2.0|242.0| 17.8|396.9| 9.14| 21.6| +-------+----+-----+----+-----+-----+----+------+---+-----+-------+-----+-----+-----+ only showing top 2 rows
#################################################################################
#
# -- transform csv to feature to be access by SPARK MLIB --
# all numerical cols as feature (except price), set price as target to predict
#
#
#
#
#
#################################################################################
input_cols_ = ['CRIM',
'ZN',
'INDUS',
'CHAS',
'NOX',
'RM',
'AGE',
'DIS',
'RAD',
'TAX',
'PTRATIO',
'B',
'LSTAT']
# -------------------------------------------------------------------#
# transform input_cols_ --> feature (for following ML using) #
assembler = VectorAssembler(inputCols= input_cols_, outputCol = 'features')
# -------------------------------------------------------------------#
# run the transformation
output = assembler.transform(data)
# show the transformed feature col
output.select('features').show()
+--------------------+ | features| +--------------------+ |[0.00632,18.0,2.3...| |[0.02731,0.0,7.07...| |[0.02729,0.0,7.07...| |[0.03237,0.0,2.18...| |[0.06905,0.0,2.18...| |[0.02985,0.0,2.18...| |[0.08829,12.5,7.8...| |[0.14455,12.5,7.8...| |[0.21124,12.5,7.8...| |[0.17004,12.5,7.8...| |[0.22489,12.5,7.8...| |[0.11747,12.5,7.8...| |[0.09378,12.5,7.8...| |[0.62976,0.0,8.14...| |[0.63796,0.0,8.14...| |[0.62739,0.0,8.14...| |[1.05393,0.0,8.14...| |[0.7842,0.0,8.14,...| |[0.80271,0.0,8.14...| |[0.7258,0.0,8.14,...| +--------------------+ only showing top 20 rows
# create the final data for training ( feature as input, price as prediction output)
final_data = output.select('features', 'price')
final_data.show()
+--------------------+-----+ | features|price| +--------------------+-----+ |[0.00632,18.0,2.3...| 24.0| |[0.02731,0.0,7.07...| 21.6| |[0.02729,0.0,7.07...| 34.7| |[0.03237,0.0,2.18...| 33.4| |[0.06905,0.0,2.18...| 36.2| |[0.02985,0.0,2.18...| 28.7| |[0.08829,12.5,7.8...| 22.9| |[0.14455,12.5,7.8...| 27.1| |[0.21124,12.5,7.8...| 16.5| |[0.17004,12.5,7.8...| 18.9| |[0.22489,12.5,7.8...| 15.0| |[0.11747,12.5,7.8...| 18.9| |[0.09378,12.5,7.8...| 21.7| |[0.62976,0.0,8.14...| 20.4| |[0.63796,0.0,8.14...| 18.2| |[0.62739,0.0,8.14...| 19.9| |[1.05393,0.0,8.14...| 23.1| |[0.7842,0.0,8.14,...| 17.5| |[0.80271,0.0,8.14...| 20.2| |[0.7258,0.0,8.14,...| 18.2| +--------------------+-----+ only showing top 20 rows
# train, test split
train_data, test_data = final_data.randomSplit([0.7, 0.3])
print ( 'all data count : ', final_data.count())
print ( 'train data count : ', train_data.count())
print ( 'test data count : ', test_data.count())
all data count : 506 train data count : 345 test data count : 161
train_data.describe().show()
+-------+------------------+ |summary| price| +-------+------------------+ | count| 345| | mean|22.602898550724642| | stddev| 9.311339220842957| | min| 5.0| | max| 50.0| +-------+------------------+
# now, training with linear regression again
lr = LinearRegression(labelCol='price')
lr_model = lr.fit(train_data)
test_result = lr_model.evaluate(test_data)
# print the test data fitting results
test_result.residuals.show()
+--------------------+ | residuals| +--------------------+ | 2.820847914698863| | 10.092829367610015| | 7.383702839871042| | 5.77379335027215| | 1.439135362533321| | 6.360009488793956| | -5.894733583181768| | -3.771279882265347| |-0.10009484396961454| | 5.50501225227282| | 1.1319632754860187| | -5.434888522994932| | 7.662804510616631| |-0.03649746518004804| | -6.7716019362626625| | -3.257293842284902| | -0.5461996288341915| | -1.0521216424280162| | -3.452964004804734| | -11.084224152768945| +--------------------+ only showing top 20 rows
# print the test data fitting results
print ('* r2 : ', test_result.r2)
print ('* rootMeanSquaredError : ', test_result.rootMeanSquaredError)
print ('* coefficients : ', lrModel.coefficients)
print ('* intercept : ', lrModel.intercept)
* r2 : 0.7225741051946737 * rootMeanSquaredError : 4.712121102879971 * coefficients : [0.0073350710225801715,0.8313757584337543,-0.8095307954684084,2.441191686884721,0.5191713795290003,1.1534591903547016,-0.2989124112808717,-0.5128514186201779,-0.619712827067017,0.6956151804322931] * intercept : 0.14228558260358093
# compare with the input data
# 1) rootMeanSquaredError : 4.482524071400767 VS mean : 22.53280632411069
# 2) r2 : 0.7534954131369154 --> means the model only covers ~ 75% of the variant of the data, which is not a very good model
final_data.describe().show()
+-------+------------------+ |summary| price| +-------+------------------+ | count| 506| | mean|22.532806324110698| | stddev| 9.197104087379815| | min| 5.0| | max| 50.0| +-------+------------------+
# run the trained model with test data ( the data the model hasn't seen before)
unlabeled_data = test_data.select('features')
unlabeled_data.show()
+--------------------+ | features| +--------------------+ |[0.01301,35.0,1.5...| |[0.01381,80.0,0.4...| |[0.01538,90.0,3.7...| |[0.01709,90.0,2.0...| |[0.01965,80.0,1.7...| |[0.02177,82.5,2.0...| |[0.02498,0.0,1.89...| |[0.02731,0.0,7.07...| |[0.02763,75.0,2.9...| |[0.02899,40.0,1.2...| |[0.03113,0.0,4.39...| |[0.03445,82.5,2.0...| |[0.0351,95.0,2.68...| |[0.03548,80.0,3.6...| |[0.03584,80.0,3.3...| |[0.03615,80.0,4.9...| |[0.03738,0.0,5.19...| |[0.04297,52.5,5.3...| |[0.0456,0.0,13.89...| |[0.04741,0.0,11.9...| +--------------------+ only showing top 20 rows
# run the prediction
predictions = lr_model.transform(unlabeled_data)
predictions.show()
+--------------------+------------------+ | features| prediction| +--------------------+------------------+ |[0.01301,35.0,1.5...| 29.87915208530114| |[0.01381,80.0,0.4...|39.907170632389985| |[0.01538,90.0,3.7...| 36.61629716012896| |[0.01709,90.0,2.0...| 24.32620664972785| |[0.01965,80.0,1.7...| 18.66086463746668| |[0.02177,82.5,2.0...| 35.93999051120604| |[0.02498,0.0,1.89...|22.394733583181768| |[0.02731,0.0,7.07...| 25.37127988226535| |[0.02763,75.0,2.9...|30.900094843969615| |[0.02899,40.0,1.2...| 21.09498774772718| |[0.03113,0.0,4.39...| 16.36803672451398| |[0.03445,82.5,2.0...|29.534888522994933| |[0.0351,95.0,2.68...| 40.83719548938337| |[0.03548,80.0,3.6...|20.936497465180047| |[0.03584,80.0,3.3...|30.271601936262662| |[0.03615,80.0,4.9...| 31.1572938422849| |[0.03738,0.0,5.19...| 21.24619962883419| |[0.04297,52.5,5.3...|25.852121642428017| |[0.0456,0.0,13.89...|26.752964004804735| |[0.04741,0.0,11.9...|22.984224152768945| +--------------------+------------------+ only showing top 20 rows
# end of 11.37
# next : 11.38