In [1]:
# Prepare environment
import os, sys
sys.path.insert(0, os.path.abspath('..'))
from IPython.core.interactiveshell import InteractiveShell
# Show the result of every expression in a cell, not only the last one
InteractiveShell.ast_node_interactivity = "all"
from io import StringIO
import pandas as pd
# Silence warnings to keep the example output clean
import warnings
warnings.filterwarnings("ignore")
In [2]:
gs1h_csv = StringIO("""
Symbol,DataType,BarSize,TickerTime,opening,high,low,closing,volume,barcount,average
GS,TRADES,1h,2017-09-05 12:00:00+00:00,224.5,224.5,223.98,223.98,23,18,224.302
GS,TRADES,1h,2017-09-05 13:00:00+00:00,224.25,224.25,220.01,220.39,6431,3782,221.423
GS,TRADES,1h,2017-09-05 14:00:00+00:00,220.39,220.7,217.3,218.12,11332,5881,218.82
GS,TRADES,1h,2017-09-05 15:00:00+00:00,218.09,219.64,218.07,219.45,6457,3843,218.795
GS,TRADES,1h,2017-09-05 16:00:00+00:00,219.45,219.46,218.11,218.67,4940,3550,218.633
GS,TRADES,1h,2017-09-05 17:00:00+00:00,218.72,219.19,218.13,218.73,3228,2527,218.657
GS,TRADES,1h,2017-09-05 18:00:00+00:00,218.72,218.86,217.62,217.67,4939,3219,218.285
GS,TRADES,1h,2017-09-05 19:00:00+00:00,217.68,218.3,217.46,217.85,8173,5594,217.747
GS,TRADES,1h,2017-09-05 20:00:00+00:00,217.79,218.85,217.78,217.92,2445,10,217.781
GS,TRADES,1h,2017-09-05 21:00:00+00:00,218.0,218.11,217.91,218.11,8,5,218.022
GS,TRADES,1h,2017-09-05 22:00:00+00:00,218.11,218.17,217.95,217.95,15,11,218.12
GS,TRADES,1h,2017-09-05 23:00:00+00:00,218.15,218.15,217.93,217.93,2,2,218.04
GS,TRADES,1h,2017-09-06 12:00:00+00:00,218.97,219.29,218.97,219.2,28,11,219.159
GS,TRADES,1h,2017-09-06 13:00:00+00:00,219.1,220.78,218.67,219.58,4729,2596,219.796
GS,TRADES,1h,2017-09-06 14:00:00+00:00,219.61,221.02,219.54,220.01,4451,2722,220.49
GS,TRADES,1h,2017-09-06 15:00:00+00:00,219.98,220.2,217.73,218.2,4222,2500,219.02
GS,TRADES,1h,2017-09-06 16:00:00+00:00,218.2,219.83,217.61,219.8,2680,1809,218.335
GS,TRADES,1h,2017-09-06 17:00:00+00:00,219.77,220.5,219.41,219.57,2470,1492,219.954
GS,TRADES,1h,2017-09-06 18:00:00+00:00,219.61,219.8,218.9,219.33,2127,1447,219.428
GS,TRADES,1h,2017-09-06 19:00:00+00:00,219.33,219.7,218.85,219.05,5587,3451,219.363
GS,TRADES,1h,2017-09-06 20:00:00+00:00,218.83,219.09,218.83,218.99,3634,6,218.83
GS,TRADES,1h,2017-09-06 21:00:00+00:00,218.98,218.98,218.98,218.98,2,1,218.98
GS,TRADES,1h,2017-09-06 22:00:00+00:00,218.7,218.7,218.7,218.7,1,1,218.7
GS,TRADES,1h,2017-09-06 23:00:00+00:00,218.69,218.7,218.69,218.7,8,2,218.696
GS,TRADES,1h,2017-09-07 11:00:00+00:00,219.0,219.0,219.0,219.0,1,1,219.0
GS,TRADES,1h,2017-09-07 12:00:00+00:00,219.21,219.4,218.5,218.5,31,16,219.015
GS,TRADES,1h,2017-09-07 13:00:00+00:00,218.57,218.83,216.07,216.31,3338,1726,217.503
GS,TRADES,1h,2017-09-07 14:00:00+00:00,216.35,216.35,214.64,215.77,7048,4299,215.392
GS,TRADES,1h,2017-09-07 15:00:00+00:00,215.74,216.41,214.96,215.28,4571,3190,215.666
GS,TRADES,1h,2017-09-07 16:00:00+00:00,215.24,216.28,215.06,216.07,2191,1541,215.518
GS,TRADES,1h,2017-09-07 17:00:00+00:00,216.04,216.41,215.26,215.6,2058,1495,215.708
GS,TRADES,1h,2017-09-07 18:00:00+00:00,215.58,215.74,215.25,215.37,2206,1509,215.428
GS,TRADES,1h,2017-09-07 19:00:00+00:00,215.4,215.94,214.95,215.83,6582,4149,215.313
GS,TRADES,1h,2017-09-07 20:00:00+00:00,215.84,216.88,215.8,216.02,1869,9,215.846
GS,TRADES,1h,2017-09-07 21:00:00+00:00,215.98,215.98,215.9,215.9,9,6,215.927
GS,TRADES,1h,2017-09-07 22:00:00+00:00,215.9,215.99,215.9,215.99,14,3,215.909
GS,TRADES,1h,2017-09-07 23:00:00+00:00,216.09,216.09,216.09,216.09,1,1,216.09
GS,TRADES,1h,2017-09-08 11:00:00+00:00,215.44,215.44,215.44,215.44,1,1,215.44
GS,TRADES,1h,2017-09-08 12:00:00+00:00,214.9,215.5,214.8,215.5,22,8,215.05
GS,TRADES,1h,2017-09-08 13:00:00+00:00,215.5,218.76,215.1,218.67,5788,3250,217.577
GS,TRADES,1h,2017-09-08 14:00:00+00:00,218.68,219.28,217.78,218.04,4696,3283,218.707
GS,TRADES,1h,2017-09-08 15:00:00+00:00,218.06,218.39,216.82,216.89,2574,1880,217.345
GS,TRADES,1h,2017-09-08 16:00:00+00:00,216.92,217.3,216.67,217.14,2049,1433,217.015
GS,TRADES,1h,2017-09-08 17:00:00+00:00,217.12,217.17,216.15,216.76,2254,1565,216.591
GS,TRADES,1h,2017-09-08 18:00:00+00:00,216.76,217.35,216.61,217.01,1921,1373,217.117
GS,TRADES,1h,2017-09-08 19:00:00+00:00,217.01,217.34,216.69,217.24,3980,2789,217.075
GS,TRADES,1h,2017-09-08 20:00:00+00:00,217.21,217.21,216.71,216.71,2222,6,217.209
GS,TRADES,1h,2017-09-08 21:00:00+00:00,217.0,217.0,217.0,217.0,2,1,217.0
GS,TRADES,1h,2017-09-08 22:00:00+00:00,216.46,216.8,216.46,216.8,3,2,216.698
GS,TRADES,1h,2017-09-08 23:00:00+00:00,216.8,216.8,216.8,216.8,0,0,216.8
""")

gs1h = pd.read_csv(gs1h_csv)

The example starts here, using the GS hourly bars prepared above.


In [3]:
import pytz
import asyncio
import aiomysql.sa as aiosa
from sqlalchemy import create_engine, MetaData
from ibstract import IB
from ibstract import MarketDataBlock
from ibstract import HistDataReq
from ibstract import init_db, insert_hist_data, query_hist_data
from ibstract import download_insert_hist_data
from ibstract import get_hist_data
from ibstract.utils import dtest

init_db(): Initializes the MySQL database by creating one table for each security type.
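
If the target schema (ibstract_test in this example) does not yet exist on the local MySQL server, it can be created beforehand. The snippet below is a hedged setup sketch using pymysql, the same driver the SQLAlchemy URL later in this example relies on; it is not part of ibstract's API.

import pymysql

# Hedged setup sketch: create the target schema if it is missing.
# Assumes a local MySQL server reachable with the credentials used in this example.
conn = pymysql.connect(host='127.0.0.1', user='root', password='ibstract')
with conn.cursor() as cur:
    cur.execute("CREATE DATABASE IF NOT EXISTS ibstract_test")
conn.close()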

In [4]:
db_info = {
    'host': '127.0.0.1',
    'user': 'root',
    'password': 'ibstract',
    'db': 'ibstract_test',
}
init_db(db_info)
In [5]:
engine = create_engine(
    "mysql+pymysql://{}:{}@{}/{}".format(
        db_info['user'], db_info['password'], db_info['host'], db_info['db']),
    echo=False)
meta = MetaData()
meta.reflect(bind=engine)
print("Tables in Ibstract MySQL database:")
list(meta.tables.keys())
print("Columns in Stock table:")
meta.tables['Stock'].columns.values()
Tables in Ibstract MySQL database:
Out[5]:
['Bond',
 'CFD',
 'Commodity',
 'Forex',
 'Future',
 'FuturesOption',
 'Index',
 'MutualFund',
 'Option',
 'Stock',
 'Warrant']
Columns in Stock table:
Out[5]:
[Column('Symbol', VARCHAR(length=20), table=<Stock>, primary_key=True, nullable=False),
 Column('DataType', VARCHAR(length=20), table=<Stock>, primary_key=True, nullable=False),
 Column('BarSize', VARCHAR(length=10), table=<Stock>, primary_key=True, nullable=False),
 Column('TickerTime', DATETIME(), table=<Stock>, primary_key=True, nullable=False),
 Column('opening', FLOAT(), table=<Stock>),
 Column('high', FLOAT(), table=<Stock>),
 Column('low', FLOAT(), table=<Stock>),
 Column('closing', FLOAT(), table=<Stock>),
 Column('volume', INTEGER(display_width=10, unsigned=True), table=<Stock>),
 Column('barcount', INTEGER(display_width=10, unsigned=True), table=<Stock>),
 Column('average', FLOAT(), table=<Stock>)]

Coroutines insert_hist_data() and query_hist_data(): Insert/read a MarketDataBlock() to/from the MySQL database.

In [6]:
# The MarketDataBlock() to be inserted.
blk_gs1h = MarketDataBlock(gs1h)
blk_gs1h.tz_convert('US/Eastern')
blk_gs1h.df.head()
Out[6]:
opening high low closing volume barcount average
Symbol DataType BarSize TickerTime
GS TRADES 1h 2017-09-05 08:00:00-04:00 224.50 224.50 223.98 223.98 23 18 224.302
2017-09-05 09:00:00-04:00 224.25 224.25 220.01 220.39 6431 3782 221.423
2017-09-05 10:00:00-04:00 220.39 220.70 217.30 218.12 11332 5881 218.820
2017-09-05 11:00:00-04:00 218.09 219.64 218.07 219.45 6457 3843 218.795
2017-09-05 12:00:00-04:00 219.45 219.46 218.11 218.67 4940 3550 218.633
In [7]:
async def run(loop, blk):
    # Create an aiomysql.sa engine for asynchronous database access
    engine = await aiosa.create_engine(
        user=db_info['user'], db=db_info['db'],
        host=db_info['host'], password=db_info['password'],
        loop=loop, echo=False)
    # Insert the MarketDataBlock into the Stock table, then read part of it back
    await insert_hist_data(engine, 'Stock', blk)
    data = await query_hist_data(engine, 'Stock', 'GS', 'TRADES', '1h',
                                 dtest(2017, 9, 6, 10), dtest(2017, 9, 6, 14))
    engine.close()
    await engine.wait_closed()
    return data

loop = asyncio.get_event_loop()
blk_readback = loop.run_until_complete(run(loop, blk_gs1h))
blk_readback.df
Out[7]:
opening high low closing volume barcount average
Symbol DataType BarSize TickerTime
GS TRADES 1h 2017-09-06 10:00:00-04:00 219.61 221.02 219.54 220.01 4451 2722 220.490
2017-09-06 11:00:00-04:00 219.98 220.20 217.73 218.20 4222 2500 219.020
2017-09-06 12:00:00-04:00 218.20 219.83 217.61 219.80 2680 1809 218.335
2017-09-06 13:00:00-04:00 219.77 220.50 219.41 219.57 2470 1492 219.954
2017-09-06 14:00:00-04:00 219.61 219.80 218.90 219.33 2127 1447 219.428
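
A note on dtest() used in the query above: judging from the -04:00 offsets of the returned index, it appears to construct US/Eastern-aware datetimes. A rough equivalent with pytz is shown below as an illustration only, not ibstract's implementation.

import pytz
from datetime import datetime

# Roughly what dtest(2017, 9, 6, 10) appears to produce (assumption based on the output above)
eastern = pytz.timezone('US/Eastern')
eastern.localize(datetime(2017, 9, 6, 10))  # -> 2017-09-06 10:00:00-04:00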

Coroutine download_insert_hist_data(): Downloads historical data and inserts part of it into the MySQL database.

In [8]:
# Clear Stock table
engine.execute(meta.tables['Stock'].delete())

# A user coroutine to download historical data and insert part of it into the MySQL database
async def run(loop, req, broker, insert_limit):
    engine = await aiosa.create_engine(
        user=db_info['user'], db=db_info['db'],
        host=db_info['host'], password=db_info['password'],
        loop=loop, echo=False)
    blk_download = await download_insert_hist_data(
        req, broker, engine, insert_limit)
    blk_readback = await query_hist_data(
        engine, req.SecType, req.Symbol, req.DataType, req.BarSize,
        start=dtest(2017, 8, 1), end=dtest(2017, 10, 1))
    engine.close()
    await engine.wait_closed()
    return blk_download, blk_readback

# Arguments
req = HistDataReq('Stock', 'GS', '1d', '5d', dtest(2017, 9, 7))
broker = IB('127.0.0.1', 4002)
insert_limit = (dtest(2017, 8, 31), dtest(2017, 9, 5))  # Only insert the data between 8/31 and 9/5 (both ends inclusive)

# Run loop
loop = asyncio.get_event_loop()
blk_download, blk_readback = loop.run_until_complete(
    run(loop, req, broker, insert_limit))
blk_download.df
blk_readback.df
Out[8]:
<sqlalchemy.engine.result.ResultProxy at 0x10c7c6ba8>
Out[8]:
opening high low closing volume barcount average
Symbol DataType BarSize TickerTime
GS TRADES 1d 2017-08-30 00:00:00-04:00 220.25 224.22 220.09 222.42 18580 12085 222.7730
2017-08-31 00:00:00-04:00 223.25 224.49 222.58 223.74 15491 10053 223.7635
2017-09-01 00:00:00-04:00 224.55 227.56 223.53 225.88 16940 11739 226.3505
2017-09-05 00:00:00-04:00 223.85 224.00 217.30 217.78 45499 28392 218.9010
2017-09-06 00:00:00-04:00 218.98 221.02 217.61 218.83 26158 15960 219.5335
Out[8]:
opening high low closing volume barcount average
Symbol DataType BarSize TickerTime
GS TRADES 1d 2017-08-31 00:00:00-04:00 223.25 224.49 222.58 223.74 15491 10053 223.764
2017-09-01 00:00:00-04:00 224.55 227.56 223.53 225.88 16940 11739 226.350
2017-09-05 00:00:00-04:00 223.85 224.00 217.30 217.78 45499 28392 218.901
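
The readback above confirms that only the bars whose TickerTime falls between the two insert_limit endpoints (inclusive) were persisted. Conceptually, the restriction can be expressed as a pandas filter on the downloaded block; the snippet below is a hedged illustration of that idea, not ibstract's internal logic.

# Hedged illustration: select the rows of the downloaded block that fall inside insert_limit
start, end = insert_limit
ticker_times = blk_download.df.index.get_level_values('TickerTime')
inside = (ticker_times >= start) & (ticker_times <= end)
blk_download.df[inside]   # the three bars (8/31, 9/1, 9/5) seen in the readback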

Coroutine get_hist_data(): Queries the local MySQL database first. If any part of the requested data is missing, it downloads all missing parts concurrently and asynchronously, and inserts them back into MySQL.

get_hist_data() executes these steps:

  1. Query the local MySQL database for the historical data covering the user request.
  2. Determine the missing parts and their start/end date-time gaps, and create a HistDataReq() for each gap (a conceptual sketch of this gap computation follows the list).
  3. Concurrently download the data for these requests from the broker API.
  4. Concurrently combine the downloaded data pieces with the data block queried from local MySQL.
  5. Return the combined data covering the original user request.
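
A conceptual sketch of the gap computation in step 2, assuming a daily bar size. The helper missing_gaps() below is illustrative only and is not ibstract's actual algorithm; each resulting (start, end) pair would correspond to one HistDataReq() covering that gap.

import pandas as pd

def missing_gaps(requested_index, existing_index):
    """Group requested timestamps absent from the database into (start, end) gaps.
    Hedged illustration; ibstract's real gap logic may differ."""
    missing = requested_index.difference(existing_index)
    gaps = []
    prev_pos = None
    for ts in missing:
        pos = requested_index.get_loc(ts)
        if prev_pos is not None and pos == prev_pos + 1:
            gaps[-1] = (gaps[-1][0], ts)   # extend the open gap
        else:
            gaps.append((ts, ts))          # start a new gap
        prev_pos = pos
    return gaps

requested = pd.date_range('2017-09-04', '2017-09-08', freq='D', tz='US/Eastern')
in_db = pd.DatetimeIndex(['2017-09-05', '2017-09-06'], tz='US/Eastern')
missing_gaps(requested, in_db)
# -> [(2017-09-04, 2017-09-04), (2017-09-07, 2017-09-08)]
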
In [9]:
# Use data above
blk_gs_3days = blk_readback

# Clear Stock table
engine.execute(meta.tables['Stock'].delete())

# Populate database with some data
async def populate_db(blk_db):
    engine = await aiosa.create_engine(
        user=db_info['user'], db=db_info['db'],
        host=db_info['host'], password=db_info['password'],
        loop=loop, echo=False)
    await insert_hist_data(engine, 'Stock', blk_db)
    data_exist_in_db = await query_hist_data(engine, 'Stock', 'GS', 'TRADES', '1d',
                                            dtest(2017,1,1),dtest(2017,12,31))
    engine.close()
    await engine.wait_closed()
    return data_exist_in_db

# Insert and query local MySQL database
loop = asyncio.get_event_loop()
blk_db = loop.run_until_complete(populate_db(blk_gs_3days))
blk_db.df
Out[9]:
<sqlalchemy.engine.result.ResultProxy at 0x10c759eb8>
Out[9]:
opening high low closing volume barcount average
Symbol DataType BarSize TickerTime
GS TRADES 1d 2017-08-31 00:00:00-04:00 223.25 224.49 222.58 223.74 15491 10053 223.764
2017-09-01 00:00:00-04:00 224.55 227.56 223.53 225.88 16940 11739 226.350
2017-09-05 00:00:00-04:00 223.85 224.00 217.30 217.78 45499 28392 218.901
In [10]:
# A user coroutine to get a wider range of historical data than what exists in MySQL.
# Data already in MySQL will not be downloaded again, but will be combined with the downloaded data.
async def run(req, broker, loop):
    blk_ret = await get_hist_data(req, broker, mysql={**db_info, 'loop': loop})
    return blk_ret

# Request 8 days of daily data, 8/29 - 9/8.
# Data for 8/31 - 9/5 already exist in the local database and will not be downloaded.
req = HistDataReq('Stock', 'GS', '1d', '8d', dtest(2017, 9, 9))
broker = IB('127.0.0.1', 4002)

loop = asyncio.get_event_loop()
blk_ret = loop.run_until_complete(run(req, broker, loop))
blk_ret.df
Out[10]:
opening high low closing volume barcount average
Symbol DataType BarSize TickerTime
GS TRADES 1d 2017-08-29 00:00:00-04:00 217.27 220.14 215.75 219.96 18795 12617 218.7545
2017-08-30 00:00:00-04:00 220.25 224.22 220.09 222.42 18580 12085 222.7730
2017-08-31 00:00:00-04:00 223.25 224.49 222.58 223.74 15491 10053 223.7635
2017-09-01 00:00:00-04:00 224.55 227.56 223.53 225.88 16940 11739 226.3505
2017-09-05 00:00:00-04:00 223.85 224.00 217.30 217.78 45499 28392 218.9010
2017-09-06 00:00:00-04:00 218.98 221.02 217.61 218.83 26158 15960 219.5335
2017-09-07 00:00:00-04:00 218.73 218.81 214.64 215.84 27963 17892 215.7020
2017-09-08 00:00:00-04:00 215.51 219.28 215.40 217.21 23250 15562 217.5120