#!/usr/bin/env python # coding: utf-8 #

Table of Contents

#
# # Labeling and MetaLabeling # ## Overview # # In this chapter of the book AFML, De Prado introduces several novel techniques for labeling returns for the purposes of supervised machine learning. # # First he identifies the typical issues of fixed-time horizon labeling methods - primarily that it is easy to mislabel a return due to dynamic nature of volatility throughout a trading period. # # More importantly he addresses a major overlooked aspect of the financial literature. He emphasizes that every investment strategy makes use of stop-loss limits of some kind, whether those are enforced by a margin call, risk department or self-imposed. He highlights how unrealistic it is to test/implement/propagate a strategy that profits from positions that would have been stopped out. # # > That virtually no publication accounts for that when labeling observations tells you something about the current state of financial literature. # > # > -De Prado, "Advances in Financial Machine Learning", pg.44 # # He also introduces a technique called metalabeling, which is used to augment a strategy by improving recall while also reducing the likelihood of overfitting. # In[1]: get_ipython().run_line_magic('load_ext', 'watermark') get_ipython().run_line_magic('watermark', '') get_ipython().run_line_magic('load_ext', 'autoreload') get_ipython().run_line_magic('autoreload', '2') # import standard libs from IPython.display import display from IPython.core.debugger import set_trace as bp from pathlib import PurePath, Path import sys import time from collections import OrderedDict as od import re import os import json # import python scientific stack import pandas as pd import pandas_datareader.data as web pd.set_option('display.max_rows', 100) from dask import dataframe as dd from dask.diagnostics import ProgressBar from multiprocessing import cpu_count pbar = ProgressBar() pbar.register() import numpy as np import scipy.stats as stats import statsmodels.api as sm from numba import jit import math import ffn # import visual tools import matplotlib as mpl import matplotlib.pyplot as plt import matplotlib.gridspec as gridspec get_ipython().run_line_magic('matplotlib', 'inline') import seaborn as sns plt.style.use('seaborn-talk') plt.style.use('bmh') #plt.rcParams['font.family'] = 'DejaVu Sans Mono' plt.rcParams['font.size'] = 9.5 plt.rcParams['font.weight'] = 'medium' plt.rcParams['figure.figsize'] = 10,7 blue, green, red, purple, gold, teal = sns.color_palette('colorblind', 6) # import util libs from tqdm import tqdm, tqdm_notebook import warnings warnings.filterwarnings("ignore") import missingno as msno from src.utils.utils import * import src.features.bars as brs import src.features.snippets as snp RANDOM_STATE = 777 pdir = get_relative_project_dir('Adv_Fin_ML_Exercises', partial=False) data_dir = pdir / 'data' print() get_ipython().run_line_magic('watermark', '-p pandas,pandas_datareader,dask,numpy,sklearn,statsmodels,scipy,ffn,matplotlib,seaborn') # ## Code Snippets # # Below I reproduce all the relevant code snippets found in the book that are necessary to work through the excercises found at the end of chapter 3. # ### Symmetric CUSUM Filter [2.5.2.1] # In[2]: def getTEvents(gRaw, h): tEvents, sPos, sNeg = [], 0, 0 diff = np.log(gRaw).diff().dropna() for i in tqdm(diff.index[1:]): try: pos, neg = float(sPos+diff.loc[i]), float(sNeg+diff.loc[i]) except Exception as e: print(e) print(sPos+diff.loc[i], type(sPos+diff.loc[i])) print(sNeg+diff.loc[i], type(sNeg+diff.loc[i])) break sPos, sNeg=max(0., pos), min(0., neg) if sNeg<-h: sNeg=0;tEvents.append(i) elif sPos>h: sPos=0;tEvents.append(i) return pd.DatetimeIndex(tEvents) # ### Daily Volatility Estimator [3.1] # In[3]: def getDailyVol(close,span0=100): # daily vol reindexed to close df0=close.index.searchsorted(close.index-pd.Timedelta(days=1)) df0=df0[df0>0] df0=(pd.Series(close.index[df0-1], index=close.index[close.shape[0]-df0.shape[0]:])) try: df0=close.loc[df0.index]/close.loc[df0.values].values-1 # daily rets except Exception as e: print(f'error: {e}\nplease confirm no duplicate indices') df0=df0.ewm(span=span0).std().rename('dailyVol') return df0 # ### Triple-Barrier Labeling Method [3.2] # In[4]: def applyPtSlOnT1(close,events,ptSl,molecule): # apply stop loss/profit taking, if it takes place before t1 (end of event) events_=events.loc[molecule] out=events_[['t1']].copy(deep=True) if ptSl[0]>0: pt=ptSl[0]*events_['trgt'] else: pt=pd.Series(index=events.index) # NaNs if ptSl[1]>0: sl=-ptSl[1]*events_['trgt'] else: sl=pd.Series(index=events.index) # NaNs for loc,t1 in events_['t1'].fillna(close.index[-1]).iteritems(): df0=close[loc:t1] # path prices df0=(df0/close[loc]-1)*events_.at[loc,'side'] # path returns out.loc[loc,'sl']=df0[df0pt[loc]].index.min() # earliest profit taking return out # ### Gettting Time of First Touch (getEvents) [3.3], [3.6] # In[5]: def getEvents(close, tEvents, ptSl, trgt, minRet, numThreads, t1=False, side=None): #1) get target trgt=trgt.loc[tEvents] trgt=trgt[trgt>minRet] # minRet #2) get t1 (max holding period) if t1 is False:t1=pd.Series(pd.NaT, index=tEvents) #3) form events object, apply stop loss on t1 if side is None:side_,ptSl_=pd.Series(1.,index=trgt.index), [ptSl[0],ptSl[0]] else: side_,ptSl_=side.loc[trgt.index],ptSl[:2] events=(pd.concat({'t1':t1,'trgt':trgt,'side':side_}, axis=1) .dropna(subset=['trgt'])) df0=mpPandasObj(func=applyPtSlOnT1,pdObj=('molecule',events.index), numThreads=numThreads,close=close,events=events, ptSl=ptSl_) events['t1']=df0.dropna(how='all').min(axis=1) # pd.min ignores nan if side is None:events=events.drop('side',axis=1) return events # ### Adding Vertical Barrier [3.4] # In[6]: def addVerticalBarrier(tEvents, close, numDays=1): t1=close.index.searchsorted(tEvents+pd.Timedelta(days=numDays)) t1=t1[t1minPct or df0.shape[0]<3:break print('dropped label: ', df0.argmin(),df0.min()) events=events[events['bin']!=df0.argmin()] return events # ### Linear Partitions [20.4.1] # In[10]: def linParts(numAtoms,numThreads): # partition of atoms with a single loop parts=np.linspace(0,numAtoms,min(numThreads,numAtoms)+1) parts=np.ceil(parts).astype(int) return parts # In[11]: def nestedParts(numAtoms,numThreads,upperTriang=False): # partition of atoms with an inner loop parts,numThreads_=[0],min(numThreads,numAtoms) for num in range(numThreads_): part=1+4*(parts[-1]**2+parts[-1]+numAtoms*(numAtoms+1.)/numThreads_) part=(-1+part**.5)/2. parts.append(part) parts=np.round(parts).astype(int) if upperTriang: # the first rows are heaviest parts=np.cumsum(np.diff(parts)[::-1]) parts=np.append(np.array([0]),parts) return parts # ### multiprocessing snippet [20.7] # In[12]: def mpPandasObj(func,pdObj,numThreads=24,mpBatches=1,linMols=True,**kargs): ''' Parallelize jobs, return a dataframe or series + func: function to be parallelized. Returns a DataFrame + pdObj[0]: Name of argument used to pass the molecule + pdObj[1]: List of atoms that will be grouped into molecules + kwds: any other argument needed by func Example: df1=mpPandasObj(func,('molecule',df0.index),24,**kwds) ''' import pandas as pd #if linMols:parts=linParts(len(argList[1]),numThreads*mpBatches) #else:parts=nestedParts(len(argList[1]),numThreads*mpBatches) if linMols:parts=linParts(len(pdObj[1]),numThreads*mpBatches) else:parts=nestedParts(len(pdObj[1]),numThreads*mpBatches) jobs=[] for i in range(1,len(parts)): job={pdObj[0]:pdObj[1][parts[i-1]:parts[i]],'func':func} job.update(kargs) jobs.append(job) if numThreads==1:out=processJobs_(jobs) else: out=processJobs(jobs,numThreads=numThreads) if isinstance(out[0],pd.DataFrame):df0=pd.DataFrame() elif isinstance(out[0],pd.Series):df0=pd.Series() else:return out for i in out:df0=df0.append(i) df0=df0.sort_index() return df0 # ### single-thread execution for debugging [20.8] # In[13]: def processJobs_(jobs): # Run jobs sequentially, for debugging out=[] for job in jobs: out_=expandCall(job) out.append(out_) return out # ### Example of async call to multiprocessing lib [20.9] # In[14]: import multiprocessing as mp import datetime as dt #________________________________ def reportProgress(jobNum,numJobs,time0,task): # Report progress as asynch jobs are completed msg=[float(jobNum)/numJobs, (time.time()-time0)/60.] msg.append(msg[1]*(1/msg[0]-1)) timeStamp=str(dt.datetime.fromtimestamp(time.time())) msg=timeStamp+' '+str(round(msg[0]*100,2))+'% '+task+' done after '+ \ str(round(msg[1],2))+' minutes. Remaining '+str(round(msg[2],2))+' minutes.' if jobNum df.slow return df.fast[(crit1) & (crit2)] def get_down_cross(df): crit1 = df.fast.shift(1) > df.slow.shift(1) crit2 = df.fast < df.slow return df.fast[(crit1) & (crit2)] up = get_up_cross(close_df) down = get_down_cross(close_df) f, ax = plt.subplots(figsize=(11,8)) close_df.loc['2014':].plot(ax=ax, alpha=.5) up.loc['2014':].plot(ax=ax,ls='',marker='^', markersize=7, alpha=0.75, label='upcross', color='g') down.loc['2014':].plot(ax=ax,ls='',marker='v', markersize=7, alpha=0.75, label='downcross', color='r') ax.legend() # ### (a) Derive meta-labels for `ptSl = [1,2]` and `t1` where `numdays=1`. Use as `trgt` dailyVol computed by snippet 3.1 (get events with sides) # In[30]: side_up = pd.Series(1, index=up.index) side_down = pd.Series(-1, index=down.index) side = pd.concat([side_up,side_down]).sort_index() cprint(side) # In[31]: minRet = .01 ptsl=[1,2] dailyVol = getDailyVol(close_df['price']) tEvents = getTEvents(close_df['price'],h=dailyVol.mean()) t1 = addVerticalBarrier(tEvents, close_df['price'], numDays=1) ma_events = getEvents(close_df['price'],tEvents,ptsl,target,minRet,cpus, t1=t1,side=side) cprint(ma_events) # In[32]: ma_events.side.value_counts() # In[33]: ma_side = ma_events.dropna().side # In[34]: ma_bins = getBinsNew(ma_events,close_df['price'], t1).dropna() cprint(ma_bins) # In[35]: Xx = pd.merge_asof(ma_bins, side.to_frame().rename(columns={0:'side'}), left_index=True, right_index=True, direction='forward') cprint(Xx) # ### (b) Train Random Forest to decide whether to trade or not `{0,1}` since underlying model (crossing m.a.) has decided the side, `{-1,1}` # In[36]: from sklearn.ensemble import RandomForestClassifier from sklearn.model_selection import train_test_split from sklearn.metrics import roc_curve, classification_report # In[37]: X = ma_side.values.reshape(-1,1) #X = Xx.side.values.reshape(-1,1) y = ma_bins.bin.values X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, shuffle=False) n_estimator = 10000 rf = RandomForestClassifier(max_depth=2, n_estimators=n_estimator, criterion='entropy', random_state=RANDOM_STATE) rf.fit(X_train, y_train) # The random forest model by itself y_pred_rf = rf.predict_proba(X_test)[:, 1] y_pred = rf.predict(X_test) fpr_rf, tpr_rf, _ = roc_curve(y_test, y_pred_rf) print(classification_report(y_test, y_pred)) plt.figure(1) plt.plot([0, 1], [0, 1], 'k--') plt.plot(fpr_rf, tpr_rf, label='RF') plt.xlabel('False positive rate') plt.ylabel('True positive rate') plt.title('ROC curve') plt.legend(loc='best') plt.show() # ## [3.5] Develop mean-reverting Bollinger Band Strategy. For each obs. model suggests a side but not size of the bet. # In[38]: def bbands(price, window=None, width=None, numsd=None): """ returns average, upper band, and lower band""" ave = price.rolling(window).mean() sd = price.rolling(window).std(ddof=0) if width: upband = ave * (1+width) dnband = ave * (1-width) return price, np.round(ave,3), np.round(upband,3), np.round(dnband,3) if numsd: upband = ave + (sd*numsd) dnband = ave - (sd*numsd) return price, np.round(ave,3), np.round(upband,3), np.round(dnband,3) # In[39]: window=50 bb_df = pd.DataFrame() bb_df['price'],bb_df['ave'],bb_df['upper'],bb_df['lower']=bbands(close, window=window, numsd=1) bb_df.dropna(inplace=True) cprint(bb_df) # In[40]: f,ax=plt.subplots(figsize=(11,8)) bb_df.loc['2014'].plot(ax=ax) # In[41]: def get_up_cross(df, col): # col is price column crit1 = df[col].shift(1) < df.upper.shift(1) crit2 = df[col] > df.upper return df[col][(crit1) & (crit2)] def get_down_cross(df, col): # col is price column crit1 = df[col].shift(1) > df.lower.shift(1) crit2 = df[col] < df.lower return df[col][(crit1) & (crit2)] bb_down = get_down_cross(bb_df, 'price') bb_up = get_up_cross(bb_df, 'price') f, ax = plt.subplots(figsize=(11,8)) bb_df.loc['2014':].plot(ax=ax, alpha=.5) bb_up.loc['2014':].plot(ax=ax, ls='', marker='^', markersize=7, alpha=0.75, label='upcross', color='g') bb_down.loc['2014':].plot(ax=ax, ls='', marker='v', markersize=7, alpha=0.75, label='downcross', color='r') ax.legend() # ### (a) Derive meta-labels for `ptSl=[0,2]` and `t1` where `numdays=1`. Use as `trgt` dailyVol. # In[42]: bb_side_up = pd.Series(-1, index=bb_up.index) # sell on up cross for mean reversion bb_side_down = pd.Series(1, index=bb_down.index) # buy on down cross for mean reversion bb_side_raw = pd.concat([bb_side_up,bb_side_down]).sort_index() cprint(bb_side_raw) minRet = .01 ptsl=[0,2] bb_events = getEvents(close,tEvents,ptsl,target,minRet,cpus,t1=t1,side=bb_side_raw) cprint(bb_events) bb_side = bb_events.dropna().side cprint(bb_side) # In[43]: bb_side.value_counts() # In[44]: bb_bins = getBins(bb_events,close).dropna() cprint(bb_bins) print(bb_bins.bin.value_counts()) # ### (b) train random forest to decide to trade or not. Use features: volatility, serial correlation, and the crossing moving averages from exercise 2. # In[45]: def returns(s): arr = np.diff(np.log(s)) return (pd.Series(arr, index=s.index[1:])) def df_rolling_autocorr(df, window, lag=1): """Compute rolling column-wise autocorrelation for a DataFrame.""" return (df.rolling(window=window) .corr(df.shift(lag))) # could .dropna() here #df_rolling_autocorr(d1, window=21).dropna().head() # In[46]: srl_corr = df_rolling_autocorr(returns(close), window=window).rename('srl_corr') cprint(srl_corr) # In[47]: features = (pd.DataFrame() .assign(vol=bb_events.trgt) .assign(ma_side=ma_side) .assign(srl_corr=srl_corr) .drop_duplicates() .dropna()) cprint(features) # In[48]: Xy = (pd.merge_asof(features, bb_bins[['bin']], left_index=True, right_index=True, direction='forward').dropna()) cprint(Xy) # In[49]: Xy.bin.value_counts() # In[50]: X = Xy.drop('bin',axis=1).values y = Xy['bin'].values X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, shuffle=False) n_estimator = 10000 rf = RandomForestClassifier(max_depth=2, n_estimators=n_estimator, criterion='entropy', random_state=RANDOM_STATE) rf.fit(X_train, y_train) # The random forest model by itself y_pred_rf = rf.predict_proba(X_test)[:, 1] y_pred = rf.predict(X_test) fpr_rf, tpr_rf, _ = roc_curve(y_test, y_pred_rf) print(classification_report(y_test, y_pred, target_names=['no_trade','trade'])) plt.figure(1) plt.plot([0, 1], [0, 1], 'k--') plt.plot(fpr_rf, tpr_rf, label='RF') plt.xlabel('False positive rate') plt.ylabel('True positive rate') plt.title('ROC curve') plt.legend(loc='best') plt.show() # ### (c) What is accuracy of predictions from primary model if the secondary model does not filter bets? What is classification report? # In[51]: minRet = .01 ptsl=[0,2] bb_events = getEvents(close,tEvents,ptsl,target,minRet,cpus,t1=t1) cprint(bb_events) bb_bins = getBins(bb_events,close).dropna() cprint(bb_bins) features = (pd.DataFrame() .assign(vol=bb_events.trgt) .assign(ma_side=ma_side) .assign(srl_corr=srl_corr) .drop_duplicates() .dropna()) cprint(features) Xy = (pd.merge_asof(features, bb_bins[['bin']], left_index=True, right_index=True, direction='forward').dropna()) cprint(Xy) ### run model ### X = Xy.drop('bin',axis=1).values y = Xy['bin'].values X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, shuffle=False) n_estimator = 10000 rf = RandomForestClassifier(max_depth=2, n_estimators=n_estimator, criterion='entropy', random_state=RANDOM_STATE) rf.fit(X_train, y_train) # The random forest model by itself y_pred_rf = rf.predict_proba(X_test)[:, 1] y_pred = rf.predict(X_test) fpr_rf, tpr_rf, _ = roc_curve(y_test, y_pred_rf) print(classification_report(y_test, y_pred, target_names=['no_trade','trade'])) plt.figure(1) plt.plot([0, 1], [0, 1], 'k--') plt.plot(fpr_rf, tpr_rf, label='RF') plt.xlabel('False positive rate') plt.ylabel('True positive rate') plt.title('ROC curve') plt.legend(loc='best') plt.show() # In[ ]: