import org.apache.spark.rdd._
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io._
import org.joda.time._
import org.joda.time.format._

case class DelayRec(year: String,
                    month: String,
                    dayOfMonth: String,
                    dayOfWeek: String,
                    crsDepTime: String,
                    depDelay: String,
                    origin: String,
                    distance: String,
                    cancelled: String) {

  val holidays = List(
    "01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007", "07/04/2007",
    "09/03/2007", "10/08/2007", "11/11/2007", "11/22/2007", "12/25/2007",
    "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008",
    "09/01/2008", "10/13/2008", "11/11/2008", "11/27/2008", "12/25/2008")

  def gen_features: (String, Array[Double]) = {
    val values = Array(
      depDelay.toDouble,
      month.toDouble,
      dayOfMonth.toDouble,
      dayOfWeek.toDouble,
      get_hour(crsDepTime).toDouble,
      distance.toDouble,
      days_from_nearest_holiday(year.toInt, month.toInt, dayOfMonth.toInt)
    )
    new Tuple2(to_date(year.toInt, month.toInt, dayOfMonth.toInt), values)
  }

  def get_hour(depTime: String): String = "%04d".format(depTime.toInt).take(2)

  def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)

  def days_from_nearest_holiday(year: Int, month: Int, day: Int): Int = {
    val sampleDate = new DateTime(year, month, day, 0, 0)
    holidays.foldLeft(3000) { (r, c) =>
      val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
      val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
      math.min(r, distance)
    }
  }
}

// Function to do a preprocessing step for a given file
def prepFlightDelays(infile: String): RDD[DelayRec] = {
  val data = sc.textFile(infile)

  data.map { line =>
    val reader = new CSVReader(new StringReader(line))
    reader.readAll().asScala.toList.map(rec =>
      DelayRec(rec(0), rec(1), rec(2), rec(3), rec(5), rec(15), rec(16), rec(18), rec(21)))
  }.map(list => list(0))
    .filter(rec => rec.year != "Year")
    .filter(rec => rec.cancelled == "0")
    .filter(rec => rec.origin == "ORD")
}

val data_2007 = prepFlightDelays("airline/delay/2007.csv").map(rec => rec.gen_features._2)
val data_2008 = prepFlightDelays("airline/delay/2008.csv").map(rec => rec.gen_features._2)
data_2007.take(5).map(x => x mkString ",").foreach(println)

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.StandardScaler

// Label a flight as delayed (1.0) if its departure delay is 15 minutes or more
def parseData(vals: Array[Double]): LabeledPoint = {
  LabeledPoint(if (vals(0) >= 15) 1.0 else 0.0, Vectors.dense(vals.drop(1)))
}

// Prepare training set
val parsedTrainData = data_2007.map(parseData)
parsedTrainData.cache
val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(x => x.features))
val scaledTrainData = parsedTrainData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
scaledTrainData.cache

// Prepare test/validation set
val parsedTestData = data_2008.map(parseData)
parsedTestData.cache
val scaledTestData = parsedTestData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
scaledTestData.cache

scaledTrainData.take(3).map(x => (x.label, x.features)).foreach(println)

// Function to compute evaluation metrics; expects an RDD of (prediction, label) pairs
def eval_metrics(labelsAndPreds: RDD[(Double, Double)]): Tuple2[Array[Double], Array[Double]] = {
  val tp = labelsAndPreds.filter(r => r._1 == 1 && r._2 == 1).count.toDouble
  val tn = labelsAndPreds.filter(r => r._1 == 0 && r._2 == 0).count.toDouble
  val fp = labelsAndPreds.filter(r => r._1 == 1 && r._2 == 0).count.toDouble
  val fn = labelsAndPreds.filter(r => r._1 == 0 && r._2 == 1).count.toDouble

  val precision = tp / (tp + fp)
  val recall = tp / (tp + fn)
  val F_measure = 2 * precision * recall / (precision + recall)
  val accuracy = (tp + tn) / (tp + tn + fp + fn)
  new Tuple2(Array(tp, tn, fp, fn), Array(precision, recall, F_measure, accuracy))
}
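// Sketch (not part of the original flow): the hand-rolled counts above can be cross-checked
// against MLlib's MulticlassMetrics, assuming that class is available in the MLlib version
// in use. Like eval_metrics, it expects (prediction, label) pairs.
import org.apache.spark.mllib.evaluation.MulticlassMetrics

def eval_metrics_mllib(predsAndLabels: RDD[(Double, Double)]): Unit = {
  val metrics = new MulticlassMetrics(predsAndLabels)
  // Confusion matrix: actual labels in rows, predicted labels in columns
  println(metrics.confusionMatrix)
  println("precision(1.0) = %.2f, recall(1.0) = %.2f, F1(1.0) = %.2f".format(
    metrics.precision(1.0), metrics.recall(1.0), metrics.fMeasure(1.0)))
}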
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Build the Logistic Regression model
val model_lr = LogisticRegressionWithSGD.train(scaledTrainData, numIterations = 100)

// Predict
val labelsAndPreds_lr = scaledTestData.map { point =>
  val pred = model_lr.predict(point.features)
  (pred, point.label)
}
val m_lr = eval_metrics(labelsAndPreds_lr)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_lr(0), m_lr(1), m_lr(2), m_lr(3)))

import org.apache.spark.mllib.classification.SVMWithSGD

// Build the SVM model
val svmAlg = new SVMWithSGD()
svmAlg.optimizer.setNumIterations(100)
  .setRegParam(1.0)
  .setStepSize(1.0)
val model_svm = svmAlg.run(scaledTrainData)

// Predict
val labelsAndPreds_svm = scaledTestData.map { point =>
  val pred = model_svm.predict(point.features)
  (pred, point.label)
}
val m_svm = eval_metrics(labelsAndPreds_svm)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_svm(0), m_svm(1), m_svm(2), m_svm(3)))

import org.apache.spark.mllib.tree.DecisionTree

// Build the Decision Tree model (trained on the unscaled features)
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 10
val maxBins = 100
val model_dt = DecisionTree.trainClassifier(parsedTrainData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)

// Predict
val labelsAndPreds_dt = parsedTestData.map { point =>
  val pred = model_dt.predict(point.features)
  (pred, point.label)
}
val m_dt = eval_metrics(labelsAndPreds_dt)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_dt(0), m_dt(1), m_dt(2), m_dt(3)))

import org.apache.spark.SparkContext._
import scala.collection.JavaConverters._
import au.com.bytecode.opencsv.CSVReader
import java.io._

// Function to do a preprocessing step for a given file, enriching the flight delay
// features with same-day weather measurements
def preprocess_spark(delay_file: String, weather_file: String): RDD[Array[Double]] = {
  // Read flight delay data, keyed by date
  val delayRecs = prepFlightDelays(delay_file).map { rec =>
    val features = rec.gen_features
    (features._1, features._2)
  }

  // Read weather data into RDDs
  val station_inx = 0
  val date_inx = 1
  val metric_inx = 2
  val value_inx = 3

  def filterMap(wdata: RDD[Array[String]], metric: String): RDD[(String, Double)] = {
    wdata.filter(vals => vals(metric_inx) == metric).map(vals => (vals(date_inx), vals(value_inx).toDouble))
  }

  val wdata = sc.textFile(weather_file).map(line => line.split(","))
    .filter(vals => vals(station_inx) == "USW00094846")

  val w_tmin = filterMap(wdata, "TMIN")
  val w_tmax = filterMap(wdata, "TMAX")
  val w_prcp = filterMap(wdata, "PRCP")
  val w_snow = filterMap(wdata, "SNOW")
  val w_awnd = filterMap(wdata, "AWND")

  // Join the delay features with each weather metric on the date key
  delayRecs.join(w_tmin).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
    .join(w_tmax).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
    .join(w_prcp).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
    .join(w_snow).map(vals => (vals._1, vals._2._1 ++ Array(vals._2._2)))
    .join(w_awnd).map(vals => vals._2._1 ++ Array(vals._2._2))
}

val data_2007 = preprocess_spark("airline/delay/2007.csv", "airline/weather/2007.csv")
val data_2008 = preprocess_spark("airline/delay/2008.csv", "airline/weather/2008.csv")
data_2007.take(5).map(x => x mkString ",").foreach(println)
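// Optional sanity check (not in the original flow): after the five weather joins each record
// should hold 12 values -- the raw delay plus the 6 date/time/distance features produced by
// gen_features, followed by TMIN, TMAX, PRCP, SNOW and AWND.
println("feature vector length = " + data_2007.first.length)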
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.StandardScaler

def parseData(vals: Array[Double]): LabeledPoint = {
  LabeledPoint(if (vals(0) >= 15) 1.0 else 0.0, Vectors.dense(vals.drop(1)))
}

// Prepare training set
val parsedTrainData = data_2007.map(parseData)
val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedTrainData.map(x => x.features))
val scaledTrainData = parsedTrainData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
parsedTrainData.cache
scaledTrainData.cache

// Prepare test/validation set
val parsedTestData = data_2008.map(parseData)
val scaledTestData = parsedTestData.map(x => LabeledPoint(x.label, scaler.transform(Vectors.dense(x.features.toArray))))
parsedTestData.cache
scaledTestData.cache

scaledTrainData.take(3).map(x => (x.label, x.features)).foreach(println)

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.optimization.L1Updater

// Build the SVM model
val svmAlg = new SVMWithSGD()
svmAlg.optimizer.setNumIterations(100)
  .setRegParam(1.0)
  .setStepSize(1.0)
val model_svm = svmAlg.run(scaledTrainData)

// Predict
val labelsAndPreds_svm = scaledTestData.map { point =>
  val pred = model_svm.predict(point.features)
  (pred, point.label)
}
val m_svm = eval_metrics(labelsAndPreds_svm)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_svm(0), m_svm(1), m_svm(2), m_svm(3)))

import org.apache.spark.mllib.tree.DecisionTree

// Build the Decision Tree model
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 10
val maxBins = 100
val model_dt = DecisionTree.trainClassifier(scaledTrainData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins)

// Predict
val labelsAndPreds_dt = scaledTestData.map { point =>
  val pred = model_dt.predict(point.features)
  (pred, point.label)
}
val m_dt = eval_metrics(labelsAndPreds_dt)._2
println("precision = %.2f, recall = %.2f, F1 = %.2f, accuracy = %.2f".format(m_dt(0), m_dt(1), m_dt(2), m_dt(3)))
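// Optional (not part of the original flow): the fitted tree can be inspected directly,
// assuming the DecisionTreeModel in this MLlib version exposes depth, numNodes and toDebugString.
println("tree depth = %d, number of nodes = %d".format(model_dt.depth, model_dt.numNodes))
println(model_dt.toDebugString)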