# The Flint Time Series Library¶

Flint is a time series library for Apache Spark. The ability to analyze time series data at scale is critical for the success of finance and IoT applications based on Spark. Flint is Two Sigma's implementation of highly optimized time series operations in Spark. It performs truly parallel and rich analyses on time series data by taking advantage of the natural ordering in time series data to provide locality-based optimizations.

Flint is an open source library for Spark based around the TimeSeriesRDD, a time series aware data structure, and a collection of time series utility and analysis functions that use TimeSeriesRDDs. Unlike DataFrame and Dataset, Flint's TimeSeriesRDDs can leverage the existing ordering properties of datasets at rest and the fact that almost all data manipulations and analysis over these datasets respect their temporal ordering properties. It differs from other time series efforts in Spark in its ability to efficiently compute across panel data or on large scale high frequency data.

This example uses prices.csv file from Kaggle. For it to work you need to get it and put it in /tmp/prices.csv.

In [ ]:
%%classpath add mvn
org.apache.spark spark-sql_2.12 2.4.4
org.apache.spark spark-mllib_2.12 2.4.4
com.github.twosigma flint 6055a7a231

In [ ]:
val begin = "20150101"
val end   = "20160101"

In [ ]:
%%spark -s

In [ ]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import com.twosigma.flint.timeseries.Windows
import com.twosigma.flint.timeseries.Summarizers
import scala.concurrent.duration._
//import com.twosigma.flint.timeseries.implicits._
import com.twosigma.flint.timeseries._


In [ ]:
import com.twosigma.flint.timeseries.CSV

//Creates a TimeSeriesRDD from a CSV file
var pricesRdd = CSV.from(
spark.sqlContext,
"file:///tmp/prices.csv",
timeColumnName = "date",
dateFormat = "dd/MM/yyyy HH:mm",
sorted = false
)
pricesRdd

In [ ]:
pricesRdd.display(5)


## Basic Operations¶

In [ ]:
val priceAsInteger = pricesRdd.cast("close" -> IntegerType)
preview(priceAsInteger)

In [ ]:
val filteredRowsByPrice = pricesRdd.keepRows { row: Row => row.getAs[Double]("low") > 4.0 }
preview(filteredRowsByPrice)

In [ ]:
val timeColumnOnly = pricesRdd.keepColumns("time")
preview(timeColumnOnly)

In [ ]:
val withoutIdColumn = pricesRdd.deleteColumns("symbol")
preview(withoutIdColumn)

In [ ]:
val renamedColumns = pricesRdd.renameColumns("symbol" -> "ticker", "low" -> "lowPrice", "open" -> "openPrice", "close" -> "closePrice", "high" -> "highPrice")
preview(renamedColumns)


## Basic arithmetic on each row¶

In [ ]:
// Calculate logarithm of a column
val logVolumeRdd = pricesRdd.addColumns("logVolume" -> DoubleType -> { row => scala.math.log(row.getAs[Double]("volume")) })
preview(pricesRdd)

In [ ]:
// Raise a column to an exponent
val squaredVolumeRdd = pricesRdd.addColumns("squaredVolume" -> DoubleType -> { row => scala.math.pow(row.getAs[Double]("volume"), 2) })
preview(squaredVolumeRdd)

In [ ]:
// Calculate difference between two columns
val priceChangeRdd = pricesRdd.addColumns("priceChange" -> DoubleType -> { row =>
row.getAs[Double]("close") - row.getAs[Double]("open")
})
preview(priceChangeRdd)

In [ ]:
val pricePercentChange = pricesRdd.addColumns("pricePercentChange" -> DoubleType -> { row =>
val openPrice = row.getAs[Double]("open")
val closePrice = row.getAs[Double]("close")
if (openPrice != 0) (closePrice - openPrice) / openPrice else null
})
preview(pricePercentChange)


## Filtering¶

In [ ]:
// Select rows where the price went up
val priceIncreasedRdd = pricesRdd.keepRows { row =>
row.getAs[Double]("close") > row.getAs[Double]("open")
}
preview(priceIncreasedRdd)

In [ ]:
// The keepRows and deleteRows functions take a function from Row to Boolean as a filtering criteria.
// Only get rows whose symbol starts with 'A'
val startsWithARdd = pricesRdd.keepRows { row =>
val symbol = row.getAs[String]("symbol")
symbol != null && symbol.startsWith("A")
}
preview(startsWithARdd)

