#!/usr/bin/env python
# coding: utf-8
#
Table of Contents
#
# # Labeling and MetaLabeling
# ## Overview
#
# In this chapter of the book AFML, De Prado introduces several novel techniques for labeling returns for the purposes of supervised machine learning.
#
# First he identifies the typical issues of fixed-time horizon labeling methods - primarily that it is easy to mislabel a return due to dynamic nature of volatility throughout a trading period.
#
# More importantly he addresses a major overlooked aspect of the financial literature. He emphasizes that every investment strategy makes use of stop-loss limits of some kind, whether those are enforced by a margin call, risk department or self-imposed. He highlights how unrealistic it is to test/implement/propagate a strategy that profits from positions that would have been stopped out.
#
# > That virtually no publication accounts for that when labeling observations tells you something about the current state of financial literature.
# >
# > -De Prado, "Advances in Financial Machine Learning", pg.44
#
# He also introduces a technique called metalabeling, which is used to augment a strategy by improving recall while also reducing the likelihood of overfitting.
# In[1]:
get_ipython().run_line_magic('load_ext', 'watermark')
get_ipython().run_line_magic('watermark', '')
get_ipython().run_line_magic('load_ext', 'autoreload')
get_ipython().run_line_magic('autoreload', '2')
# import standard libs
from IPython.display import display
from IPython.core.debugger import set_trace as bp
from pathlib import PurePath, Path
import sys
import time
from collections import OrderedDict as od
import re
import os
import json
# import python scientific stack
import pandas as pd
import pandas_datareader.data as web
pd.set_option('display.max_rows', 100)
from dask import dataframe as dd
from dask.diagnostics import ProgressBar
from multiprocessing import cpu_count
pbar = ProgressBar()
pbar.register()
import numpy as np
import scipy.stats as stats
import statsmodels.api as sm
from numba import jit
import math
import ffn
# import visual tools
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
get_ipython().run_line_magic('matplotlib', 'inline')
import seaborn as sns
plt.style.use('seaborn-talk')
plt.style.use('bmh')
#plt.rcParams['font.family'] = 'DejaVu Sans Mono'
plt.rcParams['font.size'] = 9.5
plt.rcParams['font.weight'] = 'medium'
plt.rcParams['figure.figsize'] = 10,7
blue, green, red, purple, gold, teal = sns.color_palette('colorblind', 6)
# import util libs
from tqdm import tqdm, tqdm_notebook
import warnings
warnings.filterwarnings("ignore")
import missingno as msno
from src.utils.utils import *
import src.features.bars as brs
import src.features.snippets as snp
RANDOM_STATE = 777
pdir = get_relative_project_dir('Adv_Fin_ML_Exercises', partial=False)
data_dir = pdir / 'data'
print()
get_ipython().run_line_magic('watermark', '-p pandas,pandas_datareader,dask,numpy,sklearn,statsmodels,scipy,ffn,matplotlib,seaborn')
# ## Code Snippets
#
# Below I reproduce all the relevant code snippets found in the book that are necessary to work through the excercises found at the end of chapter 3.
# ### Symmetric CUSUM Filter [2.5.2.1]
# In[2]:
def getTEvents(gRaw, h):
tEvents, sPos, sNeg = [], 0, 0
diff = np.log(gRaw).diff().dropna()
for i in tqdm(diff.index[1:]):
try:
pos, neg = float(sPos+diff.loc[i]), float(sNeg+diff.loc[i])
except Exception as e:
print(e)
print(sPos+diff.loc[i], type(sPos+diff.loc[i]))
print(sNeg+diff.loc[i], type(sNeg+diff.loc[i]))
break
sPos, sNeg=max(0., pos), min(0., neg)
if sNeg<-h:
sNeg=0;tEvents.append(i)
elif sPos>h:
sPos=0;tEvents.append(i)
return pd.DatetimeIndex(tEvents)
# ### Daily Volatility Estimator [3.1]
# In[3]:
def getDailyVol(close,span0=100):
# daily vol reindexed to close
df0=close.index.searchsorted(close.index-pd.Timedelta(days=1))
df0=df0[df0>0]
df0=(pd.Series(close.index[df0-1],
index=close.index[close.shape[0]-df0.shape[0]:]))
try:
df0=close.loc[df0.index]/close.loc[df0.values].values-1 # daily rets
except Exception as e:
print(f'error: {e}\nplease confirm no duplicate indices')
df0=df0.ewm(span=span0).std().rename('dailyVol')
return df0
# ### Triple-Barrier Labeling Method [3.2]
# In[4]:
def applyPtSlOnT1(close,events,ptSl,molecule):
# apply stop loss/profit taking, if it takes place before t1 (end of event)
events_=events.loc[molecule]
out=events_[['t1']].copy(deep=True)
if ptSl[0]>0: pt=ptSl[0]*events_['trgt']
else: pt=pd.Series(index=events.index) # NaNs
if ptSl[1]>0: sl=-ptSl[1]*events_['trgt']
else: sl=pd.Series(index=events.index) # NaNs
for loc,t1 in events_['t1'].fillna(close.index[-1]).iteritems():
df0=close[loc:t1] # path prices
df0=(df0/close[loc]-1)*events_.at[loc,'side'] # path returns
out.loc[loc,'sl']=df0[df0pt[loc]].index.min() # earliest profit taking
return out
# ### Gettting Time of First Touch (getEvents) [3.3], [3.6]
# In[5]:
def getEvents(close, tEvents, ptSl, trgt, minRet, numThreads, t1=False, side=None):
#1) get target
trgt=trgt.loc[tEvents]
trgt=trgt[trgt>minRet] # minRet
#2) get t1 (max holding period)
if t1 is False:t1=pd.Series(pd.NaT, index=tEvents)
#3) form events object, apply stop loss on t1
if side is None:side_,ptSl_=pd.Series(1.,index=trgt.index), [ptSl[0],ptSl[0]]
else: side_,ptSl_=side.loc[trgt.index],ptSl[:2]
events=(pd.concat({'t1':t1,'trgt':trgt,'side':side_}, axis=1)
.dropna(subset=['trgt']))
df0=mpPandasObj(func=applyPtSlOnT1,pdObj=('molecule',events.index),
numThreads=numThreads,close=close,events=events,
ptSl=ptSl_)
events['t1']=df0.dropna(how='all').min(axis=1) # pd.min ignores nan
if side is None:events=events.drop('side',axis=1)
return events
# ### Adding Vertical Barrier [3.4]
# In[6]:
def addVerticalBarrier(tEvents, close, numDays=1):
t1=close.index.searchsorted(tEvents+pd.Timedelta(days=numDays))
t1=t1[t1minPct or df0.shape[0]<3:break
print('dropped label: ', df0.argmin(),df0.min())
events=events[events['bin']!=df0.argmin()]
return events
# ### Linear Partitions [20.4.1]
# In[10]:
def linParts(numAtoms,numThreads):
# partition of atoms with a single loop
parts=np.linspace(0,numAtoms,min(numThreads,numAtoms)+1)
parts=np.ceil(parts).astype(int)
return parts
# In[11]:
def nestedParts(numAtoms,numThreads,upperTriang=False):
# partition of atoms with an inner loop
parts,numThreads_=[0],min(numThreads,numAtoms)
for num in range(numThreads_):
part=1+4*(parts[-1]**2+parts[-1]+numAtoms*(numAtoms+1.)/numThreads_)
part=(-1+part**.5)/2.
parts.append(part)
parts=np.round(parts).astype(int)
if upperTriang: # the first rows are heaviest
parts=np.cumsum(np.diff(parts)[::-1])
parts=np.append(np.array([0]),parts)
return parts
# ### multiprocessing snippet [20.7]
# In[12]:
def mpPandasObj(func,pdObj,numThreads=24,mpBatches=1,linMols=True,**kargs):
'''
Parallelize jobs, return a dataframe or series
+ func: function to be parallelized. Returns a DataFrame
+ pdObj[0]: Name of argument used to pass the molecule
+ pdObj[1]: List of atoms that will be grouped into molecules
+ kwds: any other argument needed by func
Example: df1=mpPandasObj(func,('molecule',df0.index),24,**kwds)
'''
import pandas as pd
#if linMols:parts=linParts(len(argList[1]),numThreads*mpBatches)
#else:parts=nestedParts(len(argList[1]),numThreads*mpBatches)
if linMols:parts=linParts(len(pdObj[1]),numThreads*mpBatches)
else:parts=nestedParts(len(pdObj[1]),numThreads*mpBatches)
jobs=[]
for i in range(1,len(parts)):
job={pdObj[0]:pdObj[1][parts[i-1]:parts[i]],'func':func}
job.update(kargs)
jobs.append(job)
if numThreads==1:out=processJobs_(jobs)
else: out=processJobs(jobs,numThreads=numThreads)
if isinstance(out[0],pd.DataFrame):df0=pd.DataFrame()
elif isinstance(out[0],pd.Series):df0=pd.Series()
else:return out
for i in out:df0=df0.append(i)
df0=df0.sort_index()
return df0
# ### single-thread execution for debugging [20.8]
# In[13]:
def processJobs_(jobs):
# Run jobs sequentially, for debugging
out=[]
for job in jobs:
out_=expandCall(job)
out.append(out_)
return out
# ### Example of async call to multiprocessing lib [20.9]
# In[14]:
import multiprocessing as mp
import datetime as dt
#________________________________
def reportProgress(jobNum,numJobs,time0,task):
# Report progress as asynch jobs are completed
msg=[float(jobNum)/numJobs, (time.time()-time0)/60.]
msg.append(msg[1]*(1/msg[0]-1))
timeStamp=str(dt.datetime.fromtimestamp(time.time()))
msg=timeStamp+' '+str(round(msg[0]*100,2))+'% '+task+' done after '+ \
str(round(msg[1],2))+' minutes. Remaining '+str(round(msg[2],2))+' minutes.'
if jobNum df.slow
return df.fast[(crit1) & (crit2)]
def get_down_cross(df):
crit1 = df.fast.shift(1) > df.slow.shift(1)
crit2 = df.fast < df.slow
return df.fast[(crit1) & (crit2)]
up = get_up_cross(close_df)
down = get_down_cross(close_df)
f, ax = plt.subplots(figsize=(11,8))
close_df.loc['2014':].plot(ax=ax, alpha=.5)
up.loc['2014':].plot(ax=ax,ls='',marker='^', markersize=7,
alpha=0.75, label='upcross', color='g')
down.loc['2014':].plot(ax=ax,ls='',marker='v', markersize=7,
alpha=0.75, label='downcross', color='r')
ax.legend()
# ### (a) Derive meta-labels for `ptSl = [1,2]` and `t1` where `numdays=1`. Use as `trgt` dailyVol computed by snippet 3.1 (get events with sides)
# In[30]:
side_up = pd.Series(1, index=up.index)
side_down = pd.Series(-1, index=down.index)
side = pd.concat([side_up,side_down]).sort_index()
cprint(side)
# In[31]:
minRet = .01
ptsl=[1,2]
dailyVol = getDailyVol(close_df['price'])
tEvents = getTEvents(close_df['price'],h=dailyVol.mean())
t1 = addVerticalBarrier(tEvents, close_df['price'], numDays=1)
ma_events = getEvents(close_df['price'],tEvents,ptsl,target,minRet,cpus,
t1=t1,side=side)
cprint(ma_events)
# In[32]:
ma_events.side.value_counts()
# In[33]:
ma_side = ma_events.dropna().side
# In[34]:
ma_bins = getBinsNew(ma_events,close_df['price'], t1).dropna()
cprint(ma_bins)
# In[35]:
Xx = pd.merge_asof(ma_bins, side.to_frame().rename(columns={0:'side'}),
left_index=True, right_index=True, direction='forward')
cprint(Xx)
# ### (b) Train Random Forest to decide whether to trade or not `{0,1}` since underlying model (crossing m.a.) has decided the side, `{-1,1}`
# In[36]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, classification_report
# In[37]:
X = ma_side.values.reshape(-1,1)
#X = Xx.side.values.reshape(-1,1)
y = ma_bins.bin.values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5,
shuffle=False)
n_estimator = 10000
rf = RandomForestClassifier(max_depth=2, n_estimators=n_estimator,
criterion='entropy', random_state=RANDOM_STATE)
rf.fit(X_train, y_train)
# The random forest model by itself
y_pred_rf = rf.predict_proba(X_test)[:, 1]
y_pred = rf.predict(X_test)
fpr_rf, tpr_rf, _ = roc_curve(y_test, y_pred_rf)
print(classification_report(y_test, y_pred))
plt.figure(1)
plt.plot([0, 1], [0, 1], 'k--')
plt.plot(fpr_rf, tpr_rf, label='RF')
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.legend(loc='best')
plt.show()
# ## [3.5] Develop mean-reverting Bollinger Band Strategy. For each obs. model suggests a side but not size of the bet.
# In[38]:
def bbands(price, window=None, width=None, numsd=None):
""" returns average, upper band, and lower band"""
ave = price.rolling(window).mean()
sd = price.rolling(window).std(ddof=0)
if width:
upband = ave * (1+width)
dnband = ave * (1-width)
return price, np.round(ave,3), np.round(upband,3), np.round(dnband,3)
if numsd:
upband = ave + (sd*numsd)
dnband = ave - (sd*numsd)
return price, np.round(ave,3), np.round(upband,3), np.round(dnband,3)
# In[39]:
window=50
bb_df = pd.DataFrame()
bb_df['price'],bb_df['ave'],bb_df['upper'],bb_df['lower']=bbands(close, window=window, numsd=1)
bb_df.dropna(inplace=True)
cprint(bb_df)
# In[40]:
f,ax=plt.subplots(figsize=(11,8))
bb_df.loc['2014'].plot(ax=ax)
# In[41]:
def get_up_cross(df, col):
# col is price column
crit1 = df[col].shift(1) < df.upper.shift(1)
crit2 = df[col] > df.upper
return df[col][(crit1) & (crit2)]
def get_down_cross(df, col):
# col is price column
crit1 = df[col].shift(1) > df.lower.shift(1)
crit2 = df[col] < df.lower
return df[col][(crit1) & (crit2)]
bb_down = get_down_cross(bb_df, 'price')
bb_up = get_up_cross(bb_df, 'price')
f, ax = plt.subplots(figsize=(11,8))
bb_df.loc['2014':].plot(ax=ax, alpha=.5)
bb_up.loc['2014':].plot(ax=ax, ls='', marker='^', markersize=7,
alpha=0.75, label='upcross', color='g')
bb_down.loc['2014':].plot(ax=ax, ls='', marker='v', markersize=7,
alpha=0.75, label='downcross', color='r')
ax.legend()
# ### (a) Derive meta-labels for `ptSl=[0,2]` and `t1` where `numdays=1`. Use as `trgt` dailyVol.
# In[42]:
bb_side_up = pd.Series(-1, index=bb_up.index) # sell on up cross for mean reversion
bb_side_down = pd.Series(1, index=bb_down.index) # buy on down cross for mean reversion
bb_side_raw = pd.concat([bb_side_up,bb_side_down]).sort_index()
cprint(bb_side_raw)
minRet = .01
ptsl=[0,2]
bb_events = getEvents(close,tEvents,ptsl,target,minRet,cpus,t1=t1,side=bb_side_raw)
cprint(bb_events)
bb_side = bb_events.dropna().side
cprint(bb_side)
# In[43]:
bb_side.value_counts()
# In[44]:
bb_bins = getBins(bb_events,close).dropna()
cprint(bb_bins)
print(bb_bins.bin.value_counts())
# ### (b) train random forest to decide to trade or not. Use features: volatility, serial correlation, and the crossing moving averages from exercise 2.
# In[45]:
def returns(s):
arr = np.diff(np.log(s))
return (pd.Series(arr, index=s.index[1:]))
def df_rolling_autocorr(df, window, lag=1):
"""Compute rolling column-wise autocorrelation for a DataFrame."""
return (df.rolling(window=window)
.corr(df.shift(lag))) # could .dropna() here
#df_rolling_autocorr(d1, window=21).dropna().head()
# In[46]:
srl_corr = df_rolling_autocorr(returns(close), window=window).rename('srl_corr')
cprint(srl_corr)
# In[47]:
features = (pd.DataFrame()
.assign(vol=bb_events.trgt)
.assign(ma_side=ma_side)
.assign(srl_corr=srl_corr)
.drop_duplicates()
.dropna())
cprint(features)
# In[48]:
Xy = (pd.merge_asof(features, bb_bins[['bin']],
left_index=True, right_index=True,
direction='forward').dropna())
cprint(Xy)
# In[49]:
Xy.bin.value_counts()
# In[50]:
X = Xy.drop('bin',axis=1).values
y = Xy['bin'].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5,
shuffle=False)
n_estimator = 10000
rf = RandomForestClassifier(max_depth=2, n_estimators=n_estimator,
criterion='entropy', random_state=RANDOM_STATE)
rf.fit(X_train, y_train)
# The random forest model by itself
y_pred_rf = rf.predict_proba(X_test)[:, 1]
y_pred = rf.predict(X_test)
fpr_rf, tpr_rf, _ = roc_curve(y_test, y_pred_rf)
print(classification_report(y_test, y_pred, target_names=['no_trade','trade']))
plt.figure(1)
plt.plot([0, 1], [0, 1], 'k--')
plt.plot(fpr_rf, tpr_rf, label='RF')
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.legend(loc='best')
plt.show()
# ### (c) What is accuracy of predictions from primary model if the secondary model does not filter bets? What is classification report?
# In[51]:
minRet = .01
ptsl=[0,2]
bb_events = getEvents(close,tEvents,ptsl,target,minRet,cpus,t1=t1)
cprint(bb_events)
bb_bins = getBins(bb_events,close).dropna()
cprint(bb_bins)
features = (pd.DataFrame()
.assign(vol=bb_events.trgt)
.assign(ma_side=ma_side)
.assign(srl_corr=srl_corr)
.drop_duplicates()
.dropna())
cprint(features)
Xy = (pd.merge_asof(features, bb_bins[['bin']],
left_index=True, right_index=True,
direction='forward').dropna())
cprint(Xy)
### run model ###
X = Xy.drop('bin',axis=1).values
y = Xy['bin'].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5,
shuffle=False)
n_estimator = 10000
rf = RandomForestClassifier(max_depth=2, n_estimators=n_estimator,
criterion='entropy', random_state=RANDOM_STATE)
rf.fit(X_train, y_train)
# The random forest model by itself
y_pred_rf = rf.predict_proba(X_test)[:, 1]
y_pred = rf.predict(X_test)
fpr_rf, tpr_rf, _ = roc_curve(y_test, y_pred_rf)
print(classification_report(y_test, y_pred,
target_names=['no_trade','trade']))
plt.figure(1)
plt.plot([0, 1], [0, 1], 'k--')
plt.plot(fpr_rf, tpr_rf, label='RF')
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC curve')
plt.legend(loc='best')
plt.show()
# In[ ]: