import findspark
findspark.init()  # makes pyspark importable even outside a Spark distribution
import pyspark
from pyspark.sql import SparkSession as SS  # entry point for DataFrame functionality
# Instantiate a local Spark session via the builder pattern.
# NOTE: fixed a typo in the original application name ("Predcting" -> "Predicting");
# the app name is display-only (Spark UI / logs) and does not affect computation.
spark = SS.builder.master("local").appName("Predicting-Diabetes-Readmission").getOrCreate()
If the data is clean, we can import it directly with spark.read, which reads it in as a DataFrame. A DataFrame supports many operations, but for some complex cleaning, working with RDDs is preferred.
In this dataset, there are missing values coded as "?" so I will first import the data as a DF but use its RDD form to replace "?" with Python None object and then create a Spark DataFrame from the resulting data so that the missing values are read in as missing.
# Load the raw CSV as an all-string DataFrame; the header row supplies column names.
rawData = spark.read.option("header", "true").csv("diabetic_data.csv")
# Peek at the first record to sanity-check the parse.
rawData.take(1)
[Row(encounter_id='2278392', patient_nbr='8222157', race='Caucasian', gender='Female', age='[0-10)', weight='?', admission_type_id='6', discharge_disposition_id='25', admission_source_id='1', time_in_hospital='1', payer_code='?', medical_specialty='Pediatrics-Endocrinology', num_lab_procedures='41', num_procedures='0', num_medications='1', number_outpatient='0', number_emergency='0', number_inpatient='0', diag_1='250.83', diag_2='?', diag_3='?', number_diagnoses='1', max_glu_serum='None', A1Cresult='None', metformin='No', repaglinide='No', nateglinide='No', chlorpropamide='No', glimepiride='No', acetohexamide='No', glipizide='No', glyburide='No', tolbutamide='No', pioglitazone='No', rosiglitazone='No', acarbose='No', miglitol='No', troglitazone='No', tolazamide='No', examide='No', citoglipton='No', insulin='No', glyburide-metformin='No', glipizide-metformin='No', glimepiride-pioglitazone='No', metformin-rosiglitazone='No', metformin-pioglitazone='No', change='No', diabetesMed='No', readmitted='NO')]
We can see above that variables such as weight and payer_code have "?" values, which represent missing values
Converting DF to RDD, replacing "?" by Python None object using map and reconverting RDD to DF
Also, dropping encounter_id and patient_nbr which are keys and won't be needed for prediction
# Drop the two key columns; they identify records but carry no predictive signal.
rawData = rawData.drop('encounter_id', 'patient_nbr')
from pyspark.sql.types import *
# Build an all-string, nullable schema mirroring the remaining columns.
schema = StructType(
    [StructField(name, StringType(), True) for name in rawData.columns]
)
# Swap the "?" placeholder for None via the underlying RDD, then rebuild a
# DataFrame so the missing values become genuine nulls.
cleaned_rows = rawData.rdd.map(
    lambda row: [None if value == "?" else value for value in row]
)
diabetes = spark.createDataFrame(cleaned_rows, schema)
diabetes.take(1)
[Row(race='Caucasian', gender='Female', age='[0-10)', weight=None, admission_type_id='6', discharge_disposition_id='25', admission_source_id='1', time_in_hospital='1', payer_code=None, medical_specialty='Pediatrics-Endocrinology', num_lab_procedures='41', num_procedures='0', num_medications='1', number_outpatient='0', number_emergency='0', number_inpatient='0', diag_1='250.83', diag_2=None, diag_3=None, number_diagnoses='1', max_glu_serum='None', A1Cresult='None', metformin='No', repaglinide='No', nateglinide='No', chlorpropamide='No', glimepiride='No', acetohexamide='No', glipizide='No', glyburide='No', tolbutamide='No', pioglitazone='No', rosiglitazone='No', acarbose='No', miglitol='No', troglitazone='No', tolazamide='No', examide='No', citoglipton='No', insulin='No', glyburide-metformin='No', glipizide-metformin='No', glimepiride-pioglitazone='No', metformin-rosiglitazone='No', metformin-pioglitazone='No', change='No', diabetesMed='No', readmitted='NO')]
Our response variable had categories ">30" "<30" and "NO". We want ">30" and "<30" to be combined into 1 category "Yes".
We do this by recoding the column with the .withColumn(col_name, expression) method of a Spark DataFrame, e.g. via a User Defined Function
from pyspark.sql.functions import when, col

