In this project, I implemented a credit card fraud detection model using Spark and LightGBMClassifier in the Databricks runtime environment, on the dataset provided by the Machine Learning Group at Université libre de Bruxelles (ULB). The dataset contains 284,807 rows with 31 variables describing European credit card holders' transactions, of which 28 are numeric features obtained by applying Principal Component Analysis to undisclosed original attributes. The remaining three variables are the Amount of the transaction, the Time of the transaction in seconds relative to the first transaction, and the Class of the transaction, indicating whether it is genuine or fraudulent.
import pandas as pd
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.ml.classification import LogisticRegression, GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from mmlspark import LightGBMClassifier  # in newer releases MMLSpark is SynapseML: from synapse.ml.lightgbm import LightGBMClassifier
# File location and type
file_location = "/FileStore/tables/creditcard.csv"
file_type = "csv"
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .load(file_location)
pd.DataFrame(df.take(5), columns=df.columns).transpose()
 | 0 | 1 | 2 | 3 | 4 |
---|---|---|---|---|---|
Time | 0 | 0 | 1 | 1 | 2 |
V1 | -1.35981 | 1.19186 | -1.35835 | -0.966272 | -1.15823 |
V2 | -0.0727812 | 0.266151 | -1.34016 | -0.185226 | 0.877737 |
V3 | 2.53635 | 0.16648 | 1.77321 | 1.79299 | 1.54872 |
V4 | 1.37816 | 0.448154 | 0.37978 | -0.863291 | 0.403034 |
V5 | -0.338321 | 0.0600176 | -0.503198 | -0.0103089 | -0.407193 |
V6 | 0.462388 | -0.0823608 | 1.8005 | 1.2472 | 0.0959215 |
V7 | 0.239599 | -0.078803 | 0.791461 | 0.237609 | 0.592941 |
V8 | 0.0986979 | 0.0851017 | 0.247676 | 0.377436 | -0.270533 |
V9 | 0.363787 | -0.255425 | -1.51465 | -1.38702 | 0.817739 |
V10 | 0.0907942 | -0.166974 | 0.207643 | -0.0549519 | 0.753074 |
V11 | -0.5516 | 1.61273 | 0.624501 | -0.226487 | -0.822843 |
V12 | -0.617801 | 1.06524 | 0.0660837 | 0.178228 | 0.538196 |
V13 | -0.99139 | 0.489095 | 0.717293 | 0.507757 | 1.34585 |
V14 | -0.311169 | -0.143772 | -0.165946 | -0.287924 | -1.11967 |
V15 | 1.46818 | 0.635558 | 2.34586 | -0.631418 | 0.175121 |
V16 | -0.470401 | 0.463917 | -2.89008 | -1.05965 | -0.451449 |
V17 | 0.207971 | -0.114805 | 1.10997 | -0.684093 | -0.237033 |
V18 | 0.0257906 | -0.183361 | -0.121359 | 1.96578 | -0.0381948 |
V19 | 0.403993 | -0.145783 | -2.26186 | -1.23262 | 0.803487 |
V20 | 0.251412 | -0.0690831 | 0.52498 | -0.208038 | 0.408542 |
V21 | -0.0183068 | -0.225775 | 0.247998 | -0.1083 | -0.0094307 |
V22 | 0.277838 | -0.638672 | 0.771679 | 0.0052736 | 0.798278 |
V23 | -0.110474 | 0.101288 | 0.909412 | -0.190321 | -0.137458 |
V24 | 0.0669281 | -0.339846 | -0.689281 | -1.17558 | 0.141267 |
V25 | 0.128539 | 0.16717 | -0.327642 | 0.647376 | -0.20601 |
V26 | -0.189115 | 0.125895 | -0.139097 | -0.221929 | 0.502292 |
V27 | 0.133558 | -0.0089831 | -0.0553528 | 0.0627228 | 0.219422 |
V28 | -0.0210531 | 0.0147242 | -0.0597518 | 0.0614576 | 0.215153 |
Amount | 149.62 | 2.69 | 378.66 | 123.5 | 69.99 |
Class | 0 | 0 | 0 | 0 | 0 |
df.printSchema()
df.groupBy("Class").count().show()
As the counts above show, the dataset is heavily imbalanced: fraudulent transactions make up only a tiny fraction of all rows. One way to handle this is to assign different weights to the classes in the binary classifier (for LightGBM, the isUnbalance flag or an explicit positive-class weight).
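As a sketch of how such a weight can be derived: a common scheme is to weight the positive (fraud) class by the negative-to-positive ratio. The counts below are illustrative placeholders, not the actual output of the `groupBy("Class").count()` cell above.

```python
# Sketch: deriving a positive-class weight from class counts.
# These counts are illustrative placeholders, not the actual show() output.
counts = {0: 284_315, 1: 492}

# Weight the minority (fraud) class by the imbalance ratio so that
# misclassifying a fraud costs proportionally more during training.
pos_weight = counts[0] / counts[1]
print(round(pos_weight, 1))  # 577.9
```

A weight of this magnitude would be passed to the booster as a positive-class weight instead of (not in addition to) the isUnbalance flag.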
feature_cols = ["V" + str(i) for i in range(1,29)] + ["Amount"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
stages = [assembler]
I derived the parameters below by training a LightGBM model on this dataset in a plain Python environment, using the Tree-structured Parzen Estimator (TPE) algorithm implemented in the Hyperopt library. I am reusing them here to get started quickly with good performance on the test set. Model tuning can also be done inside Spark using ParamGridBuilder and CrossValidator, although exhaustive grid search takes longer to explore the hyperparameter space than Bayesian optimization methods such as TPE, which converge on good parameters more quickly.
best_params = {
'bagging_fraction': 0.8,
'bagging_freq': 1,
'eval_metric': 'binary_error',
'feature_fraction': 0.944714847210862,
'lambda_l1': 1.0,
'lambda_l2': 45.0,
'learning_rate': 0.1,
'loss_function': 'binary_error',
'max_bin': 60,
'max_depth': 58,
'metric': 'binary_error',
'num_iterations': 379,
'num_leaves': 850,
'objective': 'binary',
'random_state': 7,
'verbose': None}
lgb = LightGBMClassifier(learningRate=0.1,
earlyStoppingRound=100,
featuresCol='features',
labelCol='Class',
isUnbalance=True,
baggingFraction=best_params["bagging_fraction"],
baggingFreq=1,
featureFraction=best_params["feature_fraction"],
lambdaL1=best_params["lambda_l1"],
lambdaL2=best_params["lambda_l2"],
maxBin=best_params["max_bin"],
maxDepth=best_params["max_depth"],
numIterations=best_params["num_iterations"],
numLeaves=best_params["num_leaves"],
objective="binary",
baggingSeed=7
)
stages += [lgb]
pipelineModel = Pipeline(stages=stages)
Split data into training and test datasets.
train, test = df.randomSplit([0.8, 0.2], seed=7)
train.count()
test.count()
Fitting the model using training data.
model = pipelineModel.fit(train)
Making predictions on test data.
preds = model.transform(test)
preds.select('Class', 'prediction', 'probability').show(10)
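The default decision threshold of 0.5 is not always the best choice on imbalanced data; lowering it trades precision for recall on the fraud class. A small pure-Python sketch of thresholding positive-class probabilities (the probability values below are placeholders, not taken from the output above):

```python
# Sketch: turning positive-class probabilities into labels with a custom
# threshold. On imbalanced data the threshold is often tuned below 0.5
# to catch more frauds at the cost of some extra false positives.
probs = [0.02, 0.40, 0.91, 0.08]  # placeholder probabilities
threshold = 0.3
labels = [int(p >= threshold) for p in probs]
print(labels)  # [0, 1, 1, 0]
```

In Spark this logic would be applied to the `probability` column, e.g. via a UDF, rather than to a Python list.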
Evaluating predictions
binaryEvaluator = BinaryClassificationEvaluator(labelCol="Class")
print ("Test Area Under ROC: " + str(binaryEvaluator.evaluate(preds, {binaryEvaluator.metricName: "areaUnderROC"})))
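The areaUnderROC value can be sanity-checked on a small sample with a plain-Python rank-based (Mann-Whitney) computation: AUC is the probability that a randomly chosen positive example scores higher than a randomly chosen negative one, with ties counting half. A self-contained sketch:

```python
# Sketch: rank-based (Mann-Whitney) ROC AUC, usable as a sanity check
# against BinaryClassificationEvaluator on a small collected sample.
def auc_roc(labels, scores):
    pos = [s for l, s in zip(labels, scores) if l == 1]
    neg = [s for l, s in zip(labels, scores) if l == 0]
    # Count positive-vs-negative pairs won outright, plus half for ties.
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

print(auc_roc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```

This is quadratic in the sample size, so it is only meant for spot checks, not for the full test set.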
tp = preds[(preds.Class == 1) & (preds.prediction == 1)].count()
tn = preds[(preds.Class == 0) & (preds.prediction == 0)].count()
fp = preds[(preds.Class == 0) & (preds.prediction == 1)].count()
fn = preds[(preds.Class == 1) & (preds.prediction == 0)].count()
print ("True Positives:", tp)
print ("True Negatives:", tn)
print ("False Positives:", fp)
print ("False Negatives:", fn)
print ("Total", preds.count())
r = float(tp)/(tp + fn)
print ("recall", r)
p = float(tp) / (tp + fp)
print ("precision", p)
f1 = 2 * p * r /(p + r)
print ("f1", f1)
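The F1 expression above is algebraically equivalent to a closed form written directly in the confusion counts, 2·tp / (2·tp + fp + fn), which avoids the intermediate precision/recall divisions and their potential division-by-zero. A small sketch with hypothetical counts:

```python
# Sketch: F1 computed directly from confusion counts via the equivalent
# closed form 2*tp / (2*tp + fp + fn). Counts below are hypothetical.
def f1_from_counts(tp, fp, fn):
    return 2 * tp / (2 * tp + fp + fn) if tp else 0.0

print(round(f1_from_counts(80, 10, 20), 4))  # 0.8421
```

Returning 0.0 when tp is zero sidesteps the undefined precision/recall case that the two-step formula would hit.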
Directions for improving the F1 score and ROC AUC: