#!/usr/bin/env python
# coding: utf-8

# # Advances in Machine Learning
#
# # Chapter 1 - Exploring Tick, Volume, and Dollar Volume Bars

# In[1]:


get_ipython().run_line_magic('load_ext', 'watermark')
get_ipython().run_line_magic('watermark', '')
get_ipython().run_line_magic('load_ext', 'autoreload')
get_ipython().run_line_magic('autoreload', '2')

# import standard libs
from IPython.display import display
from IPython.core.debugger import set_trace as bp
from pathlib import PurePath, Path
import sys
import time
from collections import OrderedDict as od
import re
import os
import json
os.environ['THEANO_FLAGS'] = 'device=cpu,floatX=float32'

# import python scientific stack
import pandas as pd
import pandas_datareader.data as web
pd.set_option('display.max_rows', 100)
from dask import dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()
import numpy as np
import scipy.stats as stats
import statsmodels.api as sm
from numba import jit
import math
import pymc3 as pm
from theano import shared, tensor as tt

# import visual tools
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
get_ipython().run_line_magic('matplotlib', 'inline')
import seaborn as sns
plt.style.use('seaborn-talk')
plt.style.use('bmh')
#plt.rcParams['font.family'] = 'DejaVu Sans Mono'
#plt.rcParams['font.size'] = 9.5
plt.rcParams['font.weight'] = 'medium'
#plt.rcParams['figure.figsize'] = 10,7
blue, green, red, purple, gold, teal = sns.color_palette('colorblind', 6)

# import util libs
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm, tqdm_notebook
import warnings
warnings.filterwarnings("ignore")
import missingno as msno
from src.utils.utils import *  # provides helpers such as `data_dir` and `cprint`
from src.features.bars import get_imbalance
import src.features.bars as brs
import src.features.snippets as snp

RANDOM_STATE = 777

print()
get_ipython().run_line_magic('watermark', '-p pandas,pandas_datareader,dask,numpy,pymc3,theano,sklearn,statsmodels,scipy,matplotlib,seaborn,pyarrow,fastparquet')


# ## Introduction
#
# This notebook explores the idea of sampling prices as a function of something other than fixed time intervals, for example using the number of ticks, the volume, or the dollar volume traded as the sampling interval. The rest of this notebook works through some of the exercises found in chapters 1 and 2 of the book.
#
# This notebook makes use of the following script found here: `./src/features/bars.py`

# ## Read and Clean Data

# The data set used in this example is too large to be hosted on github. It is a sample of equity tick data, symbol `IVE`, provided by [kibot.com (caution: download link)](http://api.kibot.com/?action=history&symbol=IVE&interval=tickbidask&bp=1&user=guest). Download this data to the `./data/raw/` directory in your local repo.
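# NOTE: `data_dir` and `cprint` come from the star import of `src.utils.utils`
# above. If you are running this notebook without the repo's utils, a minimal
# stand-in is sketched below. It is an assumption about what those helpers do,
# not the repo's actual implementation.

# In[ ]:


## uncomment below if `src.utils.utils` is unavailable
#data_dir = Path('./data')  # hypothetical location of the data folders
#def cprint(df):
#    # compact print: tail plus basic info, standing in for the repo's helper
#    print('-'*79)
#    print(df.tail())
#    print(df.info())
#    print('-'*79)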
# In[2]:


def read_kibot_ticks(fp):
    # read tick data from http://www.kibot.com/support.aspx#data_format
    cols = list(map(str.lower, ['Date', 'Time', 'Price', 'Bid', 'Ask', 'Size']))
    df = (pd.read_csv(fp, header=None)
          .rename(columns=dict(zip(range(len(cols)), cols)))
          .assign(dates=lambda df: (pd.to_datetime(df['date'] + df['time'],
                                                   format='%m/%d/%Y%H:%M:%S')))
          .assign(v=lambda df: df['size'])                 # volume
          .assign(dv=lambda df: df['price'] * df['size'])  # dollar volume
          .drop(['date', 'time'], axis=1)
          .set_index('dates')
          .drop_duplicates())
    return df

infp = PurePath(data_dir / 'raw' / 'IVE_tickbidask.txt')
df = read_kibot_ticks(infp)
cprint(df)

# Save the initial processed data as parquet in the `./data/interim/` folder and reload it.

# In[3]:


outfp = PurePath(data_dir / 'interim' / 'IVE_tickbidask.parq')
df.to_parquet(outfp)


# In[4]:


infp = PurePath(data_dir / 'interim' / 'IVE_tickbidask.parq')
df = pd.read_parquet(infp)
cprint(df)


# In[5]:


msno.matrix(df)


# ## Remove Obvious Price Errors in Tick Data

# In[6]:


sns.boxplot(df.price)


# In[7]:


@jit(nopython=True)
def mad_outlier(y, thresh=3.):
    '''
    compute outliers based on mad
    # args
        y: assumed to be array with shape (N,1)
        thresh: float()
    # returns
        array index of outliers
    '''
    median = np.median(y)
    diff = np.sum((y - median)**2, axis=-1)
    diff = np.sqrt(diff)
    med_abs_deviation = np.median(diff)

    modified_z_score = 0.6745 * diff / med_abs_deviation
    return modified_z_score > thresh


# In[8]:


mad = mad_outlier(df.price.values.reshape(-1, 1))


# In[9]:


df.loc[mad]


# In[10]:


sns.boxplot(df.loc[~mad].price)

# Drop the outliers from the dataset and save the cleaned data in the `./data/processed/` folder.

# In[11]:


df = df.loc[~mad]
cprint(df)

outfp = PurePath(data_dir / 'processed' / 'clean_IVE_fut_prices.parq')
df.to_parquet(outfp)


# In[12]:


infp = PurePath(data_dir / 'processed' / 'clean_IVE_fut_prices.parq')
df = pd.read_parquet(infp)
cprint(df)


# # Tick Bars

# In[13]:


def tick_bars(df, price_column, m):
    '''
    compute tick bars
    # args
        df: pd.DataFrame()
        price_column: name of the price column
        m: int(), threshold value for ticks
    # returns
        idx: list of indices
    '''
    t = df[price_column]
    ts = 0
    idx = []
    for i, x in enumerate(tqdm(t)):
        ts += 1
        if ts >= m:
            idx.append(i)
            ts = 0
            continue
    return idx

def tick_bar_df(df, price_column, m):
    idx = tick_bars(df, price_column, m)
    return df.iloc[idx].drop_duplicates()

# There are many ways to choose `M`, the threshold value for sampling prices. One way is based on the ratio of the total dollar value/volume traded to the number of ticks, computed below. The rest of the notebook uses arbitrary but sensible values of `M`; I leave it as an exercise for the reader to see how the results change with different values of `M`.

# In[14]:


n_ticks = df.shape[0]
volume_ratio = (df.v.sum() / n_ticks).round()
dollar_ratio = (df.dv.sum() / n_ticks).round()

print(f'num ticks: {n_ticks:,}')
print(f'volume ratio: {volume_ratio}')
print(f'dollar ratio: {dollar_ratio}')
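# Building on those ratios, here is a sketch (my addition, not from the book)
# of one dynamic way to pick a dollar threshold: target a fixed number of bars
# per day by dividing the average daily dollar volume by the desired daily bar
# count. The target of 50 bars per day is an arbitrary illustrative choice.

# In[ ]:


daily_dv = df.dv.resample('1D').sum()
dollar_M_dynamic = int(daily_dv[daily_dv > 0].mean() / 50)  # ~50 bars per day
print(f'dynamic dollar threshold: {dollar_M_dynamic:,}')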
# In[15]:


tick_M = 100  # arbitrary
print(f'tick threshold: {tick_M:,}')

tidx = tick_bars(df, 'price', tick_M)
tidx[:10]


# In[16]:


df.iloc[tidx].shape, df.shape

# The dataset is large, so select a smaller sample for quick exploration.

# In[17]:


tick_df = tick_bar_df(df, 'price', tick_M)
tick_df.shape


# In[18]:


def select_sample_data(ref, sub, price_col, date):
    '''
    select a sample of data based on date, assumes datetimeindex
    # args
        ref: pd.DataFrame containing all ticks
        sub: subordinated pd.DataFrame of prices
        price_col: str(), price column
        date: str(), date to select
    # returns
        xdf: ref pd.Series
        xtdf: subordinated pd.Series
    '''
    xdf = ref[price_col].loc[date]
    xtdf = sub[price_col].loc[date]
    return xdf, xtdf

## try different dates to see how the quantity of tick bars changes
xDate = '2009-10-01'  # '2017-10-4'
xdf, xtdf = select_sample_data(df, tick_df, 'price', xDate)
xdf.shape, xtdf.shape


# In[19]:


def plot_sample_data(ref, sub, bar_type, *args, **kwds):
    f, axes = plt.subplots(3, sharex=True, sharey=True, figsize=(10, 7))
    ref.plot(*args, **kwds, ax=axes[0], label='price')
    sub.plot(*args, **kwds, ax=axes[0], marker='X', ls='', label=bar_type)
    axes[0].legend()

    ref.plot(*args, **kwds, ax=axes[1], label='price', marker='o')
    sub.plot(*args, **kwds, ax=axes[2], ls='', marker='X', color='r', label=bar_type)

    for ax in axes[1:]:
        ax.legend()
    plt.tight_layout()

plot_sample_data(xdf, xtdf, 'tick bar', alpha=0.5, markersize=7)


# ### Bonus Exercise: Make OHLC Bars from Custom Bars

# Extract `tick_df.price` and `df.price` into two pandas series.

# In[20]:


sub = tick_df.price
ref = df.price

# The function below creates the OHLC dataframe by:
#
# 1. iterating over the subordinated series' index, taking the timestamps at idx and idx+1 as the bar's period;
# 2. selecting that same period from the reference series;
# 3. extracting the max and min prices from the reference series;
# 4. combining the o, h, l, c values and the start and end timestamps into a row;
# 5. returning the aggregated rows as a pandas dataframe.

# In[21]:


def get_ohlc(ref, sub):
    '''
    fn: get ohlc from custom bars

    # args
        ref : reference pandas series with all prices
        sub : custom tick pandas series
    # returns
        tick_df : dataframe with ohlc values
    '''
    ohlc = []
    for i in tqdm(range(sub.index.shape[0] - 1)):
        start, end = sub.index[i], sub.index[i + 1]
        tmp_ref = ref.loc[start:end]
        max_px, min_px = tmp_ref.max(), tmp_ref.min()
        o, h, l, c = sub.iloc[i], max_px, min_px, sub.iloc[i + 1]
        ohlc.append((end, start, o, h, l, c))
    cols = ['end', 'start', 'open', 'high', 'low', 'close']
    return pd.DataFrame(ohlc, columns=cols)

## uncomment below to run (takes about 5-6 mins on my machine)
#tick_bars_ohlc = get_ohlc(ref, sub)
#cprint(tick_bars_ohlc)
#outfp = PurePath(data_dir/'processed'/'tick_bars_ohlc.parq')
#tick_bars_ohlc.to_parquet(outfp)
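# As an aside, the loop above can be vectorized with pandas. The sketch below
# is my own alternative, not the book's code: label each reference tick with
# the subordinated interval it falls into via `np.searchsorted`, then
# aggregate with `groupby(...).ohlc()`. Note the boundary convention differs
# slightly from `get_ohlc` (here the open is the first reference tick in each
# interval rather than the subordinated price itself).

# In[ ]:


# bar_ids[i] = index of the subordinated interval containing the i-th ref tick
bar_ids = np.searchsorted(sub.index.values, ref.index.values, side='left')
ohlc_fast = ref.groupby(bar_ids).ohlc()
ohlc_fast.head()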
# # Volume Bars

# In[22]:


def volume_bars(df, volume_column, m):
    '''
    compute volume bars
    # args
        df: pd.DataFrame()
        volume_column: name of the volume column
        m: int(), threshold value for volume
    # returns
        idx: list of indices
    '''
    t = df[volume_column]
    ts = 0
    idx = []
    for i, x in enumerate(tqdm(t)):
        ts += x
        if ts >= m:
            idx.append(i)
            ts = 0
            continue
    return idx

def volume_bar_df(df, volume_column, m):
    idx = volume_bars(df, volume_column, m)
    return df.iloc[idx].drop_duplicates()


# In[23]:


volume_M = 10_000  # arbitrary
print(f'volume threshold: {volume_M:,}')

v_bar_df = volume_bar_df(df, 'v', volume_M)
cprint(v_bar_df)


# In[24]:


xDate = '2009-10-1'
xdf, xtdf = select_sample_data(df, v_bar_df, 'price', xDate)
print(f'xdf shape: {xdf.shape}, xtdf shape: {xtdf.shape}')

plot_sample_data(xdf, xtdf, 'volume bar', alpha=0.5, markersize=7)


# # Dollar Volume Bars

# In[25]:


def dollar_bars(df, dv_column, m):
    '''
    compute dollar bars
    # args
        df: pd.DataFrame()
        dv_column: name of the dollar volume column
        m: int(), threshold value for dollars
    # returns
        idx: list of indices
    '''
    t = df[dv_column]
    ts = 0
    idx = []
    for i, x in enumerate(tqdm(t)):
        ts += x
        if ts >= m:
            idx.append(i)
            ts = 0
            continue
    return idx

def dollar_bar_df(df, dv_column, m):
    idx = dollar_bars(df, dv_column, m)
    return df.iloc[idx].drop_duplicates()


# In[26]:


dollar_M = 1_000_000  # arbitrary
print(f'dollar threshold: {dollar_M:,}')

dv_bar_df = dollar_bar_df(df, 'dv', dollar_M)
cprint(dv_bar_df)


# In[27]:


xDate = '2009-10-1'
xdf, xtdf = select_sample_data(df, dv_bar_df, 'price', xDate)
print(f'xdf shape: {xdf.shape}, xtdf shape: {xtdf.shape}')

plot_sample_data(xdf, xtdf, 'dollar bar', alpha=0.5, markersize=7)


# # Analyzing the Bars

# ## Count Quantity of Bars By Each Bar Type (Weekly)

# In[28]:


def count_bars(df, price_col='price'):
    return df.groupby(pd.Grouper(freq='1W'))[price_col].count()

def scale(s):
    return (s - s.min()) / (s.max() - s.min())


# In[29]:


# count series
# scale to compare 'apples to apples'
tc = scale(count_bars(tick_df))
vc = scale(count_bars(v_bar_df))
dc = scale(count_bars(dv_bar_df))
dfc = scale(count_bars(df))


# In[87]:


# plot time series of the counts
f, ax = plt.subplots(figsize=(10, 7))
tc.plot(ax=ax, ls='-', label='tick count')
vc.plot(ax=ax, ls='--', label='volume count')
dc.plot(ax=ax, ls='-.', label='dollar count')
ax.set_title('scaled bar counts')
ax.legend()


# ## Which Bar Type Has the Most Stable Counts?

# In[116]:


print(f'tc std: {tc.std():.2%}, vc std: {vc.std():.2%}, dc std: {dc.std():.2%}')

bar_types = ['tick', 'volume', 'dollar', 'df']
bar_std = [tc.std(), vc.std(), dc.std(), dfc.std()]
counts = pd.Series(bar_std, index=bar_types)
counts.sort_values()
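# The comparison above uses the standard deviation of min-max scaled counts,
# which depends on each series' extremes. A scale-free cross-check (my
# addition, not from the book) is the coefficient of variation of the raw
# weekly counts:

# In[ ]:


raw_counts = [count_bars(d) for d in [tick_df, v_bar_df, dv_bar_df, df]]
cv = pd.Series([c.std() / c.mean() for c in raw_counts], index=bar_types)
cv.sort_values()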
# ## Which Bar Type Has the Lowest Serial Correlation?

# In[89]:


def returns(s):
    arr = np.diff(np.log(s))
    return pd.Series(arr, index=s.index[1:])


# In[117]:


tr = returns(tick_df.price)
vr = returns(v_bar_df.price)
dr = returns(dv_bar_df.price)
df_ret = returns(df.price)

bar_returns = [tr, vr, dr, df_ret]


# In[120]:


def get_test_stats(bar_types, bar_returns, test_func, *args, **kwds):
    dct = {bar: (int(bar_ret.shape[0]), test_func(bar_ret, *args, **kwds))
           for bar, bar_ret in zip(bar_types, bar_returns)}
    df = (pd.DataFrame.from_dict(dct)
          .rename(index={0: 'sample_size', 1: f'{test_func.__name__}_stat'})
          .T)
    return df

autocorrs = get_test_stats(bar_types, bar_returns, pd.Series.autocorr)
display(autocorrs.sort_values('autocorr_stat'),
        autocorrs.abs().sort_values('autocorr_stat'))


# In[92]:


def plot_autocorr(bar_types, bar_returns):
    f, axes = plt.subplots(len(bar_types), figsize=(10, 7))
    for i, (bar, typ) in enumerate(zip(bar_returns, bar_types)):
        sm.graphics.tsa.plot_acf(bar, lags=120, ax=axes[i],
                                 alpha=0.05, unbiased=True, fft=True,
                                 zero=False, title=f'{typ} AutoCorr')
    plt.tight_layout()

def plot_hist(bar_types, bar_rets):
    f, axes = plt.subplots(len(bar_types), figsize=(10, 6))
    for i, (bar, typ) in enumerate(zip(bar_rets, bar_types)):
        g = sns.distplot(bar, ax=axes[i], kde=False, label=typ)
        g.set(yscale='log')
        axes[i].legend()
    plt.tight_layout()


# In[93]:


plot_autocorr(bar_types, bar_returns)


# In[94]:


plot_hist(bar_types, bar_returns)


# ## Partition Bar Series into Months, Compute the Variance of Returns, and the Variance of that Variance

# In[95]:


def partition_monthly(s):
    return s.resample('1M').var()


# In[118]:


tr_rs = partition_monthly(tr)
vr_rs = partition_monthly(vr)
dr_rs = partition_monthly(dr)
df_ret_rs = partition_monthly(df_ret)

monthly_vars = [tr_rs, vr_rs, dr_rs, df_ret_rs]


# In[119]:


get_test_stats(bar_types, monthly_vars, np.var).sort_values('var_stat')


# ## Compute the Jarque-Bera Test; Which Bar Type Has the Lowest Test Statistic?

# In[98]:


def jb(x, test=True):
    np.random.seed(12345678)
    if test:
        return stats.jarque_bera(x)[0]
    return stats.jarque_bera(x)[1]

get_test_stats(bar_types, bar_returns, jb).sort_values('jb_stat')


# ## Compute the Shapiro-Wilk Test

# For the Shapiro-Wilk test statistic, larger (closer to 1) is better.

# In[99]:


def shapiro(x, test=True):
    np.random.seed(12345678)
    if test:
        return stats.shapiro(x)[0]
    return stats.shapiro(x)[1]

(get_test_stats(bar_types, bar_returns, shapiro)
 .sort_values('shapiro_stat')[::-1])


# # Compare Serial Correlation between Dollar Bars and Dollar Imbalance Bars

# ### Update [05.04.18]
#
# An earlier version was missing some additional code. Before we can compare, we must compute the dollar imbalance bars. This is my initial implementation of the concept; it is experimental and may need some adjustments.
#
# 1. Compute the tick-rule sequence $\{b_t\}_{t=1,\dots,T}$ (see the sketch below).
# 2. Compute the imbalance at time $T$, defined as $\theta_T = \sum_{t=1}^{T} b_t v_t$.
# 3. Compute the expected value of $T$ as an ewma of previous $T$ values.
# 4. Compute the expected value of $\theta_T$ as an ewma of the $b_t v_t$ values.
# 5. For each index:
#     - test the condition $\lvert\theta_t\rvert \geq E_0[T] \cdot \lvert 2v^+ - E_0[v_t]\rvert$
#     - if the condition is met, record the quantity of ticks
#     - reset the tick count
#     - continue
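# `get_imbalance` is imported from `./src/features/bars.py`. If you don't have
# the repo, the sketch below shows the tick rule it is assumed to implement
# (my assumption, not necessarily the repo's exact code): $b_t$ is the sign of
# the price change, carrying the last non-zero sign forward when the price is
# unchanged. The result has length len(prices) - 1, aligned with prices[1:],
# which matches how `get_imbalance` is multiplied with `df.dv.iloc[1:]` below.

# In[ ]:


def tick_rule_sketch(prices):
    signs = np.sign(np.diff(prices))
    for i in range(1, signs.shape[0]):
        if signs[i] == 0:
            signs[i] = signs[i - 1]  # carry the previous sign forward
    return signs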
# In[100]:


tidx = get_imbalance(df.price.values) * df.dv.iloc[1:]
cprint(tidx)


# In[101]:


wndo = tidx.shape[0] // 1000
print(f'window size: {wndo:,}')

## expected value of b_t*v_t, approximated by an ewm
E_bs = tidx.ewm(wndo).mean()

## E_T (the expected number of ticks per bar) is not obvious; in this
## experimental implementation it is approximated as the ewm of the
## running tick index
E_T = pd.Series(range(tidx.shape[0]), index=tidx.index).ewm(wndo).mean()

df0 = (pd.DataFrame().assign(bs=tidx)
       .assign(E_T=E_T)
       .assign(E_bs=E_bs)
       .assign(absMul=lambda df: df.E_T * np.abs(df.E_bs))
       .assign(absTheta=tidx.cumsum().abs()))
cprint(df0)


# In[102]:


df0[['E_T', 'E_bs']].plot(subplots=True, figsize=(10, 6));


# In[103]:


display(df0.describe() / 1000)


# In[122]:


(df0.loc['2010-06', ['absMul', 'absTheta']]
 .reset_index(drop=True)
 .plot(figsize=(10, 5)))


# In[105]:


def test_t_abs(absTheta, t, E_bs):
    """
    Bool function to test the sampling inequality
    *row is assumed to come from df.itertuples()
    - absTheta: float(), row.absTheta
    - t: int(), tick count since the last bar
    - E_bs: float(), row.E_bs
    """
    return absTheta >= t * E_bs

def agg_imbalance_bars(df):
    """
    Implements the accumulation logic
    """
    start = df.index[0]
    bars = []
    for row in df.itertuples():
        t_abs = row.absTheta
        rowIdx = row.Index
        E_bs = row.E_bs

        t = df.loc[start:rowIdx].shape[0]
        if t < 1:
            t = 1  # if t < 1, set it equal to 1
        if test_t_abs(t_abs, t, E_bs):
            bars.append((start, rowIdx, t))
            start = rowIdx
    return bars


# In[106]:


bars = agg_imbalance_bars(df0)
test_imb_bars = (pd.DataFrame(bars, columns=['start', 'stop', 'Ts'])
                 .drop_duplicates())
cprint(test_imb_bars)


# In[107]:


test_imb_bars.Ts.describe().round()


# In[108]:


test_imb_bars.set_index('stop')['Ts'].plot()


# In[109]:


dvImbBars = df.price.loc[test_imb_bars.stop].drop_duplicates()
cprint(dvImbBars)


# In[110]:


dvBar = dv_bar_df.price
cprint(dvBar)


# In[111]:


dr = returns(dv_bar_df.price)
drImb = returns(dvImbBars)


# In[112]:


bar_types = ['dvBar', 'dvImb']
bar_rets = [dr, drImb]

get_test_stats(bar_types, bar_rets, pd.Series.autocorr)


# In[113]:


plot_autocorr(bar_types, bar_rets)


# In[114]:


plot_hist(bar_types, bar_rets)


# In[115]:


jbs = get_test_stats(bar_types, bar_rets, jb).sort_values('jb_stat')
shaps = (get_test_stats(bar_types, bar_rets, shapiro)
         .sort_values('shapiro_stat')[::-1])
display(jbs, shaps)
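# To tie this back to the earlier weekly count analysis, a quick check (my
# addition, not from the book) of how stable the imbalance-bar sampling
# frequency is from week to week:

# In[ ]:


imb_weekly = test_imb_bars.set_index('stop')['Ts'].resample('1W').count()
imb_weekly.plot(figsize=(10, 5), title='weekly dollar imbalance bar count')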