# Collapse the response: ">30" and "<30" both become "Yes", everything else "No".
# A native column expression replaces the original Python UDF: it is behaviorally
# identical (nulls also fall through to "No" via .otherwise, matching the UDF's
# else branch) but avoids the per-row Python<->JVM serialization overhead that
# row-at-a-time UDFs incur.
diabetes = diabetes.withColumn(
    "readmitted",
    when(col("readmitted").isin(">30", "<30"), "Yes").otherwise("No"),
)
diabetes.take(1)
[Row(race='Caucasian', gender='Female', age='[0-10)', weight=None, admission_type_id='6', discharge_disposition_id='25', admission_source_id='1', time_in_hospital='1', payer_code=None, medical_specialty='Pediatrics-Endocrinology', num_lab_procedures='41', num_procedures='0', num_medications='1', number_outpatient='0', number_emergency='0', number_inpatient='0', diag_1='250.83', diag_2=None, diag_3=None, number_diagnoses='1', max_glu_serum='None', A1Cresult='None', metformin='No', repaglinide='No', nateglinide='No', chlorpropamide='No', glimepiride='No', acetohexamide='No', glipizide='No', glyburide='No', tolbutamide='No', pioglitazone='No', rosiglitazone='No', acarbose='No', miglitol='No', troglitazone='No', tolazamide='No', examide='No', citoglipton='No', insulin='No', glyburide-metformin='No', glipizide-metformin='No', glimepiride-pioglitazone='No', metformin-rosiglitazone='No', metformin-pioglitazone='No', change='No', diabetesMed='No', readmitted='No')]
from pyspark.sql.functions import isnan, when, count, col
# Per-column null counts: one labelled aggregate per column, shown as one row.
null_count_exprs = [
    count(when(col(name).isNull(), name)).alias(name) for name in diabetes.columns
]
diabetes.select(null_count_exprs).show()
+----+------+---+------+-----------------+------------------------+-------------------+----------------+----------+-----------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+-----------+-------------+---------+---------+-----------+------------+-------------+--------+--------+------------+----------+-------+-----------+-------+-------------------+-------------------+------------------------+-----------------------+----------------------+------+-----------+----------+ |race|gender|age|weight|admission_type_id|discharge_disposition_id|admission_source_id|time_in_hospital|payer_code|medical_specialty|num_lab_procedures|num_procedures|num_medications|number_outpatient|number_emergency|number_inpatient|diag_1|diag_2|diag_3|number_diagnoses|max_glu_serum|A1Cresult|metformin|repaglinide|nateglinide|chlorpropamide|glimepiride|acetohexamide|glipizide|glyburide|tolbutamide|pioglitazone|rosiglitazone|acarbose|miglitol|troglitazone|tolazamide|examide|citoglipton|insulin|glyburide-metformin|glipizide-metformin|glimepiride-pioglitazone|metformin-rosiglitazone|metformin-pioglitazone|change|diabetesMed|readmitted| +----+------+---+------+-----------------+------------------------+-------------------+----------------+----------+-----------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+-----------+-------------+---------+---------+-----------+------------+-------------+--------+--------+------------+----------+-------+-----------+-------+-------------------+-------------------+------------------------+-----------------------+----------------------+------+-----------+----------+ |2273| 0| 0| 98569| 0| 0| 0| 0| 40256| 49949| 0| 0| 0| 0| 0| 0| 21| 
358| 1423| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| +----+------+---+------+-----------------+------------------------+-------------------+----------------+----------+-----------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+-----------+-------------+---------+---------+-----------+------------+-------------+--------+--------+------------+----------+-------+-----------+-------+-------------------+-------------------+------------------------+-----------------------+----------------------+------+-----------+----------+
We can see columns 'weight', 'payer_code' and 'medical_specialty' have many missing values
Lets go ahead and delete these columns
# These three columns are mostly missing (see the null counts above), so drop them.
high_missing_cols = ['weight', 'payer_code', 'medical_specialty']
diabetes = diabetes.drop(*high_missing_cols)
#diabetes.take(1)
Converting a few features which are numeric but have been parsed as Strings due to "?"
Dropping rows with NA values.
I did not find an efficient way to do this so I had to hardcode.
# Cast the numeric-looking columns (parsed as strings because of the "?" values)
# to double. A single loop replaces the original nine copy-pasted withColumn
# calls; the column set and the resulting schema are unchanged.
numeric_cols = [
    "diag_1", "diag_2", "diag_3",
    "time_in_hospital", "num_lab_procedures", "num_medications",
    "number_emergency", "number_inpatient", "number_diagnoses",
]
for name in numeric_cols:
    diabetes = diabetes.withColumn(name, diabetes[name].cast(DoubleType()))