In [ ]:
//Remove all rows whose volumn is less than 2000000
val lowVolumeRdd  = pricesRdd.keepRows { row =>
row.getAs[Double]("volume") < 2000000
}
preview(lowVolumeRdd)


## Using history with window¶

In [ ]:
// Moving average over the last two weeks
val ibmPricesRdd = pricesRdd.keepRows { row =>
row.getAs[String]("symbol") == "IBM"
}
windowedIbmPricesRdd = windowedIbmPricesRdd.addColumns("movingAverage" -> DoubleType -> { row =>
val pastRows = row.getAs[Seq[Row]]("window_past_14days")
pastRows.map(_.getAs[Double]("close")).sum / pastRows.size
})
preview(windowedIbmPricesRdd)

In [ ]:
// Moving average over the last two weeks for all symbols
pastWindowRdd = pastWindowRdd.addColumns("movingAverage" -> DoubleType -> { row =>
val pastRows = row.getAs[Seq[Row]]("window_past_14days")
pastRows.map(_.getAs[Double]("close")).sum / pastRows.size
})
preview(pastWindowRdd)


## Calculating values for a cycle¶

In [ ]:
// addColumnsForCycle takes a closure that is applied to a list of rows and returns a map from row to result. The list contains all rows that share a timestamp.

// Add a column containing the number of instruments in the universe on each day
val cycleRdd = pricesRdd.addColumnsForCycle("universeSize" -> IntegerType -> { rows: Seq[Row] =>
rows.map { row => row -> rows.size }.toMap
})
preview(cycleRdd)

In [ ]:
// Compute the Z score across an interval
val zScoreRdd = pricesRdd.addColumnsForCycle("volumeZScore" -> DoubleType -> { rows: Seq[Row] =>
val mean = rows.map(_.getAs[Double]("volume")).sum / rows.size
val stddev = scala.math.sqrt(rows.map { row =>
scala.math.pow(row.getAs[Double]("close") - mean, 2)
}.sum ) / (rows.size - 1)
rows.map { row =>
row -> (row.getAs[Double]("close") - mean) / stddev
}.toMap
})
preview(zScoreRdd)

In [ ]:
// Add a column with rankings with the same timestamp
import org.apache.commons.math3.stat.ranking.NaturalRanking

val rankedRdd = pricesRdd.addColumnsForCycle("r" -> DoubleType -> { rows: Seq[Row] =>
val ranking = new NaturalRanking()
val ranks = ranking.rank(rows.map(_.getAs[Double]("volume")).toArray)
(rows zip ranks).toMap
})
preview(rankedRdd)


## Intervalizing¶

In [ ]:
// Volume weighted average price for every 7 days for IBM
val clock = Clocks.uniform(sc, "7d")
var ibmPricesRdd = pricesRdd.keepRows { row =>
row.getAs[String]("symbol") == "IBM"
}
var volumeWeightedRdd = ibmPricesRdd.groupByInterval(clock).addColumns("volumeWeightedPrice" -> DoubleType -> { row =>
val rows = row.getAs[Seq[Row]]("rows")
val weightedSum = rows.map { row =>
(row.getAs[Double]("open") + row.getAs[Double]("close")) / 2 * row.getAs[Double]("volume")
}.sum
weightedSum / rows.map (_.getAs[Double]("volume")).sum
}).deleteColumns("rows")
preview(volumeWeightedRdd)


## Aggregating¶

In [ ]:
// Average daily volume
val volumeRdd = pricesRdd.summarize(Summarizers.nthMoment("volume", 1), Seq("symbol"))
preview(volumeRdd)


## Regression with Open Source Package¶

In [ ]:
//stat.regression
import breeze.linalg.DenseVector
import org.apache.spark.mllib.random.RandomRDDs
import com.twosigma.flint.math.stats.regression.WeightedLabeledPoint
import com.twosigma.flint.math.stats.regression.OLSMultipleLinearRegression

// Generate a random data set from a linear model with beta = [1.0, 2.0] and intercept = 3.0
val data = WeightedLabeledPoint.generateSampleData(spark.sparkContext, DenseVector(1.0, 2.0), 3.0)

// Fit the data using the OLS linear regression.
val model = OLSMultipleLinearRegression.regression(data)

// Retrieve the estimate beta and intercept.
val denseVector = model.estimateRegressionParameters

Map(denseVector.activeIterator.toSeq.map { m => m._1 -> m._2} : _*)