from pyspark.sql import SparkSession
import sys
sys.path.append('..')
from utils.pysparkutils import *
# Start (or reuse) the Spark session for this analysis under the app name 'income'.
spark = (
    SparkSession.builder
    .appName('income')
    .getOrCreate()
)
from pyspark.sql.types import *
# schema = StructType([
# StructField("age", IntegerType(), True),
# StructField("workclass", StringType(), True),
# StructField("fnlwgt", FloatType(), True),
# StructField("education", StringType(), True),
# StructField("education-num", FloatType(), True),
# StructField("marital-status", StringType(), True),
# StructField("occupation", StringType(), True),
# StructField("relationship", StringType(), True),
# StructField("race", StringType(), True),
# StructField("sex", StringType(), True),
# StructField("capital-gain", FloatType(), True),
# StructField("capital-loss", FloatType(), True),
# StructField("hours-per-week", FloatType(), True),
# StructField("native-country", StringType(), True),
# StructField("class", StringType(), True)]
# )
# train = spark.read.csv('./adult.data.txt', schema=schema, inferSchema='true')
# Column names for the training file; toDF() below assigns them by position.
headers = [
    "age", "workclass", "fnlwgt", "education", "education-num",
    "marital-status", "occupation", "relationship", "race", "sex",
    "capital-gain", "capital-loss", "hours-per-week", "native-country",
    "class",
]
# Reader options: infer column types and trim whitespace on both sides of each value.
trainCsvOptions = {
    'inferSchema': 'true',
    'ignoreLeadingWhiteSpace': 'true',
    'ignoreTrailingWhiteSpace': 'true',
}
rawTrain = spark.read.csv('./adult.data.txt', **trainCsvOptions)
train = rawTrain.toDF(*headers)
train.printSchema()
root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education-num: integer (nullable = true) |-- marital-status: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital-gain: integer (nullable = true) |-- capital-loss: integer (nullable = true) |-- hours-per-week: integer (nullable = true) |-- native-country: string (nullable = true) |-- class: string (nullable = true)
from pyspark.sql.functions import monotonically_increasing_id, udf

# Name of the target column used by the rest of the notebook.
labelCol = 'class'
# Tag every row with a generated id (unique and increasing, not necessarily consecutive).
rowId = monotonically_increasing_id()
train = train.withColumn('id', rowId)
# Number of training rows.
train.count()
32561
# Row count per label value, brought to the driver as a pandas frame for display.
labelCounts = train.groupBy(labelCol).count()
labelCounts.toPandas()
class | count | |
---|---|---|
0 | <=50K | 24720 |
1 | >50K | 7841 |
We can see there is a class imbalance problem in our training set.
# Ask the project helper which columns of `train` contain missing values.
# NOTE(review): what counts as "missing" is decided inside the helper — confirm
# whether placeholder strings such as '?' are treated as missing.
findMissingValuesCols(train)
[]
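There are no missing values in our training data (note, however, that some entries hold a '?' placeholder, as the workclass breakdown further below shows).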
# Peek at the distinct ages present in the data (show() prints the first 20 rows).
distinctAges = train.select('age').distinct()
distinctAges.show()
+---+ |age| +---+ | 31| | 85| | 65| | 53| | 78| | 34| | 81| | 28| | 76| | 27| | 26| | 44| | 22| | 47| | 52| | 86| | 40| | 20| | 57| | 54| +---+ only showing top 20 rows
# Label distribution within each race; the percentage is formatted to two decimals for display.
percentageCol = 'percentage'
raceByClass = crosstabPercentage(train, 'race', labelCol)
df = raceByClass.withColumn(
    percentageCol,
    format_number(raceByClass[percentageCol], 2),
)
df.toPandas()
race | class | count | percentage | |
---|---|---|---|---|
0 | Amer-Indian-Eskimo | <=50K | 275 | 88.42 |
1 | Amer-Indian-Eskimo | >50K | 36 | 11.58 |
2 | Asian-Pac-Islander | <=50K | 763 | 73.44 |
3 | Asian-Pac-Islander | >50K | 276 | 26.56 |
4 | Black | <=50K | 2737 | 87.61 |
5 | Black | >50K | 387 | 12.39 |
6 | Other | <=50K | 246 | 90.77 |
7 | Other | >50K | 25 | 9.23 |
8 | White | <=50K | 20699 | 74.41 |
9 | White | >50K | 7117 | 25.59 |
# Label distribution for every age, sorted by age, with the percentage formatted to two decimals.
ageByClass = crosstabPercentage(train, 'age', labelCol)
formattedPercentage = format_number(ageByClass['percentage'], 2)
df = ageByClass.orderBy('age').withColumn('percentage', formattedPercentage)
df.toPandas()
age | class | count | percentage | |
---|---|---|---|---|
0 | 17 | <=50K | 395 | 100.00 |
1 | 18 | <=50K | 550 | 100.00 |
2 | 19 | <=50K | 710 | 99.72 |
3 | 19 | >50K | 2 | 0.28 |
4 | 20 | <=50K | 753 | 100.00 |
5 | 21 | <=50K | 717 | 99.58 |
6 | 21 | >50K | 3 | 0.42 |
7 | 22 | <=50K | 752 | 98.30 |
8 | 22 | >50K | 13 | 1.70 |
9 | 23 | <=50K | 865 | 98.63 |
10 | 23 | >50K | 12 | 1.37 |
11 | 24 | <=50K | 767 | 96.12 |
12 | 24 | >50K | 31 | 3.88 |
13 | 25 | <=50K | 788 | 93.70 |
14 | 25 | >50K | 53 | 6.30 |
15 | 26 | <=50K | 722 | 91.97 |
16 | 26 | >50K | 63 | 8.03 |
17 | 27 | <=50K | 754 | 90.30 |
18 | 27 | >50K | 81 | 9.70 |
19 | 28 | <=50K | 748 | 86.27 |
20 | 28 | >50K | 119 | 13.73 |
21 | 29 | <=50K | 679 | 83.52 |
22 | 29 | >50K | 134 | 16.48 |
23 | 30 | <=50K | 690 | 80.14 |
24 | 30 | >50K | 171 | 19.86 |
25 | 31 | <=50K | 705 | 79.39 |
26 | 31 | >50K | 183 | 20.61 |
27 | 32 | <=50K | 639 | 77.17 |
28 | 32 | >50K | 189 | 22.83 |
29 | 33 | <=50K | 684 | 78.17 |
... | ... | ... | ... | ... |
108 | 72 | >50K | 9 | 13.43 |
109 | 73 | <=50K | 54 | 84.38 |
110 | 73 | >50K | 10 | 15.62 |
111 | 74 | <=50K | 39 | 76.47 |
112 | 74 | >50K | 12 | 23.53 |
113 | 75 | <=50K | 38 | 84.44 |
114 | 75 | >50K | 7 | 15.56 |
115 | 76 | <=50K | 41 | 89.13 |
116 | 76 | >50K | 5 | 10.87 |
117 | 77 | <=50K | 24 | 82.76 |
118 | 77 | >50K | 5 | 17.24 |
119 | 78 | <=50K | 18 | 78.26 |
120 | 78 | >50K | 5 | 21.74 |
121 | 79 | <=50K | 13 | 59.09 |
122 | 79 | >50K | 9 | 40.91 |
123 | 80 | <=50K | 20 | 90.91 |
124 | 80 | >50K | 2 | 9.09 |
125 | 81 | <=50K | 17 | 85.00 |
126 | 81 | >50K | 3 | 15.00 |
127 | 82 | <=50K | 12 | 100.00 |
128 | 83 | <=50K | 4 | 66.67 |
129 | 83 | >50K | 2 | 33.33 |
130 | 84 | <=50K | 9 | 90.00 |
131 | 84 | >50K | 1 | 10.00 |
132 | 85 | <=50K | 3 | 100.00 |
133 | 86 | <=50K | 1 | 100.00 |
134 | 87 | <=50K | 1 | 100.00 |
135 | 88 | <=50K | 3 | 100.00 |
136 | 90 | <=50K | 35 | 81.40 |
137 | 90 | >50K | 8 | 18.60 |
138 rows × 4 columns
# Label distribution within each sex, with the percentage formatted to two decimals.
sexByClass = crosstabPercentage(train, 'sex', labelCol)
df = sexByClass.withColumn(
    'percentage',
    format_number(sexByClass['percentage'], 2),
)
df.toPandas()
sex | class | count | percentage | |
---|---|---|---|---|
0 | Female | <=50K | 9592 | 89.05 |
1 | Female | >50K | 1179 | 10.95 |
2 | Male | <=50K | 15128 | 69.43 |
3 | Male | >50K | 6662 | 30.57 |
crosstabPercentage
is a simple way to explore the relation between a particular categorical feature and the label. For instance, the above shows the usefulness of sex
feature in predicting the salary. A much larger share of men (30.57%) than of women (10.95%) earn >50K, so a male respondent is more likely to be in the >50K class.
# Label distribution within each education level, with the percentage formatted to two decimals.
educationByClass = crosstabPercentage(train, 'education', labelCol)
df = educationByClass.withColumn(
    'percentage',
    format_number(educationByClass['percentage'], 2),
)
df.toPandas()
education | class | count | percentage | |
---|---|---|---|---|
0 | 10th | <=50K | 871 | 93.35 |
1 | 10th | >50K | 62 | 6.65 |
2 | 11th | <=50K | 1115 | 94.89 |
3 | 11th | >50K | 60 | 5.11 |
4 | 12th | <=50K | 400 | 92.38 |
5 | 12th | >50K | 33 | 7.62 |
6 | 1st-4th | <=50K | 162 | 96.43 |
7 | 1st-4th | >50K | 6 | 3.57 |
8 | 5th-6th | <=50K | 317 | 95.20 |
9 | 5th-6th | >50K | 16 | 4.80 |
10 | 7th-8th | <=50K | 606 | 93.81 |
11 | 7th-8th | >50K | 40 | 6.19 |
12 | 9th | <=50K | 487 | 94.75 |
13 | 9th | >50K | 27 | 5.25 |
14 | Assoc-acdm | <=50K | 802 | 75.16 |
15 | Assoc-acdm | >50K | 265 | 24.84 |
16 | Assoc-voc | <=50K | 1021 | 73.88 |
17 | Assoc-voc | >50K | 361 | 26.12 |
18 | Bachelors | <=50K | 3134 | 58.52 |
19 | Bachelors | >50K | 2221 | 41.48 |
20 | Doctorate | <=50K | 107 | 25.91 |
21 | Doctorate | >50K | 306 | 74.09 |
22 | HS-grad | <=50K | 8826 | 84.05 |
23 | HS-grad | >50K | 1675 | 15.95 |
24 | Masters | <=50K | 764 | 44.34 |
25 | Masters | >50K | 959 | 55.66 |
26 | Preschool | <=50K | 51 | 100.00 |
27 | Prof-school | <=50K | 153 | 26.56 |
28 | Prof-school | >50K | 423 | 73.44 |
29 | Some-college | <=50K | 5904 | 80.98 |
30 | Some-college | >50K | 1387 | 19.02 |
%%script false
# Disabled cell: the `%%script false` magic above is the usual IPython idiom for skipping a cell.
# Intent (from the code below): cross-tabulate 'education-num' against the label, sort the
# rows numerically by a double-cast copy of the key column, and move that column to the front.
# NOTE(review): the column names 'percentage-' and 'education-numClass' do not appear in the
# crosstabPercentage outputs shown elsewhere in this notebook (they show 'percentage') —
# confirm them against the helper before re-enabling this cell.
educationNumClass = crosstabPercentage(train, 'education-num', labelCol)
educationNumClass = educationNumClass.withColumn('percentage',
format_number(educationNumClass['percentage-'], 2))
educationNumClass = educationNumClass.withColumn('education-numClassF', educationNumClass['education-numClass'].cast(DoubleType()))\
.orderBy('education-numClassF').drop('education-numClass')
# Reorder so the numeric key column comes first.
cols = educationNumClass.columns
cols.remove('education-numClassF')
cols.insert(0, 'education-numClassF')
educationNumClass = educationNumClass.select(cols)
educationNumClass.show()
We can see above that this is a sparse matrix, so it's hard to spot the non-zero values. We will therefore focus only on the non-zero values to find out whether these two features are related and whether one of them is redundant.
%%script false
from pyspark.sql.functions import coalesce, lit, when
iterator = df.toLocalIterator()
d = {}
for row in iterator:
rowDict = row.asDict()
educationNum = rowDict['education-num_education']
for k, v in rowDict.items():
if k != 'education-num_education' and v != 0:
d[educationNum+'_'+k] = v
import json
s = json.dumps(d, indent=4)
print(s)
We can see it's obvious that these features are redundant. Only one of them should suffice for our classification task.
Let's try more rigorous chi square test instead of something hand-wavy.
First we will define a utility method that indexes the categorical string columns, encodes them into one-hot-encoded vectors, and finally assembles all the feature vectors into one vector for later downstream analysis.
# Label distribution within each workclass, with the percentage formatted to two decimals.
workclassByClass = crosstabPercentage(train, 'workclass', labelCol)
df = workclassByClass.withColumn(
    'percentage',
    format_number(workclassByClass['percentage'], 2),
)
df.toPandas()
workclass | class | count | percentage | |
---|---|---|---|---|
0 | ? | <=50K | 1645 | 89.60 |
1 | ? | >50K | 191 | 10.40 |
2 | Federal-gov | <=50K | 589 | 61.35 |
3 | Federal-gov | >50K | 371 | 38.65 |
4 | Local-gov | <=50K | 1476 | 70.52 |
5 | Local-gov | >50K | 617 | 29.48 |
6 | Never-worked | <=50K | 7 | 100.00 |
7 | Private | <=50K | 17733 | 78.13 |
8 | Private | >50K | 4963 | 21.87 |
9 | Self-emp-inc | <=50K | 494 | 44.27 |
10 | Self-emp-inc | >50K | 622 | 55.73 |
11 | Self-emp-not-inc | <=50K | 1817 | 71.51 |
12 | Self-emp-not-inc | >50K | 724 | 28.49 |
13 | State-gov | <=50K | 945 | 72.80 |
14 | State-gov | >50K | 353 | 27.20 |
15 | Without-pay | <=50K | 14 | 100.00 |
# Label distribution for each hours-per-week value, with the percentage formatted to two decimals.
hoursByClass = crosstabPercentage(train, 'hours-per-week', labelCol)
df = hoursByClass.withColumn(
    'percentage',
    format_number(hoursByClass['percentage'], 2),
)
df.toPandas()
hours-per-week | class | count | percentage | |
---|---|---|---|---|
0 | 1 | <=50K | 18 | 90.00 |
1 | 1 | >50K | 2 | 10.00 |
2 | 2 | <=50K | 24 | 75.00 |
3 | 2 | >50K | 8 | 25.00 |
4 | 3 | <=50K | 38 | 97.44 |
5 | 3 | >50K | 1 | 2.56 |
6 | 4 | <=50K | 51 | 94.44 |
7 | 4 | >50K | 3 | 5.56 |
8 | 5 | <=50K | 53 | 88.33 |
9 | 5 | >50K | 7 | 11.67 |
10 | 6 | <=50K | 56 | 87.50 |
11 | 6 | >50K | 8 | 12.50 |
12 | 7 | <=50K | 22 | 84.62 |
13 | 7 | >50K | 4 | 15.38 |
14 | 8 | <=50K | 134 | 92.41 |
15 | 8 | >50K | 11 | 7.59 |
16 | 9 | <=50K | 17 | 94.44 |
17 | 9 | >50K | 1 | 5.56 |
18 | 10 | <=50K | 258 | 92.81 |
19 | 10 | >50K | 20 | 7.19 |
20 | 11 | <=50K | 11 | 100.00 |
21 | 12 | <=50K | 161 | 93.06 |
22 | 12 | >50K | 12 | 6.94 |
23 | 13 | <=50K | 21 | 91.30 |
24 | 13 | >50K | 2 | 8.70 |
25 | 14 | <=50K | 32 | 94.12 |
26 | 14 | >50K | 2 | 5.88 |
27 | 15 | <=50K | 389 | 96.29 |
28 | 15 | >50K | 15 | 3.71 |
29 | 16 | <=50K | 192 | 93.66 |
... | ... | ... | ... | ... |
143 | 78 | <=50K | 6 | 75.00 |
144 | 78 | >50K | 2 | 25.00 |
145 | 80 | <=50K | 76 | 57.14 |
146 | 80 | >50K | 57 | 42.86 |
147 | 81 | <=50K | 3 | 100.00 |
148 | 82 | <=50K | 1 | 100.00 |
149 | 84 | <=50K | 28 | 62.22 |
150 | 84 | >50K | 17 | 37.78 |
151 | 85 | <=50K | 9 | 69.23 |
152 | 85 | >50K | 4 | 30.77 |
153 | 86 | <=50K | 2 | 100.00 |
154 | 87 | <=50K | 1 | 100.00 |
155 | 88 | <=50K | 2 | 100.00 |
156 | 89 | <=50K | 1 | 50.00 |
157 | 89 | >50K | 1 | 50.00 |
158 | 90 | <=50K | 19 | 65.52 |
159 | 90 | >50K | 10 | 34.48 |
160 | 91 | <=50K | 3 | 100.00 |
161 | 92 | <=50K | 1 | 100.00 |
162 | 94 | <=50K | 1 | 100.00 |
163 | 95 | <=50K | 1 | 50.00 |
164 | 95 | >50K | 1 | 50.00 |
165 | 96 | <=50K | 4 | 80.00 |
166 | 96 | >50K | 1 | 20.00 |
167 | 97 | <=50K | 1 | 50.00 |
168 | 97 | >50K | 1 | 50.00 |
169 | 98 | <=50K | 8 | 72.73 |
170 | 98 | >50K | 3 | 27.27 |
171 | 99 | <=50K | 60 | 70.59 |
172 | 99 | >50K | 25 | 29.41 |
173 rows × 4 columns
We will use Attribute-Value Frequency (AVF) outlier detection on the categorical features. The beauty of this algorithm is that it's very simple, highly parallelizable, and fits well with the distributed programming paradigm. attributeValueFrequency
function is implemented in pysparkutils.py
file in utils
directory.
import seaborn as sns

# Name of the score column that is grouped on, plotted and filtered on.
avfScoreCol = 'avfScore'
# Categorical feature columns handed to the AVF helper (the label is not among them).
categoricalCols = [
    'workclass',
    'education',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'native-country',
]
avfScore = attributeValueFrequency(train, categoricalCols)
# How many rows share each distinct score; aggregated in Spark, displayed via pandas.
scoreCounts = avfScore.groupBy(avfScoreCol).count()
pdf = scoreCounts.toPandas()
pdf
avfScore | count | |
---|---|---|
0 | 64873 | 1 |
1 | 84761 | 1 |
2 | 85760 | 1 |
3 | 103252 | 1 |
4 | 52051 | 1 |
5 | 49136 | 1 |
6 | 91948 | 1 |
7 | 92741 | 2 |
8 | 99489 | 1 |
9 | 89041 | 1 |
10 | 95526 | 1 |
11 | 78598 | 1 |
12 | 84745 | 1 |
13 | 76448 | 1 |
14 | 80545 | 1 |
15 | 97699 | 1 |
16 | 86445 | 1 |
17 | 95149 | 3 |
18 | 101519 | 5 |
19 | 99688 | 4 |
20 | 96113 | 1 |
21 | 86132 | 1 |
22 | 74783 | 1 |
23 | 88291 | 2 |
24 | 75232 | 1 |
25 | 81293 | 5 |
26 | 66091 | 1 |
27 | 73190 | 1 |
28 | 107959 | 1 |
29 | 77034 | 1 |
... | ... | ... |
8159 | 121083 | 1 |
8160 | 90753 | 1 |
8161 | 83486 | 1 |
8162 | 88325 | 3 |
8163 | 96995 | 1 |
8164 | 100990 | 4 |
8165 | 90597 | 1 |
8166 | 100460 | 1 |
8167 | 92453 | 14 |
8168 | 95690 | 4 |
8169 | 104486 | 2 |
8170 | 119995 | 13 |
8171 | 131889 | 9 |
8172 | 110660 | 1 |
8173 | 102790 | 1 |
8174 | 122394 | 2 |
8175 | 124053 | 76 |
8176 | 134386 | 45 |
8177 | 113783 | 3 |
8178 | 94066 | 1 |
8179 | 73337 | 1 |
8180 | 91838 | 2 |
8181 | 90865 | 1 |
8182 | 100947 | 3 |
8183 | 71321 | 1 |
8184 | 110738 | 4 |
8185 | 99204 | 1 |
8186 | 38322 | 1 |
8187 | 89328 | 1 |
8188 | 65479 | 1 |
8189 rows × 2 columns
# Plotting attempts on the aggregated frame `pdf` that were tried and left disabled:
# sns.barplot(x="avfScore", y="count", data=pdf)
# pdf['count'].hist(by=pdf['avfScore'])
# pdf.plot(x='avfScore', y='count')
# pdf.plot(x='avfScore', y='count', kind='bar')
# Ideally the aggregation would stay in Spark and only the aggregated result would be
# converted to pandas for plotting. Unfortunately I have not found a way to do that yet,
# so the full score column is pulled to the driver instead.
# This approach is NOT recommended for large datasets spread over multiple machines:
# the driver node receives the whole column and might run out of memory.
scoresPdf = avfScore.select(avfScoreCol).toPandas()
scoresPdf.hist(column=avfScoreCol)
array([[<matplotlib.axes._subplots.AxesSubplot object at 0x7ff049874978>]], dtype=object)
In AVF, the lower the score of a datapoint, the more likely that datapoint is an outlier. We will remove the rows whose score is 70000 or lower.
# Keep only rows whose AVF score exceeds 70000, then discard the score column.
keepRow = col(avfScoreCol) > 70000
train = avfScore.where(keepRow).drop(avfScoreCol)
train.printSchema()
root |-- age: integer (nullable = true) |-- workclass: string (nullable = true) |-- fnlwgt: integer (nullable = true) |-- education: string (nullable = true) |-- education-num: integer (nullable = true) |-- marital-status: string (nullable = true) |-- occupation: string (nullable = true) |-- relationship: string (nullable = true) |-- race: string (nullable = true) |-- sex: string (nullable = true) |-- capital-gain: integer (nullable = true) |-- capital-loss: integer (nullable = true) |-- hours-per-week: integer (nullable = true) |-- native-country: string (nullable = true) |-- class: string (nullable = true) |-- id: long (nullable = false)
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.stat import ChiSquareTest
from pyspark.ml import Pipeline

# Only the two education columns take part in this test.
indexed = train.select('education-num', 'education')

# The string 'education' column becomes the numeric label 'educationIndexed'.
indexer = StringIndexer(inputCol='education', outputCol='educationIndexed')
indexerModel = indexer.fit(indexed)
indexed = indexerModel.transform(indexed)

# The integer 'education-num' column is one-hot encoded into the feature vector.
ohe = OneHotEncoderEstimator(
    inputCols=['education-num'],
    outputCols=['education-numOHE'],
)
oheModel = ohe.fit(indexed)
indexed = oheModel.transform(indexed)

# Chi-square test of each encoded feature against the label.
# The null hypothesis is that feature and label occur independently of each other.
# Small p-values (commonly below 1% to 5%) are grounds to reject that hypothesis,
# i.e. to conclude the two columns are dependent; large p-values are not.
testResult = ChiSquareTest.test(indexed, 'education-numOHE', 'educationIndexed')
r = testResult.head()
print("pValues: " + str(r.pValues))
pValues: [1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]
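Since almost all of the p-values are 0, we reject the null hypothesis of independence: 'education' and 'education-num' are dependent. We will drop the 'education' feature since its information is already covered by 'education-num'.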
# Remove the string 'education' column from the training frame.
redundantCol = 'education'
train = train.drop(redundantCol)
from pyspark.ml.clustering import KMeans

# Only the third value returned by the project helper is used here.
# NOTE(review): presumably a copy of `train` carrying the 'assembled' feature
# vector that KMeans reads below — confirm against utils.pysparkutils.
_unusedFirst, _unusedSecond, indexedDf = autoIndexer(train, labelCol)

# Two clusters, matching the two income classes.
kmeans = KMeans(featuresCol='assembled', k=2)
model = kmeans.fit(indexedDf)
# Adds the cluster assignment to every row (read later as 'prediction').
indexedDf = model.transform(indexedDf)
model.summary
<pyspark.ml.clustering.KMeansSummary at 0x7ff049711390>
import numpy as np
from sklearn.metrics import adjusted_mutual_info_score

# Index the string label so it can be compared with the numeric cluster ids.
labelIndexedCol = labelCol + 'Indexed'
indexer = StringIndexer(inputCol=labelCol, outputCol=labelIndexedCol)
indexedDf = indexer.fit(indexedDf).transform(indexedDf)

# Collect label and cluster assignment together in ONE job so that each label is
# paired with the prediction from the same row. Two separate collect() calls would
# rely on both jobs returning rows in the same order and would compute the lineage twice.
labelAndPrediction = indexedDf.select(labelIndexedCol, 'prediction').collect()
classIndexed = [row[0] for row in labelAndPrediction]
prediction = [row[1] for row in labelAndPrediction]

# Agreement between the true classes and the k-means clusters.
adjusted_mutual_info_score(classIndexed, prediction)
0.15349812641524918
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.classification import GBTClassifier

# Every string-typed column (the label included) gets its own StringIndexer.
# handleInvalid='skip' makes the fitted indexers drop rows holding unseen values.
stringTypes = [name for name, kind in train.dtypes if kind == 'string']
indexedTypes = [name + 'Indexed' for name in stringTypes]
indexers = [
    StringIndexer(inputCol=source, outputCol=target, handleInvalid='skip')
    for source, target in zip(stringTypes, indexedTypes)
]

# The encoder is configured with all indexed columns, including the label's.
oheTypes = [name + 'OneHotEncoded' for name in indexedTypes]
ohe = OneHotEncoderEstimator(inputCols=indexedTypes, outputCols=oheTypes)

# Fix columns: the encoded label must not be fed to the model as a feature.
oheTypes.remove('classIndexedOneHotEncoded')

# Features = the non-string columns except the row id, in their original order,
# followed by the one-hot encoded categorical columns.
notFeatures = set(stringTypes) | {'id'}
cols = [name for name in train.columns if name not in notFeatures] + oheTypes

assembler = VectorAssembler(inputCols=cols, outputCol='assembled')
classifier = GBTClassifier(featuresCol='assembled', labelCol='classIndexed')
pipeline = Pipeline(stages=indexers + [ohe, assembler, classifier])

# Fit on the training frame, then replace it with its own scored version.
model = pipeline.fit(train)
train = model.transform(train)
train
DataFrame[age: int, workclass: string, fnlwgt: int, education-num: int, marital-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: string, class: string, id: bigint, workclassIndexed: double, marital-statusIndexed: double, occupationIndexed: double, relationshipIndexed: double, raceIndexed: double, sexIndexed: double, native-countryIndexed: double, classIndexed: double, workclassIndexedOneHotEncoded: vector, raceIndexedOneHotEncoded: vector, occupationIndexedOneHotEncoded: vector, relationshipIndexedOneHotEncoded: vector, native-countryIndexedOneHotEncoded: vector, marital-statusIndexedOneHotEncoded: vector, sexIndexedOneHotEncoded: vector, classIndexedOneHotEncoded: vector, assembled: vector, rawPrediction: vector, probability: vector, prediction: double]
Since we have a class imbalance problem, we will use the area under the ROC curve as the metric.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve, measured against the indexed label.
evaluator = BinaryClassificationEvaluator(
    metricName='areaUnderROC',
    labelCol='classIndexed',
)
# `train` holds the fitted model's predictions on the training data itself,
# so this is a training-set score.
metric = evaluator.evaluate(dataset=train)
metric
0.9179985970287455
# Column names for the test file; toDF() below assigns them by position.
headers = [
    "age", "workclass", "fnlwgt", "education", "education-num",
    "marital-status", "occupation", "relationship", "race", "sex",
    "capital-gain", "capital-loss", "hours-per-week", "native-country",
    "class",
]
# Reader options: infer column types and trim whitespace on both sides of each value.
testCsvOptions = {
    'inferSchema': 'true',
    'ignoreLeadingWhiteSpace': 'true',
    'ignoreTrailingWhiteSpace': 'true',
}
rawTest = spark.read.csv('./adult.test.txt', **testCsvOptions)
test = rawTest.toDF(*headers)
# Look at the first few label values of the test set.
test.select('class').limit(10).toPandas()
class | |
---|---|
0 | <=50K. |
1 | <=50K. |
2 | >50K. |
3 | >50K. |
4 | <=50K. |
5 | <=50K. |
6 | <=50K. |
7 | >50K. |
8 | <=50K. |
9 | <=50K. |
We can see that the class labels in the test dataset differ from those in train, e.g. '>50K.' versus '>50K'. So we have to remove the extra trailing dot from the class label before evaluating.
from pyspark.sql.types import StringType
from pyspark.sql.functions import regexp_replace


def stripDot(column):
    """Return a Column equal to `column` with one trailing '.' removed.

    Uses Spark's native regexp_replace rather than a Python UDF that slices off
    the last character: values that do not end in a dot are left untouched, and
    null values stay null instead of raising inside the UDF.
    """
    return regexp_replace(column, r'\.$', '')


# Add the cleaned label, drop the raw one, then give the cleaned column the
# original name so the fitted pipeline and the evaluator find 'class'.
test = (
    test
    .withColumn('classTrailed', stripDot(test['class']))
    .drop('class')
    .withColumnRenamed('classTrailed', 'class')
)
test.select('class').limit(10).toPandas()
class | |
---|---|
0 | <=50K |
1 | <=50K |
2 | >50K |
3 | >50K |
4 | <=50K |
5 | <=50K |
6 | <=50K |
7 | >50K |
8 | <=50K |
9 | <=50K |
# Score the test frame with the fitted `model` and evaluate it with the same evaluator.
scoredTest = model.transform(test)
test = scoredTest
metric = evaluator.evaluate(dataset=test)
metric
0.9096987015789428