# Dropping the remaining few NA rows (values that failed the cast became null).
diabetes = diabetes.dropna()
#diabetes.take(1)
I had to import a list of Redundant and unbalanced features from R
Deleting these features
# Redundant and unbalanced feature list imported from R.
# NOTE: the original call also passed 'citoglipton, examide' — a single string
# naming no real column. DataFrame.drop silently ignores unknown names, and both
# columns are already listed individually, so removing it changes nothing.
diabetes = diabetes.drop('examide', 'citoglipton', 'metformin-rosiglitazone', 'metformin-pioglitazone',
                         'glimepiride-pioglitazone', 'acetohexamide',
                         'repaglinide', 'nateglinide', 'chlorpropamide', 'tolbutamide', 'acarbose', 'miglitol',
                         'troglitazone', 'tolazamide', 'glyburide-metformin', 'glipizide-metformin')
diabetes
DataFrame[race: string, gender: string, age: string, admission_type_id: string, discharge_disposition_id: string, admission_source_id: string, time_in_hospital: double, num_lab_procedures: double, num_procedures: string, num_medications: double, number_outpatient: string, number_emergency: double, number_inpatient: double, diag_1: double, diag_2: double, diag_3: double, number_diagnoses: double, max_glu_serum: string, A1Cresult: string, metformin: string, glimepiride: string, glipizide: string, glyburide: string, pioglitazone: string, rosiglitazone: string, insulin: string, change: string, diabetesMed: string, readmitted: string]
Spark.ML's current functionality deals only with numeric columns
We will use the StringIndexer class to index each categorical column
We will leave the already double columns as is
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
# Collect the STRING-dtype columns; each gets an indexed "<name>_index" twin.
# (Direct comprehension replaces the original None-placeholder two-step filter.)
cols_to_index = [name for name, dtype in diabetes.dtypes if dtype == "string"]
# One StringIndexer estimator per categorical column.
indexers = [StringIndexer(inputCol=name, outputCol=name + "_index") for name in cols_to_index]
# Pipeline.fit fits every estimator stage itself, so the original manual
# pre-fitting of each indexer before building the Pipeline was redundant work.
pipeline = Pipeline(stages=indexers)
diabetes_indexed = pipeline.fit(diabetes).transform(diabetes)
diabetes_indexed
DataFrame[race: string, gender: string, age: string, admission_type_id: string, discharge_disposition_id: string, admission_source_id: string, time_in_hospital: double, num_lab_procedures: double, num_procedures: string, num_medications: double, number_outpatient: string, number_emergency: double, number_inpatient: double, diag_1: double, diag_2: double, diag_3: double, number_diagnoses: double, max_glu_serum: string, A1Cresult: string, metformin: string, glimepiride: string, glipizide: string, glyburide: string, pioglitazone: string, rosiglitazone: string, insulin: string, change: string, diabetesMed: string, readmitted: string, race_index: double, gender_index: double, age_index: double, admission_type_id_index: double, discharge_disposition_id_index: double, admission_source_id_index: double, num_procedures_index: double, number_outpatient_index: double, max_glu_serum_index: double, A1Cresult_index: double, metformin_index: double, glimepiride_index: double, glipizide_index: double, glyburide_index: double, pioglitazone_index: double, rosiglitazone_index: double, insulin_index: double, change_index: double, diabetesMed_index: double, readmitted_index: double]
Since our indexed columns have a suffix "index" now, we delete the original columns
Also, renaming "readmitted_index" back to "readmitted". The response variable is "readmitted"
# The indexed copies exist now, so the original categorical columns can go.
original_categorical_cols = [
    'race', 'gender', 'age',
    'admission_type_id', 'discharge_disposition_id', 'admission_source_id',
    'num_procedures', 'number_outpatient',
    'max_glu_serum', 'A1Cresult',
    'metformin', 'glimepiride', 'glipizide', 'glyburide',
    'pioglitazone', 'rosiglitazone', 'insulin',
    'change', 'diabetesMed', 'readmitted',
]
diabetes_indexed = diabetes_indexed.drop(*original_categorical_cols)
# Restore the response column to its natural name.
diabetes_indexed = diabetes_indexed.withColumnRenamed("readmitted_index", "readmitted")
Spark has several different modules for feature selection
I tried to use ChiSq feature selection and was successful
I decided to not include it in my analysis as it needs more work
#from pyspark.ml.linalg import Vectors
#from pyspark.ml.feature import ChiSqSelector
#def Vectorize(data):
#    return data.rdd.map(lambda r: [Vectors.dense(r[0:19]), r[19]]).toDF(["features","readmitted"])
#diabetes_indexed_vectorized = Vectorize(diabetes_indexed)
#selector = ChiSqSelector(numTopFeatures=7, featuresCol="features",
#                         outputCol="selectedFeatures", labelCol="readmitted")
#diabetes_chsq = selector.fit(diabetes_indexed_vectorized).transform(diabetes_indexed_vectorized)
#diabetes_chsq = diabetes_chsq.drop('features')

