In this demo we show how to forecast whether the closing price of Apple, General Electric and Samsung Electronics is moving up or down, with the help of a Random Forest classifier. We try to replicate the results from a research paper authored by Luckyson Khaidem, Snehanshu Saha and Sudeepa Roy Dey:
The solution has been implemented in Scala using Jupyter with the BeakerX kernel and the following libraries.
We add the necessary Java libraries with the help of Maven...
%classpath config resolver maven-public1 http://nuc.local:8081/repository/maven-public/
%%classpath add mvn
ch.pschatzmann:investor:LATEST
ch.pschatzmann:investor-dl4j:LATEST
org.nd4j:nd4j-native:1.0.0-beta2
org.apache.spark:spark-sql_2.11:2.3.2
org.apache.spark:spark-mllib_2.11:2.3.2
Added new repo: maven-public1
... and we import all relevant packages
import org.ta4j.core.Indicator
import org.ta4j.core.num.Num
import org.ta4j.core.indicators._
import org.ta4j.core.indicators.volume._
import org.ta4j.core.indicators.helpers._
import ch.pschatzmann.stocks.forecasting._
import ch.pschatzmann.stocks.Context
import ch.pschatzmann.stocks.ta4j.indicator._
import ch.pschatzmann.stocks.integration.dl4j._
import ch.pschatzmann.stocks.integration._
import ch.pschatzmann.display.Table
import org.nd4j.linalg.dataset.api.iterator.DataSetIterator
import org.nd4j.linalg.dataset.api._
import scala.collection.mutable.ListBuffer
import scala.collection.Map
First we can use the StockTimeSeriesEMA for the exponential smoothing of the original input time series. We generate a plot with different smoothing periods:
import scala.collection.JavaConverters._
// Use exponentially smoothed time series
var timeSeries = Context.getStockData("AAPL").toTimeSeries()
var timeSeries0 = StockTimeSeriesEMA.create(timeSeries, 0)
var timeSeries5 = StockTimeSeriesEMA.create(timeSeries, 5)
var timeSeries10 = StockTimeSeriesEMA.create(timeSeries, 10)
var timeSeries20 = StockTimeSeriesEMA.create(timeSeries, 20)
var close = new ClosePriceIndicator(timeSeries)
var close0 = new NamedIndicator(new ClosePriceIndicator(timeSeries0),"close-0")
var close5 = new NamedIndicator(new ClosePriceIndicator(timeSeries5),"close-5")
var close10 = new NamedIndicator(new ClosePriceIndicator(timeSeries10),"close-10")
var close20 = new NamedIndicator(new ClosePriceIndicator(timeSeries20),"close-20")
var table = Table.create(close, close0,close5,close10,close20)
new SimpleTimePlot {
data = table.seq()
columns = Seq("ClosePriceIndicator","close-0","close-5","close-10","close-20")
showLegend = true
}
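The smoothing applied by StockTimeSeriesEMA follows the standard exponential moving average recurrence s(t) = α·p(t) + (1 − α)·s(t−1), with α = 2/(n+1) for an n-period EMA. A minimal sketch of that recurrence on a plain price sequence (the helper name `ema` is ours, not part of the library):

```scala
// Standard EMA recurrence: s(t) = alpha * p(t) + (1 - alpha) * s(t-1),
// seeded with the first price and alpha = 2 / (n + 1) for an n-period EMA.
def ema(prices: Seq[Double], n: Int): Seq[Double] = {
  val alpha = 2.0 / (n + 1)
  prices.tail.scanLeft(prices.head)((prev, p) => alpha * p + (1 - alpha) * prev)
}

val prices = Seq(100.0, 102.0, 101.0, 105.0, 107.0)
val smoothed = ema(prices, 5)
// The smoothed series reacts more slowly to jumps than the raw prices do
```

A larger n gives a smaller α, which is why the close-20 curve in the plot above is the flattest.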
We can easily generate the described input features and output labels with the help of ta4j indicators and then display the data as a table.
import scala.collection.JavaConverters._
var timeSeries = timeSeries10
// Relative Strength Index
var close = new ClosePriceIndicator(timeSeries)
var rsi = new RSIIndicator(close, 10)
// Stochastic Oscillator
var sk = new StochasticOscillatorKIndicator(timeSeries, 10 )
var sd = new StochasticOscillatorDIndicator(sk)
// Williams %R
var williamsR = new WilliamsRIndicator(timeSeries,10)
// Moving Average Convergence Divergence
var macd = new MACDIndicator(close)
// Price Rate of Change
var roc = new ROCIndicator(close, 10)
// On Balance Volume
var obv = new OnBalanceVolumeIndicator(timeSeries)
// Label
var offsetIndicator= new OffsetIndicator(close, +10)
var difference = new DifferenceIndicator(close, offsetIndicator)
var label = new SignIndicator(difference)
var in:List[org.ta4j.core.Indicator[org.ta4j.core.num.Num]] = List(rsi,sk,sd,williamsR,macd,roc,obv)
var out:List[org.ta4j.core.Indicator[org.ta4j.core.num.Num]] = List(label)
Table.create(rsi,sk,sd,williamsR,macd,roc,obv, label)
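The label is just the sign of the price move over the forecast horizon: OffsetIndicator shifts the close price 10 bars ahead, DifferenceIndicator subtracts one series from the other, and SignIndicator keeps the sign. A sketch of the same idea on a plain sequence (the helper name `labels` is ours; the exact argument order of DifferenceIndicator is library-specific, here we take future minus current so that +1 means the price went up):

```scala
// Label each bar by the sign of the price move over the next `horizon` bars:
// +1.0 if the close rises, -1.0 if it falls (a flat move yields 0.0).
def labels(close: Seq[Double], horizon: Int): Seq[Double] =
  close.zip(close.drop(horizon)).map { case (now, future) =>
    math.signum(future - now)
  }

val close = Seq(10.0, 11.0, 12.0, 11.5, 13.0, 12.0)
val l = labels(close, 2)
// The last `horizon` bars have no label, since their future close is unknown
```

Note that this makes the task a binary classification problem on the direction of the move, not a regression on the price itself.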
Now we can use a StockData2DIterator to write the result data to a CSV file:
var os = new java.io.FileOutputStream("rf-stock.csv")
var iterator = new StockData2DIterator(in.asJava, out.asJava, 1, 1);
iterator.writeCSV(os)
os.close()
iterator.getRecordCount()
9571
We use Spark MLlib for the subsequent processing, so we need a Spark session first.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Stocks RandomForestClassifier")
.master("local[*]")
.config("spark.ui.enabled", "false")
.getOrCreate()
org.apache.spark.sql.SparkSession@37b95ada
Then we load the CSV as a Spark Dataset and print the schema:
import spark.implicits._
val data = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("rf-stock.csv")
data.printSchema()
root
 |-- RSIIndicator: double (nullable = true)
 |-- StochasticOscillatorKIndicator: double (nullable = true)
 |-- StochasticOscillatorDIndicator: double (nullable = true)
 |-- WilliamsRIndicator: double (nullable = true)
 |-- MACDIndicator: double (nullable = true)
 |-- ROCIndicator: double (nullable = true)
 |-- OnBalanceVolumeIndicator: double (nullable = true)
 |-- SignIndicator: double (nullable = true)
org.apache.spark.sql.SparkSession$implicits$@2ee8feac
We determine the input fields:
var outFieldName = "SignIndicator"
var inFieldsNames = data.schema.fields.map(f => f.name).filter(n => n!=outFieldName)
[RSIIndicator, StochasticOscillatorKIndicator, StochasticOscillatorDIndicator, WilliamsRIndicator, MACDIndicator, ROCIndicator, OnBalanceVolumeIndicator]
Next we split the data into training and test data:
val Array(trainingData, testData) = data.randomSplit(Array(0.8, 0.2), seed = 1234L)
[RSIIndicator: double, StochasticOscillatorKIndicator: double ... 6 more fields]
We define and fit our pipeline on the training data. Then we run forecasts on our test data:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.VectorAssembler
// Build the Feature Vector
val vectorAssembler = new VectorAssembler()
.setInputCols(inFieldsNames)
.setOutputCol("features")
val labelIndexer = new StringIndexer()
.setInputCol("SignIndicator")
.setOutputCol("indexedLabel")
.fit(data)
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labels)
// Train a RandomForest model.
val classifier = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("features")
// Chain indexers and forest in a Pipeline.
val pipeline = new Pipeline()
.setStages(Array(vectorAssembler,labelIndexer, classifier, labelConverter))
// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
//predictions.show
predictions.select( "SignIndicator","predictedLabel", "features").show(10)
+-------------+--------------+--------------------+
|SignIndicator|predictedLabel|            features|
+-------------+--------------+--------------------+
|         -1.0|           1.0|(7,[1,2,3],[-2196...|
|         -1.0|           1.0|[0.0,-6837.843684...|
|         -1.0|           1.0|[0.0,-2705.464079...|
|         -1.0|           1.0|[0.37289733733823...|
|          1.0|           1.0|[0.60599165505431...|
|          1.0|           1.0|[0.61377963931595...|
|          1.0|           1.0|[0.68071368083663...|
|          1.0|           1.0|[0.73653534221789...|
|         -1.0|           1.0|[0.80460006816019...|
|          1.0|           1.0|[0.84103575755062...|
+-------------+--------------+--------------------+
only showing top 10 rows
Here is the calculation of the accuracy of our predictions on the test data:
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// Select (prediction, true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
0.6558407543216344
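For a multiclass evaluator with the "accuracy" metric, this number is simply the fraction of test rows where the predicted label matches the true one. A hand-rolled equivalent (the helper name `accuracy` is ours):

```scala
// Accuracy = number of correct predictions / total number of predictions
def accuracy(predicted: Seq[Double], actual: Seq[Double]): Double = {
  require(predicted.length == actual.length, "sequences must align")
  predicted.zip(actual).count { case (p, a) => p == a }.toDouble / actual.length
}

val acc = accuracy(Seq(1.0, 1.0, -1.0, 1.0), Seq(1.0, -1.0, -1.0, 1.0))
// acc = 0.75 (3 of 4 predictions match)
```

Since up/down is a binary outcome, an accuracy of roughly 0.66 is meaningfully better than the 0.5 a coin flip would achieve, though still far from the figures reported in the paper.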
Finally we want to create a chart which displays the accuracy by time window for the different stocks. The logic described above is wrapped into some simple methods so that we can call them with different parameters:
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
def getDataset(ticker:String, timeWindow:Int):Dataset[Row] = {
var timeSeriesOriginal = Context.getStockData(ticker).toTimeSeries()
var timeSeries = StockTimeSeriesEMA.create(timeSeriesOriginal, timeWindow)
// Relative Strength Index
var close = new ClosePriceIndicator(timeSeries)
var rsi = new RSIIndicator(close, timeWindow)
// Stochastic Oscillator
var sk = new StochasticOscillatorKIndicator(timeSeries, timeWindow )
var sd = new StochasticOscillatorDIndicator(sk)
// Williams %R
var williamsR = new WilliamsRIndicator(timeSeries,timeWindow)
// Moving Average Convergence Divergence
var macd = new MACDIndicator(close)
// Price Rate of Change
var roc = new ROCIndicator(close, timeWindow)
// On Balance Volume
var obv = new OnBalanceVolumeIndicator(timeSeries)
// Label
var offsetIndicator= new OffsetIndicator(close, +timeWindow)
var difference = new DifferenceIndicator(close, offsetIndicator)
var label = new SignIndicator(difference)
var in:List[org.ta4j.core.Indicator[org.ta4j.core.num.Num]] = List(rsi,sk,sd,williamsR,macd,roc,obv)
var out:List[org.ta4j.core.Indicator[org.ta4j.core.num.Num]] = List(label)
var iterator = new StockData2DIterator(in.asJava, out.asJava, 1, 1);
var os = new java.io.FileOutputStream("rf-stock.csv")
iterator.writeCSV(os)
os.close()
val data = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("rf-stock.csv")
return data
}
def getAccuracy(ticker:String, timeWindow:Int):Double = {
var data = getDataset(ticker, timeWindow)
// split data into training and test
val Array(trainingData, testData) = data.randomSplit(Array(0.8, 0.2), seed = 1234L)
// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select (prediction, true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("indexedLabel")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
return accuracy
}
import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row getDataset: (ticker: String, timeWindow: Int)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] getAccuracy: (ticker: String, timeWindow: Int)Double
Now we can calculate the accuracy for the different periods and plot the results for AAPL, GE and SSNLF.
var range = (2 to 150)
var resultAAPL = range.map(periods => (periods, getAccuracy("AAPL", periods)))
var resultGE = range.map(periods => (periods, getAccuracy("GE", periods)))
var resultSSNLF = range.map(periods => (periods, getAccuracy("SSNLF", periods)))
new Plot().add(Seq(
new Line() {
x = resultAAPL.map(l => l._1)
y = resultAAPL.map(l => l._2)
displayName = "AAPL"
},
new Line() {
x = resultGE.map(l => l._1)
y = resultGE.map(l => l._2)
displayName = "GE"
},
new Line() {
x = resultSSNLF.map(l => l._1)
y = resultSSNLF.map(l => l._2)
displayName = "SSNLF"
}
))