%%classpath add mvn
org.apache.spark spark-sql_2.11 2.2.1
org.apache.spark spark-mllib_2.11 2.2.1
com.github.twosigma flint 6055a7a231

val begin = "20150101"
val end = "20160101"

%%spark -s

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//import com.twosigma.flint.timeseries.io.read
import com.twosigma.flint.timeseries.Windows
import com.twosigma.flint.timeseries.Summarizers
import scala.concurrent.duration._
//import com.twosigma.flint.timeseries.implicits._
import com.twosigma.flint.timeseries._
import com.twosigma.flint.timeseries.CSV

// Load prices.csv from https://www.kaggle.com/dgawlik/nyse
// Creates a TimeSeriesRDD from a CSV file
var pricesRdd = CSV.from(
  spark.sqlContext,
  "file:///tmp/prices.csv",
  header = true,
  timeColumnName = "date",
  dateFormat = "dd/MM/yyyy HH:mm",
  sorted = false
)

pricesRdd.display(5)

// Cast the close price to an integer column
val priceAsInteger = pricesRdd.cast("close" -> IntegerType)
preview(priceAsInteger)

// Keep only rows whose low price is above 4.0
val filteredRowsByPrice = pricesRdd.keepRows { row: Row =>
  row.getAs[Double]("low") > 4.0
}
preview(filteredRowsByPrice)

// Keep only the time column
val timeColumnOnly = pricesRdd.keepColumns("time")
preview(timeColumnOnly)

// Drop the symbol column
val withoutIdColumn = pricesRdd.deleteColumns("symbol")
preview(withoutIdColumn)

// Rename several columns at once
val renamedColumns = pricesRdd.renameColumns(
  "symbol" -> "ticker",
  "low" -> "lowPrice",
  "open" -> "openPrice",
  "close" -> "closePrice",
  "high" -> "highPrice"
)
preview(renamedColumns)

// Calculate the logarithm of a column
val logVolumeRdd = pricesRdd.addColumns("logVolume" -> DoubleType -> { row =>
  scala.math.log(row.getAs[Double]("volume"))
})
preview(logVolumeRdd)

// Raise a column to an exponent
val squaredVolumeRdd = pricesRdd.addColumns("squaredVolume" -> DoubleType -> { row =>
  scala.math.pow(row.getAs[Double]("volume"), 2)
})
preview(squaredVolumeRdd)

// Calculate the difference between two columns
val priceChangeRdd = pricesRdd.addColumns("priceChange" -> DoubleType -> { row =>
  row.getAs[Double]("close") - row.getAs[Double]("open")
})
preview(priceChangeRdd)

// Calculate the percent change, leaving the column null when the open price is zero
val pricePercentChange = pricesRdd.addColumns("pricePercentChange" -> DoubleType -> { row =>
  val openPrice = row.getAs[Double]("open")
  val closePrice = row.getAs[Double]("close")
  if (openPrice != 0) (closePrice - openPrice) / openPrice else null
})
preview(pricePercentChange)

// Select rows where the price went up
val priceIncreasedRdd = pricesRdd.keepRows { row =>
  row.getAs[Double]("close") > row.getAs[Double]("open")
}
preview(priceIncreasedRdd)

// The keepRows and deleteRows functions take a function from Row to Boolean as a filtering criterion.
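// Only keepRows is exercised above, so here is a minimal sketch of the
// complementary deleteRows call, which drops the rows its predicate matches
// (the inverse of keepRows). The zero-volume filter is a hypothetical example,
// not from the original notebook.
val nonZeroVolumeRdd = pricesRdd.deleteRows { row =>
  row.getAs[Double]("volume") == 0.0
}
preview(nonZeroVolumeRdd)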
// Only get rows whose symbol starts with 'A'
val startsWithARdd = pricesRdd.keepRows { row =>
  val symbol = row.getAs[String]("symbol")
  symbol != null && symbol.startsWith("A")
}
preview(startsWithARdd)

// Keep only rows whose volume is less than 2,000,000
val lowVolumeRdd = pricesRdd.keepRows { row =>
  row.getAs[Double]("volume") < 2000000
}
preview(lowVolumeRdd)

// Moving average over the last two weeks for IBM
val ibmPricesRdd = pricesRdd.keepRows { row =>
  row.getAs[String]("symbol") == "IBM"
}
var windowedIbmPricesRdd = ibmPricesRdd.addWindows(Windows.pastAbsoluteTime("14days"))
windowedIbmPricesRdd = windowedIbmPricesRdd.addColumns("movingAverage" -> DoubleType -> { row =>
  val pastRows = row.getAs[Seq[Row]]("window_past_14days")
  pastRows.map(_.getAs[Double]("close")).sum / pastRows.size
})
preview(windowedIbmPricesRdd)

// Moving average over the last two weeks for all symbols,
// windowing per symbol via the key column
var pastWindowRdd = pricesRdd.addWindows(Windows.pastAbsoluteTime("14days"), Seq("symbol"))
pastWindowRdd = pastWindowRdd.addColumns("movingAverage" -> DoubleType -> { row =>
  val pastRows = row.getAs[Seq[Row]]("window_past_14days")
  pastRows.map(_.getAs[Double]("close")).sum / pastRows.size
})
preview(pastWindowRdd)

// addColumnsForCycle takes a closure that is applied to a list of rows and returns
// a map from row to result. The list contains all rows that share a timestamp.

// Add a column containing the number of instruments in the universe on each day
val cycleRdd = pricesRdd.addColumnsForCycle("universeSize" -> IntegerType -> { rows: Seq[Row] =>
  rows.map { row => row -> rows.size }.toMap
})
preview(cycleRdd)

// Compute the volume Z-score across each cycle, using the sample standard deviation
val zScoreRdd = pricesRdd.addColumnsForCycle("volumeZScore" -> DoubleType -> { rows: Seq[Row] =>
  val mean = rows.map(_.getAs[Double]("volume")).sum / rows.size
  val stddev = scala.math.sqrt(
    rows.map { row => scala.math.pow(row.getAs[Double]("volume") - mean, 2) }.sum / (rows.size - 1)
  )
  rows.map { row => row -> (row.getAs[Double]("volume") - mean) / stddev }.toMap
})
preview(zScoreRdd)

// Add a column ranking rows that share the same timestamp by volume
import org.apache.commons.math3.stat.ranking.NaturalRanking
val rankedRdd = pricesRdd.addColumnsForCycle("r" -> DoubleType -> { rows: Seq[Row] =>
  val ranking = new NaturalRanking()
  val ranks = ranking.rank(rows.map(_.getAs[Double]("volume")).toArray)
  (rows zip ranks).toMap
})
preview(rankedRdd)

// Volume-weighted average price for every 7 days for IBM,
// reusing ibmPricesRdd from the moving-average example above
val clock = Clocks.uniform(sc, "7d")
val volumeWeightedRdd = ibmPricesRdd
  .groupByInterval(clock)
  .addColumns("volumeWeightedPrice" -> DoubleType -> { row =>
    val rows = row.getAs[Seq[Row]]("rows")
    val weightedSum = rows.map { row =>
      (row.getAs[Double]("open") + row.getAs[Double]("close")) / 2 * row.getAs[Double]("volume")
    }.sum
    weightedSum / rows.map(_.getAs[Double]("volume")).sum
  })
  .deleteColumns("rows")
preview(volumeWeightedRdd)

// Average daily volume per symbol (the first raw moment of volume)
val volumeRdd = pricesRdd.summarize(Summarizers.nthMoment("volume", 1), Seq("symbol"))
preview(volumeRdd)
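// nthMoment(column, 1) is the first raw moment, i.e. the arithmetic mean.
// Assuming Flint's mean summarizer is available (it is documented alongside
// nthMoment), the same average daily volume can be written more directly;
// this is a sketch, not from the original notebook.
val meanVolumeRdd = pricesRdd.summarize(Summarizers.mean("volume"), Seq("symbol"))
preview(meanVolumeRdd)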
// stat.regression
import breeze.linalg.DenseVector
import org.apache.spark.mllib.random.RandomRDDs
import com.twosigma.flint.math.stats.regression.WeightedLabeledPoint
import com.twosigma.flint.math.stats.regression.OLSMultipleLinearRegression

// Generate a random data set from a linear model with beta = [1.0, 2.0] and intercept = 3.0
val data = WeightedLabeledPoint.generateSampleData(spark.sparkContext, DenseVector(1.0, 2.0), 3.0)

// Fit the data using OLS linear regression
val model = OLSMultipleLinearRegression.regression(data)

// Retrieve the estimated beta and intercept as a map from coefficient index to value
val denseVector = model.estimateRegressionParameters
Map(denseVector.activeIterator.toSeq.map { m => m._1 -> m._2 }: _*)
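// A sketch of checking the fit against the generating model. It assumes the
// first entry of the parameter vector is the intercept (the convention of
// Apache Commons Math's OLSMultipleLinearRegression, which this API mirrors);
// if that holds, the values should come out close to 3.0 and [1.0, 2.0].
val params = model.estimateRegressionParameters.toArray
val intercept = params.head   // expected to be near 3.0
val betas = params.tail       // expected to be near [1.0, 2.0]
println(s"intercept = $intercept, betas = ${betas.mkString(", ")}")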