# Every indexed categorical except the label gets one-hot encoded.
# Built as a NEW list on purpose: the original wrote
#   cols_to_ohe = cols_to_index; cols_to_ohe.remove("readmitted")
# which aliased the list and silently mutated cols_to_index as well.
cols_to_ohe = [name for name in cols_to_index if name != "readmitted"]
cols_to_ohe_ = [name + "_index" for name in cols_to_ohe]
cols_to_ohe_
['race_index', 'gender_index', 'age_index', 'admission_type_id_index', 'discharge_disposition_id_index', 'admission_source_id_index', 'num_procedures_index', 'number_outpatient_index', 'max_glu_serum_index', 'A1Cresult_index', 'metformin_index', 'glimepiride_index', 'glipizide_index', 'glyburide_index', 'pioglitazone_index', 'rosiglitazone_index', 'insulin_index', 'change_index', 'diabetesMed_index']
OHE Code
Output of OHE is in the form of sparse vectors. One each for one feature
# One-hot encode each indexed column; the output is one sparse vector per feature.
output_ohe_cols = [name + "_vector" for name in cols_to_ohe_]
from pyspark.ml.feature import OneHotEncoderEstimator
# NOTE(review): OneHotEncoderEstimator is the Spark 2.x class; in Spark 3 it
# was renamed to OneHotEncoder.
ohe = OneHotEncoderEstimator(inputCols=cols_to_ohe_, outputCols=output_ohe_cols)
diabetes_ohe = ohe.fit(diabetes_indexed).transform(diabetes_indexed)
diabetes_ohe.take(1)
[Row(time_in_hospital=3.0, num_lab_procedures=59.0, num_medications=18.0, number_emergency=0.0, number_inpatient=0.0, diag_1=276.0, diag_2=250.01, diag_3=255.0, number_diagnoses=9.0, race_index=0.0, gender_index=0.0, age_index=8.0, admission_type_id_index=0.0, discharge_disposition_id_index=0.0, admission_source_id_index=0.0, num_procedures_index=0.0, number_outpatient_index=0.0, max_glu_serum_index=0.0, A1Cresult_index=0.0, metformin_index=0.0, glimepiride_index=0.0, glipizide_index=0.0, glyburide_index=0.0, pioglitazone_index=0.0, rosiglitazone_index=0.0, insulin_index=3.0, change_index=1.0, diabetesMed_index=0.0, readmitted=1.0, metformin_index_vector=SparseVector(3, {0: 1.0}), admission_source_id_index_vector=SparseVector(15, {0: 1.0}), rosiglitazone_index_vector=SparseVector(3, {0: 1.0}), glimepiride_index_vector=SparseVector(3, {0: 1.0}), discharge_disposition_id_index_vector=SparseVector(25, {0: 1.0}), glipizide_index_vector=SparseVector(3, {0: 1.0}), max_glu_serum_index_vector=SparseVector(3, {0: 1.0}), gender_index_vector=SparseVector(2, {0: 1.0}), number_outpatient_index_vector=SparseVector(38, {0: 1.0}), race_index_vector=SparseVector(4, {0: 1.0}), diabetesMed_index_vector=SparseVector(1, {0: 1.0}), admission_type_id_index_vector=SparseVector(7, {0: 1.0}), A1Cresult_index_vector=SparseVector(3, {0: 1.0}), change_index_vector=SparseVector(1, {}), glyburide_index_vector=SparseVector(3, {0: 1.0}), age_index_vector=SparseVector(9, {8: 1.0}), insulin_index_vector=SparseVector(3, {}), pioglitazone_index_vector=SparseVector(3, {0: 1.0}), num_procedures_index_vector=SparseVector(6, {0: 1.0}))]
# The *_index columns have been superseded by their *_vector encodings, so drop
# them. cols_to_ohe_ already holds exactly these names; reusing it removes the
# hand-maintained duplicate list the original carried (19 names kept in sync by
# hand).
diabetes_ohe = diabetes_ohe.drop(*cols_to_ohe_)
diabetes_ohe
DataFrame[time_in_hospital: double, num_lab_procedures: double, num_medications: double, number_emergency: double, number_inpatient: double, diag_1: double, diag_2: double, diag_3: double, number_diagnoses: double, readmitted: double, metformin_index_vector: vector, admission_source_id_index_vector: vector, rosiglitazone_index_vector: vector, glimepiride_index_vector: vector, discharge_disposition_id_index_vector: vector, glipizide_index_vector: vector, max_glu_serum_index_vector: vector, gender_index_vector: vector, number_outpatient_index_vector: vector, race_index_vector: vector, diabetesMed_index_vector: vector, admission_type_id_index_vector: vector, A1Cresult_index_vector: vector, change_index_vector: vector, glyburide_index_vector: vector, age_index_vector: vector, insulin_index_vector: vector, pioglitazone_index_vector: vector, num_procedures_index_vector: vector]
diabetes_ohe.show(1)
+----------------+------------------+---------------+----------------+----------------+------+------+------+----------------+----------+----------------------+--------------------------------+--------------------------+------------------------+-------------------------------------+----------------------+--------------------------+-------------------+------------------------------+-----------------+------------------------+------------------------------+----------------------+-------------------+----------------------+----------------+--------------------+-------------------------+---------------------------+ |time_in_hospital|num_lab_procedures|num_medications|number_emergency|number_inpatient|diag_1|diag_2|diag_3|number_diagnoses|readmitted|metformin_index_vector|admission_source_id_index_vector|rosiglitazone_index_vector|glimepiride_index_vector|discharge_disposition_id_index_vector|glipizide_index_vector|max_glu_serum_index_vector|gender_index_vector|number_outpatient_index_vector|race_index_vector|diabetesMed_index_vector|admission_type_id_index_vector|A1Cresult_index_vector|change_index_vector|glyburide_index_vector|age_index_vector|insulin_index_vector|pioglitazone_index_vector|num_procedures_index_vector| +----------------+------------------+---------------+----------------+----------------+------+------+------+----------------+----------+----------------------+--------------------------------+--------------------------+------------------------+-------------------------------------+----------------------+--------------------------+-------------------+------------------------------+-----------------+------------------------+------------------------------+----------------------+-------------------+----------------------+----------------+--------------------+-------------------------+---------------------------+ | 3.0| 59.0| 18.0| 0.0| 0.0| 276.0|250.01| 255.0| 9.0| 1.0| (3,[0],[1.0])| (15,[0],[1.0])| (3,[0],[1.0])| (3,[0],[1.0])| (25,[0],[1.0])| (3,[0],[1.0])| 
(3,[0],[1.0])| (2,[0],[1.0])| (38,[0],[1.0])| (4,[0],[1.0])| (1,[0],[1.0])| (7,[0],[1.0])| (3,[0],[1.0])| (1,[],[])| (3,[0],[1.0])| (9,[8],[1.0])| (3,[],[])| (3,[0],[1.0])| (6,[0],[1.0])| +----------------+------------------+---------------+----------------+----------------+------+------+------+----------------+----------+----------------------+--------------------------------+--------------------------+------------------------+-------------------------------------+----------------------+--------------------------+-------------------+------------------------------+-----------------+------------------------+------------------------------+----------------------+-------------------+----------------------+----------------+--------------------+-------------------------+---------------------------+ only showing top 1 row
It is a bit difficult to see, but each column holds either a double value or a sparse vector value.
We need to assemble these features together in a way which Spark.ml algorithms require the features to be.
We will use VectorAssembler to do that
# The following code will take in all of these columns and convert it to 1 column named "features" which will store data of
# ALL features for 1 record (row)
# The following code takes all of these columns and converts them to ONE column
# named "features" holding the data of ALL features for a record (row).
from pyspark.ml.feature import VectorAssembler

