%%capture
%load_ext rpy2.ipython

%%capture
%%R
# Load required R packages
require(rhdfs)
require(randomForest)
require(gbm)
require(plyr)
require(data.table)

# Initialize the RHDFS package
hdfs.init(hadoop='/usr/bin/hadoop')

# Utility function to read a multi-part file from HDFS into an R data frame
read_csv_from_hdfs <- function(filename, cols=NULL) {
  dir.list = hdfs.ls(filename)
  list.condition <- sapply(dir.list$size, function(x) x > 0)
  file.list <- dir.list[list.condition,]
  tables <- lapply(file.list$file, function(f) {
    content <- paste(hdfs.read.text.file(f, n = 100000L, buffer=100000000L), collapse='\n')
    if (length(cols) == 0) {
      dt = fread(content, sep=",", colClasses="character", stringsAsFactors=F, header=T)
    } else {
      dt = fread(content, sep=",", colClasses="character", stringsAsFactors=F, header=F)
      setnames(dt, names(dt), cols)
    }
    dt
  })
  rbind.fill(tables)
}

%%R
# Read the 2007 flight delay data from HDFS
cols = c('year', 'month', 'day', 'dow', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime',
         'Carrier', 'FlightNum', 'TailNum', 'ActualElapsedTime', 'CRSElapsedTime', 'AirTime',
         'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut',
         'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay', 'WeatherDelay',
         'NASDelay', 'SecurityDelay', 'LateAircraftDelay')

flt_2007 = read_csv_from_hdfs('/user/demo/airline/delay/2007.csv', cols)
print(dim(flt_2007))

%%R
# Keep flights originating at ORD with a valid departure delay, and turn
# DepDelay into a binary indicator (1 = delayed by 15 minutes or more)
df1 = flt_2007[which(flt_2007$Origin == 'ORD' & !is.na(flt_2007$DepDelay)),]
df1$DepDelay = sapply(df1$DepDelay, function(x) (if (as.numeric(x) >= 15) 1 else 0))
print(paste0("total flights: ", as.character(dim(df1)[1])))
print(paste0("total delays: ", as.character(sum(df1$DepDelay))))

%%R
# Fraction of delayed flights by month
df2 = df1[, c('DepDelay', 'month'), with=F]
df2$month = as.numeric(df2$month)
df2 <- ddply(df2, .(month), summarise, mean_delay=mean(DepDelay))
barplot(df2$mean_delay, names.arg=df2$month, xlab="month", ylab="% of delays", col="blue")

%%R
# Extract the hour of day from a 3- or 4-digit time-of-day string
get_hour <- function(x) {
  s = sprintf("%04d", as.numeric(x))
  return(substr(s, 1, 2))
}

# Fraction of delayed flights by scheduled departure hour
df2 = df1[, c('DepDelay', 'CRSDepTime'), with=F]
df2$hour = as.numeric(sapply(df2$CRSDepTime, get_hour))
df2$CRSDepTime <- NULL
df2 <- ddply(df2, .(hour), summarise, mean_delay=mean(DepDelay))
barplot(df2$mean_delay, names.arg=df2$hour, xlab="hour of day", ylab="% of delays", col="green")
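The same ddply/barplot pattern can be applied to other columns as well. Below is a minimal sketch for the delay rate by day of week, assuming df1 from the cell above is still in memory; the cell is an added illustration and is not required by the rest of the notebook.

%%R
# Sketch: fraction of delayed flights by day of week, mirroring the
# month and hour-of-day plots above
df2 = df1[, c('DepDelay', 'dow'), with=F]
df2$dow = as.numeric(df2$dow)
df2 <- ddply(df2, .(dow), summarise, mean_delay=mean(DepDelay))
barplot(df2$mean_delay, names.arg=df2$dow, xlab="day of week", ylab="% of delays", col="red")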
%%writefile preprocess1.scala
package com.hortonworks.datascience.demo1

import com.twitter.scalding._
import org.joda.time.format._
import org.joda.time.{Days, DateTime}
import com.hortonworks.datascience.demo1.ScaldingFlightDelays._

/**
 * Pre-process flight delay data into a feature matrix - iteration #1
 */
class ScaldingFlightDelays(args: Args) extends Job(args) {

  val prepData = Csv(args("input"), ",", fields = inputSchema, skipHeader = true)
    .read
    .project(delaySchema)
    .filter(('Origin, 'Cancelled)) { x: (String, String) => x._1 == "ORD" && x._2 == "0" }
    .mapTo(featureSchema -> outputSchema)(gen_features)
    .write(Csv(args("output")))
}

object ScaldingFlightDelays {

  val inputSchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek, 'DepTime, 'CRSDepTime,
    'ArrTime, 'CRSArrTime, 'UniqueCarrier, 'FlightNum, 'TailNum, 'ActualElapsedTime,
    'CRSElapsedTime, 'AirTime, 'ArrDelay, 'DepDelay, 'Origin, 'Dest, 'Distance,
    'TaxiIn, 'TaxiOut, 'Cancelled, 'CancellationCode, 'Diverted, 'CarrierDelay,
    'WeatherDelay, 'NASDelay, 'SecurityDelay, 'LateAircraftDelay)
  val delaySchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek, 'CRSDepTime,
    'DepDelay, 'Origin, 'Distance, 'Cancelled)
  val featureSchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek,
    'CRSDepTime, 'DepDelay, 'Distance)
  val outputSchema = List('flightDate, 'delay, 'm, 'dm, 'dw, 'h, 'dist, 'holiday)

  val holidays = List("01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007",
    "07/04/2007", "09/03/2007", "10/08/2007", "11/11/2007", "11/22/2007", "12/25/2007",
    "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008",
    "09/01/2008", "10/13/2008", "11/11/2008", "11/27/2008", "12/25/2008")

  def gen_features(tuple: (String, String, String, String, String, String, String)) = {
    val (year, month, dayOfMonth, dayOfWeek, crsDepTime, depDelay, distance) = tuple
    val date = to_date(year.toInt, month.toInt, dayOfMonth.toInt)
    val hour = get_hour(crsDepTime)
    val holidayDist = days_from_nearest_holiday(year.toInt, month.toInt, dayOfMonth.toInt)

    (date, depDelay, month, dayOfMonth, dayOfWeek, hour, distance, holidayDist.toString)
  }

  def get_hour(depTime: String) = "%04d".format(depTime.toInt).take(2)

  def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)

  def days_from_nearest_holiday(year: Int, month: Int, day: Int) = {
    val sampleDate = new DateTime(year, month, day, 0, 0)
    holidays.foldLeft(3000) { (r, c) =>
      val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
      val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
      math.min(r, distance)
    }
  }
}

%%capture capt2007_1
!/home/demo/scalding/scripts/scald.rb --hdfs preprocess1.scala --input "airline/delay/2007.csv" --output "airline/fm/ord_2007_sc_1"

capt2007_1.stderr

%%capture capt2008_1
!/home/demo/scalding/scripts/scald.rb --hdfs preprocess1.scala --input "airline/delay/2008.csv" --output "airline/fm/ord_2008_sc_1"

capt2008_1.stderr

%%R
# Function to compute precision, recall, F1-measure and accuracy
get_metrics <- function(predicted, actual) {
  tp = length(which(predicted == TRUE & actual == TRUE))
  tn = length(which(predicted == FALSE & actual == FALSE))
  fp = length(which(predicted == TRUE & actual == FALSE))
  fn = length(which(predicted == FALSE & actual == TRUE))

  precision = tp / (tp+fp)
  recall = tp / (tp+fn)
  F1 = 2*precision*recall / (precision+recall)
  accuracy = (tp+tn) / (tp+tn+fp+fn)

  v = c(precision, recall, F1, accuracy)
  v
}

# Read a feature-matrix file (iteration #1) from HDFS and fix up column types
process_dataset <- function(filename) {
  cols = c('date', 'delay', 'month', 'day', 'dow', 'hour', 'distance', 'days_from_holiday')
  data = read_csv_from_hdfs(filename, cols)
  data$delay = as.factor(as.numeric(data$delay) >= 15)
  data$month = as.factor(data$month)
  data$day = as.factor(data$day)
  data$dow = as.factor(data$dow)
  data$hour = as.numeric(data$hour)
  data$distance = as.numeric(data$distance)
  data$days_from_holiday = as.numeric(data$days_from_holiday)
  data
}

# Prepare the training set (2007) and the test/validation set (2008)
data_2007 = process_dataset('/user/demo/airline/fm/ord_2007_sc_1')
data_2008 = process_dataset('/user/demo/airline/fm/ord_2008_sc_1')

fcols = setdiff(names(data_2007), c('date', 'delay'))
train_x = data_2007[,fcols, with=FALSE]
train_y = data_2007$delay

test_x = data_2008[,fcols, with=FALSE]
test_y = data_2008$delay

%%R
# Train a random forest and evaluate it on the 2008 hold-out set
rf.model = randomForest(train_x, train_y, ntree=40)
rf.pr <- predict(rf.model, newdata=test_x)
m.rf = get_metrics(as.logical(rf.pr), as.logical(test_y))
print(sprintf("Random Forest: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f",
              m.rf[1], m.rf[2], m.rf[3], m.rf[4]))

%%R
# Train a gradient boosted machine and evaluate it on the 2008 hold-out set
gbm.model <- gbm.fit(train_x, as.numeric(train_y)-1, n.trees=500, verbose=F, shrinkage=0.01,
                     distribution="bernoulli", interaction.depth=3, n.minobsinnode=30)
gbm.pr <- predict(gbm.model, newdata=test_x, n.trees=500, type="response")
m.gbm = get_metrics(gbm.pr >= 0.5, as.logical(test_y))
print(sprintf("Gradient Boosted Machines: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f",
              m.gbm[1], m.gbm[2], m.gbm[3], m.gbm[4]))
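As a quick sanity check of the get_metrics helper used above, a minimal sketch on a small hypothetical prediction vector (added for illustration only): two of the three positive predictions are correct, so precision and recall both come out to 2/3.

%%R
# Sketch: get_metrics on toy predicted/actual vectors (hypothetical data)
pred   = c(TRUE, TRUE, FALSE, FALSE, TRUE)
actual = c(TRUE, FALSE, FALSE, TRUE, TRUE)
m = get_metrics(pred, actual)
print(sprintf("toy example: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f",
              m[1], m[2], m[3], m[4]))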
%%writefile preprocess2.scala
package com.hortonworks.datascience.demo2

import com.twitter.scalding._
import org.joda.time.format._
import org.joda.time.{Days, DateTime}
import com.hortonworks.datascience.demo2.ScaldingFlightDelays._

/**
 * Pre-process flight and weather data into a feature matrix - iteration #2
 */
class ScaldingFlightDelays(args: Args) extends Job(args) {

  // Flight delay features, as in iteration #1
  val delayData = Csv(args("delay"), ",", fields = delayInSchema, skipHeader = true)
    .read
    .project(delaySchema)
    .filter(('Origin, 'Cancelled)) { x: (String, String) => x._1 == "ORD" && x._2 == "0" }
    .mapTo(filterSchema -> featureSchema)(gen_features)

  // Daily weather measurements for the ORD weather station (USW00094846),
  // pivoted into a Map of metric -> value per date
  val weatherData = Csv(args("weather"), ",", fields = weatherInSchema)
    .read
    .project(weatherSchema)
    .filter('Station) { x: String => x == "USW00094846" }
    .filter('Metric) { m: String => m == "TMIN" | m == "TMAX" | m == "PRCP" | m == "SNOW" | m == "AWND" }
    .mapTo(weatherSchema -> ('Date, 'MM)) { tuple: (String, String, String, String) =>
      (tuple._2, tuple._3 + ":" + tuple._4)
    }
    .groupBy('Date) {
      _.foldLeft('MM -> 'Measures)(Map[String, Double]()) { (m, s: String) =>
        val kv = s.split(":")
        m + (kv(0) -> kv(1).toDouble)
      }
    }

  // Join flight features with weather measurements by date and flatten the Map
  delayData.joinWithSmaller(('flightDate, 'Date), weatherData)
    .project('delay, 'm, 'dm, 'dw, 'h, 'dist, 'holiday, 'Measures)
    .mapTo(joinSchema -> outputSchema) {
      x: (Double, Double, Double, Double, Double, Double, Double, Map[String, Double]) =>
        (x._1, x._2, x._3, x._4, x._5, x._6, x._7,
          x._8("TMIN"), x._8("TMAX"), x._8("PRCP"), x._8("SNOW"), x._8("AWND"))
    }
    .write(Csv(args("output"), ","))
}

object ScaldingFlightDelays {

  val delayInSchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek, 'DepTime, 'CRSDepTime,
    'ArrTime, 'CRSArrTime, 'UniqueCarrier, 'FlightNum, 'TailNum, 'ActualElapsedTime,
    'CRSElapsedTime, 'AirTime, 'ArrDelay, 'DepDelay, 'Origin, 'Dest, 'Distance,
    'TaxiIn, 'TaxiOut, 'Cancelled, 'CancellationCode, 'Diverted, 'CarrierDelay,
    'WeatherDelay, 'NASDelay, 'SecurityDelay, 'LateAircraftDelay)
  val weatherInSchema = List('Station, 'Date, 'Metric, 'Measure, 'v1, 'v2, 'v3, 'v4)
  val delaySchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek, 'CRSDepTime,
    'DepDelay, 'Origin, 'Distance, 'Cancelled)
  val weatherSchema = List('Station, 'Date, 'Metric, 'Measure)
  val filterSchema = List('Year, 'Month, 'DayofMonth, 'DayOfWeek, 'CRSDepTime, 'DepDelay, 'Distance)
  val featureSchema = List('flightDate, 'delay, 'm, 'dm, 'dw, 'h, 'dist, 'holiday)
  val joinSchema = List('delay, 'm, 'dm, 'dw, 'h, 'dist, 'holiday, 'Measures)
  val outputSchema = List('delay, 'm, 'dm, 'dw, 'h, 'dist, 'holiday,
    'tmin, 'tmax, 'prcp, 'snow, 'awnd)

  val holidays = List("01/01/2007", "01/15/2007", "02/19/2007", "05/28/2007", "06/07/2007",
    "07/04/2007", "09/03/2007", "10/08/2007", "11/11/2007", "11/22/2007", "12/25/2007",
    "01/01/2008", "01/21/2008", "02/18/2008", "05/22/2008", "05/26/2008", "07/04/2008",
    "09/01/2008", "10/13/2008", "11/11/2008", "11/27/2008", "12/25/2008")

  def gen_features(tuple: (String, String, String, String, String, String, String)) = {
    val (year, month, dayOfMonth, dayOfWeek, crsDepTime, depDelay, distance) = tuple
    val date = to_date(year.toInt, month.toInt, dayOfMonth.toInt)
    val hour = get_hour(crsDepTime)
    val holidayDist = days_from_nearest_holiday(year.toInt, month.toInt, dayOfMonth.toInt)

    (date, depDelay, month, dayOfMonth, dayOfWeek, hour, distance, holidayDist.toString)
  }

  def get_hour(depTime: String) = "%04d".format(depTime.toInt).take(2)

  def to_date(year: Int, month: Int, day: Int) = "%04d%02d%02d".format(year, month, day)

  def days_from_nearest_holiday(year: Int, month: Int, day: Int) = {
    val sampleDate = new DateTime(year, month, day, 0, 0)
    holidays.foldLeft(3000) { (r, c) =>
      val holiday = DateTimeFormat.forPattern("MM/dd/yyyy").parseDateTime(c)
      val distance = Math.abs(Days.daysBetween(holiday, sampleDate).getDays)
      math.min(r, distance)
    }
  }
}
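Before running the new Scalding job, it can help to peek at the raw weather data from R. Below is a minimal sketch that reuses the read_csv_from_hdfs helper defined earlier; the column names follow weatherInSchema above, and the absolute HDFS path assumes the same /user/demo prefix used for the delay files.

%%R
# Sketch: peek at the 2007 weather file and count records per metric
# for the station used in the Scalding job
wcols = c('station', 'date', 'metric', 'measure', 'v1', 'v2', 'v3', 'v4')
weather_2007 = read_csv_from_hdfs('/user/demo/airline/weather/2007.csv', wcols)
ord_weather = weather_2007[which(weather_2007$station == 'USW00094846'),]
print(table(ord_weather$metric))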
%%capture capt2007_2
!/home/demo/scalding/scripts/scald.rb --hdfs preprocess2.scala --delay "airline/delay/2007.csv" --weather "airline/weather/2007.csv" --output "airline/fm/ord_2007_sc_2"

capt2007_2.stderr

%%capture capt2008_2
!/home/demo/scalding/scripts/scald.rb --hdfs preprocess2.scala --delay "airline/delay/2008.csv" --weather "airline/weather/2008.csv" --output "airline/fm/ord_2008_sc_2"

capt2008_2.stderr

%%R
# Read a feature-matrix file (iteration #2, with weather features) from HDFS
# and fix up column types
process_dataset <- function(filename) {
  cols = c('delay', 'month', 'day', 'dow', 'hour', 'distance', 'days_from_holiday',
           'tmin', 'tmax', 'prcp', 'snow', 'awnd')
  data = read_csv_from_hdfs(filename, cols)
  data$delay = as.factor(as.numeric(data$delay) >= 15)
  data$month = as.factor(data$month)
  data$day = as.factor(data$day)
  data$dow = as.factor(data$dow)
  data$hour = as.numeric(data$hour)
  data$distance = as.numeric(data$distance)
  data$days_from_holiday = as.numeric(data$days_from_holiday)
  data$tmin = as.numeric(data$tmin)
  data$tmax = as.numeric(data$tmax)
  data$prcp = as.numeric(data$prcp)
  data$snow = as.numeric(data$snow)
  data$awnd = as.numeric(data$awnd)
  data
}

# Prepare the training set (2007) and the test/validation set (2008)
data_2007 = process_dataset('/user/demo/airline/fm/ord_2007_sc_2')
data_2008 = process_dataset('/user/demo/airline/fm/ord_2008_sc_2')

fcols = setdiff(names(data_2007), c('delay'))
train_x = data_2007[,fcols]
train_y = data_2007$delay

test_x = data_2008[,fcols]
test_y = data_2008$delay

%%R
# Train a random forest on the weather-enriched features and evaluate on 2008
rf.model = randomForest(train_x, train_y, ntree=40)
rf.pr <- predict(rf.model, newdata=test_x)
m.rf = get_metrics(as.logical(rf.pr), as.logical(test_y))
print(sprintf("Random Forest: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f",
              m.rf[1], m.rf[2], m.rf[3], m.rf[4]))

%%R
# Train a gradient boosted machine on the weather-enriched features and evaluate on 2008
gbm.model <- gbm.fit(train_x, as.numeric(train_y)-1, n.trees=500, verbose=F, shrinkage=0.01,
                     distribution="bernoulli", interaction.depth=3, n.minobsinnode=30)
gbm.pr <- predict(gbm.model, newdata=test_x, n.trees=500, type="response")
m.gbm = get_metrics(gbm.pr >= 0.5, as.logical(test_y))
print(sprintf("Gradient Boosted Machines: precision=%0.2f, recall=%0.2f, F1=%0.2f, accuracy=%0.2f",
              m.gbm[1], m.gbm[2], m.gbm[3], m.gbm[4]))
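To get a sense of which of the weather-enriched features drive the predictions, one can inspect the random forest's variable importance. A minimal sketch, assuming rf.model from the cell above is still in scope:

%%R
# Sketch: variable importance for the iteration-2 random forest model
print(importance(rf.model))
varImpPlot(rf.model)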