# Everything except the label goes into the feature vector. Computing the list
# from the schema replaces the original's two hand-written 28-name lists (one
# for the assembler inputs, one for the drop) and keeps them in lockstep; the
# column order is the schema order, which the original list reproduced by hand.
feature_cols = [name for name in diabetes_ohe.columns if name != "readmitted"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
output = assembler.transform(diabetes_ohe)

# Keep only the label and the assembled feature vector.
output = output.drop(*feature_cols)
output.show(5)
+----------+--------------------+ |readmitted| features| +----------+--------------------+ | 1.0|(144,[0,1,2,5,6,7...| | 0.0|(144,[0,1,2,5,6,7...| | 0.0|(144,[0,1,2,5,6,7...| | 1.0|(144,[0,1,2,5,6,7...| | 1.0|(144,[0,1,2,5,6,7...| +----------+--------------------+ only showing top 5 rows
Training RF with a training, testing split of 80-20%
Testing the algorithm on test set and print the Accuracy
Accuracy = 63.42%
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 80/20 train/test split.
training, testing = output.randomSplit([0.8, 0.2])

# Random forest: 100 trees, depth 6, one third of the features tried per split.
rf = RandomForestClassifier(
    numTrees=100,
    maxDepth=6,
    labelCol="readmitted",
    seed=42,
    featureSubsetStrategy='onethird',
)
model = rf.fit(training)
predictions = model.transform(testing)

# Score the held-out (prediction, true label) pairs.
evaluator = MulticlassClassificationEvaluator(
    labelCol="readmitted", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)
Accuracy = 0.634299
Training Logistic regression with Elastic Net Regularization
Accuracy = 53.9%
from pyspark.ml.classification import LogisticRegression

# Logistic regression with elastic-net regularization
# (regParam is lambda; elasticNetParam is the L1/L2 mixing ratio alpha).
lr = LogisticRegression(
    maxIter=100,
    regParam=0.3,
    elasticNetParam=0.8,
    labelCol="readmitted",
    featuresCol="features",
)
lrModel = lr.fit(training)
preds_lr = lrModel.transform(testing)

# Same accuracy metric as used for the random forest.
evaluator = MulticlassClassificationEvaluator(
    labelCol="readmitted", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(preds_lr)
print("Accuracy = %g" % accuracy)
Accuracy = 0.539396