# -*- coding: utf-8 -*-
import time, calendar, pytz, requests, math
from datetime import datetime, timedelta
from collections import OrderedDict
import numpy as np
import pandas as pd
import mpl_finance as mpf
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from IPython.display import display, clear_output
from ipywidgets import interact, IntSlider, Dropdown
%matplotlib inline
plt.ion()
ALLOWED_PERIOD = {
"1m": ["1m", 1, 1], "3m": ["1m", 3, 3],
"5m": ["5m", 1, 5], "15m": ["5m", 3, 15], "30m": ["5m", 6, 30],
"1h": ["1h", 1, 60], "2h": ["1h", 2, 120],
"3h": ["1h", 3, 180], "4h": ["1h", 4, 240],
"6h": ["1h", 6, 360], "12h": ["1h", 12, 720],
"1d": ["1d", 1, 1440],
# not support yet '3d', '1w', '2w', '1m'
}
bar = "Candle" # チャートタイプ(ロウソク足/平均足)
period = "1h" # 時間足
chart_bars = 100 # チャートバー表示件数
sma1_term = 9 # SMA1期間
sma2_term = 36 # SMA2期間
sma3_term = 52 # SMA3期間
df_base = pd.DataFrame(index=[], columns=[]) # OHLCV + RCI
df_chart = pd.DataFrame(index=[], columns=[]) # OHLCV + RCI + Regression
fig = None
ax1 = None
ax2 = None
ax3 = None
bg1 = None
#---------------------------------------------------------------------
# メイン処理
#---------------------------------------------------------------------
def main():
global period
global df_base
# 初回チャート作成
createChart()
# OHLCVデータ+平均足取得
df_base = fetch_ohlcv_df(period=period, count=360, reverse=False, partial=True, \
tstype="UTS", heikin=True)
# index設定
df_base = df_base.set_index("timestamp")
# RCI取得
attachRCI()
# SMA取得
attachSMA()
# チャート更新
updateChart()
#---------------------------------------------------------------------
# チャート作成
#---------------------------------------------------------------------
def createChart():
global fig
global ax1
global ax2
global ax3
global bg1
# 描画領域を作成
# Figure(図全体)
fig = plt.figure(figsize =(10, 8), # 領域サイズ(インチ指定)
dpi =100, # 解像度
facecolor ="#1E1E1E",# 背景色
edgecolor ="k") # 外枠色
fig.autofmt_xdate() # x軸のオートフォーマット
# チャート配置割り
gs = gridspec.GridSpec(4, 1)
# チャート間隔
plt.subplots_adjust(wspace=0.0, hspace=0.0)
# アスペクト比
plt.gca().set_aspect("equal", adjustable="box")
# ax1:メインチャート(ロウソクバー)
ax1 = plt.subplot(gs[0:3, 0])
# ax2:メインチャー(出来高)
ax2 = ax1.twinx()
# ax3:サブチャート
ax3 = plt.subplot(gs[3, 0], sharex=ax1)
#---------------------------------------------------------------------
# RCI取得
#---------------------------------------------------------------------
def attachRCI():
global bar
global df_base
lst_ts = df_base.index.tolist()
lst_close = df_base["close"].tolist() if bar == "Candle" else df_base["heikin_close"].tolist()
lst_ts = lst_ts[::-1]
lst_close = lst_close[::-1]
df_rci = get_rci_df(lst_ts, lst_close, 300)
# index設定
df_rci = df_rci.set_index("timestamp")
# OHLCVとRCIをマージ(外部結合)
df_base = df_base.join(df_rci, how="outer")
# 欠損値補間(全体0補間)
df_base.fillna(0, inplace=True)
#---------------------------------------------------------------------
# SMA取得
#---------------------------------------------------------------------
def attachSMA():
global bar
global sma1_term
global sma2_term
global sma3_term
global df_base
global df_chart
sr_close = df_base["close"] if bar == "Candle" else df_base["heikin_close"]
df_chart = df_base.copy()
df_chart["SMA1"] = sr_close.rolling(window=sma1_term, min_periods=1).mean()
df_chart["SMA2"] = sr_close.rolling(window=sma2_term, min_periods=1).mean()
df_chart["SMA3"] = sr_close.rolling(window=sma3_term, min_periods=1).mean()
#---------------------------------------------------------------------
# チャート更新
#---------------------------------------------------------------------
def updateChart():
global bar
global chart_bars
global df_chart
global fig
global ax1
global ax2
global ax3
global bg1
# date追加
df_chart["date"] = pd.to_datetime(df_chart.index, unit="s")
# チャート表示件数にトリミング
shift = (chart_bars * -1) - 1
df_chart = df_chart.iloc[shift:]
# index解除
df_chart.reset_index(inplace=True)
# 描画クリア
ax1.cla()
ax2.cla()
ax3.cla()
clear_output(wait=True)
# ax1:メインチャート
ax1.patch.set_facecolor("#131722") # subplotの背景色
ax1.patch.set_alpha(1.0) # subplotの背景透明度
ax1.xaxis.label.set_color("white") # x軸ラベルのテキスト色
ax1.yaxis.label.set_color("white") # y軸ラベルのテキスト色
ax1.spines["top"].set_color("#555555") # プロットエリアを囲む枠線の色Top
ax1.spines["bottom"].set_color("#555555") # プロットエリアを囲む枠線の色Bottom
ax1.spines["left"].set_color("#555555") # プロットエリアを囲む枠線の色Left
ax1.spines["right"].set_color("#555555") # プロットエリアを囲む枠線の色Right
ax1.tick_params(labelbottom=False, bottom=False) # x軸非表示
#ax1.tick_params(axis="x", labelsize=16, colors="white") # x 座標目盛り色
ax1.tick_params(axis="y", labelsize=8, colors="white") # y 座標目盛り色
ax1.xaxis.grid(True, which="major", linestyle="dotted", color="#CFCFCF") # x軸に垂直なグリッドメソッド
ax1.yaxis.grid(True, which="major", linestyle="dotted", color="#CFCFCF") # y軸に垂直なグリッドメソッド
ax1.set_axisbelow(True) # グリッドがプロットした点や線の下に隠れる
#ax1.set_title("BTC/USD", fontsize=20) # タイトル設定
#ax1.set_xlabel("Date", fontsize=20) # xラベル設定
ax1.set_ylabel("USD", fontsize=10) # yラベル設定
ax1.set_xlim(-1, len(df_chart["date"])) # x軸の範囲
# 出来高チャートは下側25%に収める
ax2.set_ylim([0, df_chart["volume"].max() * 4])
ax2.set_ylabel("Volume", fontsize=10)
ax2.tick_params(axis="y", labelsize=8, colors="white") # y座標目盛り
ax2.yaxis.label.set_color("white") # y軸ラベルのテキスト色
# ax3:サブチャート
ax3.patch.set_facecolor("#131722") # subplotの背景色
ax3.patch.set_alpha(1.0) # subplotの背景透明度
ax3.xaxis.label.set_color("white") # x軸ラベルのテキスト色
ax3.yaxis.label.set_color("white") # y軸ラベルのテキスト色
ax3.spines["top"].set_color("#555555") # プロットエリアを囲む枠線の色Top
ax3.spines["bottom"].set_color("#555555") # プロットエリアを囲む枠線の色Bottom
ax3.spines["left"].set_color("#555555") # プロットエリアを囲む枠線の色Left
ax3.spines["right"].set_color("#555555") # プロットエリアを囲む枠線の色Right
ax3.tick_params(axis="x", labelsize=8, colors="white") # x 座標目盛り色
ax3.tick_params(axis="y", labelsize=8, colors="white") # y 座標目盛り色
ax3.xaxis.grid(True, which="major", linestyle="dotted", color="#CFCFCF") # x軸に垂直なグリッドメソッド
ax3.yaxis.grid(True, which="major", linestyle="dotted", color="#CFCFCF") # y軸に垂直なグリッドメソッド
ax3.set_axisbelow(True) # グリッドがプロットした点や線の下に隠れる
ax3.set_xlabel("Date", fontsize=10) # xラベル設定
ax3.set_ylabel("RCI", fontsize=10) # yラベル設定
# 最初の区切り目盛りのインデックス
unit_x = 12 # X軸目盛り区切り間隔
xtick0 = (unit_x - df_chart["date"][0].hour % unit_x)
ax3.set_xticks(range(xtick0, len(df_chart["date"]), unit_x))
ax3.set_xticklabels([x.strftime("%m/%d %H:%M") for x in df_chart["date"]][xtick0::unit_x], rotation=30)
# ロウソクチャート描画
sr_open = df_chart["open"] if bar == "Candle" else df_chart["heikin_open"]
sr_close = df_chart["close"] if bar == "Candle" else df_chart["heikin_close"]
color_up = "#53B987" if bar == "Candle" else "cyan"
color_down = "#EB4D5C" if bar == "Candle" else "red"
mpf.candlestick2_ohlc(ax1,
opens = sr_open, # 始値
highs = df_chart["high"], # 高値
lows = df_chart["low"], # 安値
closes = sr_close, # 終値
width = 0.8, # バー横幅
colorup = color_up, # 陽線色
colordown = color_down) # 陰線色
# 出来高グラフ描画
mpf.volume_overlay(ax2,
opens = sr_open,
closes = sr_close,
volumes = df_chart["volume"],
width = 1.0,
colorup = color_up,
colordown = color_down,
alpha = 0.3)
# SMA描画
date_x = pd.Series(range(len(df_chart.index)))
ax1.plot(date_x, df_chart["SMA1"], color="Red", linewidth="0.5")
ax1.plot(date_x, df_chart["SMA2"], color="green", linewidth="0.5")
ax1.plot(date_x, df_chart["SMA3"], color="cyan", linewidth="0.5")
# y軸調整
ymin = df_chart["low"].min()
ymax = df_chart["high"].max()
slide = (ymax - ymin) / 5
ax1.set_ylim([ymin-slide, ymax+10])
# RCI描画
ax3.plot(date_x, df_chart["RCI9"], color="Red", linewidth="1.0")
ax3.plot(date_x, df_chart["RCI36"], color="Green", linewidth="1.0")
ax3.plot(date_x, df_chart["RCI52"], color="cyan", linewidth="1.0")
ax3.set_ylim([-105, 105])
left, right = ax3.get_xlim()
ax3.hlines([-80, 80], left, right, "white", alpha=0.5, linestyles="dashed")
#---------------------------------------------------------------------
# 指定された時間足のOHLCVデータをBitMEX REST APIより取得
#---------------------------------------------------------------------
# [@param]
# period ="1m" 時間足(1m, 3m, 5m, 15m, 30m, 1h, 2h, 3h, 4h, 6h, 12h, 1d)
# symbol ="XBTUSD" 通貨ペア
# count =1000 取得期間 ※
# reverse =True 並び順(True:新->古, False:古->新)
# partial =False 最新未確定足を取得するか(True:含む, False:含まない)
# tstype ="UTMS" 時刻形式(以下の5パターン)
# "UTMS":UnixTime(ミリ秒), "UTS":UnixTime(秒), "DT":datetime,
# "STMS":日付文字列(%Y-%m-%dT%H:%M:%S.%fZ), "STS":日付文字列(%Y-%m-%dT%H:%M:%S)
# heikin =False 平均足(始値/終値)を取得するか(True:含む, False:含まない)
# [return]
# DataFrame : [[timestamp, open, high, low, close, volume], ・・・]
# List : [[timestamp, open, high, low, close, volume], ・・・]
# ※戻り値データ型 (List or DataFrame) によって取得関数を2パターンに分けてある
#---------------------------------------------------------------------
# ※ 1m, 5m, 1h, 1dを基準にmergeしているため、それ以外を取得する場合は件数に注意
# (3m : 1m * 3期間より、3mを10期間取得するなら、APIでは 10 * 3 = 30件になる)
# APIは1回でMAX10000件までの取得としている
# 30mや4h、6h、12hなどを取得する場合、件数を大きくするとAPI制限にかかる可能性がある
#---------------------------------------------------------------------
# [usage] lst_ohlcv = fetch_ohlcv_lst(period="15m", count=1000, tstype="DT", heikin=True)
# df_ohlcv = fetch_ohlcv_df(period="5m", count=1000, partial=True)
#---------------------------------------------------------------------
# ListでOHLCVを取得
def fetch_ohlcv_lst(period="1m", symbol="XBTUSD", count=1000, reverse=True, partial=False, tstype="UTMS", heikin=False):
df = fetch_ohlcv_df(period, symbol, count, reverse, partial, tstype, heikin)
if df is None:
return []
if heikin:
return [[int(x[0]) if tstype=="UTS" or tstype=="UTMS" else x[0], x[1], x[2], x[3], x[4], int(x[5]), x[6], x[7]] for x in df.values.tolist()]
else:
return [[int(x[0]) if tstype=="UTS" or tstype=="UTMS" else x[0], x[1], x[2], x[3], x[4], int(x[5])] for x in df.values.tolist()]
# DataFrameでOHLCVを取得
def fetch_ohlcv_df(period="1m", symbol="XBTUSD", count=1000, reverse=True, partial=False, tstype="UTMS", heikin=False):
if period not in ALLOWED_PERIOD:
return pd.DataFrame(index=[], columns=self.OHLCV_COLUMNS)
period_params = ALLOWED_PERIOD[period]
need_count = (count + 2) * period_params[1] # マージ状況により、不足が発生する可能性があるため、多めに取得
# REST APIリクエストでOHLCVデータ取得
df_ohlcv = __get_ohlcv_paged(symbol=symbol, period=period_params[0], count=need_count)
# DataFrame化して指定時間にリサンプリング
if period_params[1] > 1:
minutes = ALLOWED_PERIOD[period][2]
offset = str(minutes) + "T"
if 60 <= minutes < 1440:
offset = str(minutes / 60) + "H"
elif 1440 <= minutes:
offset = str(minutes / 1440) + "D"
df_ohlcv = df_ohlcv.resample(offset).agg({
"timestamp": "first",
"open": "first",
"high": "max",
"low": "min",
"close": "last",
"volume": "sum",
})
# 未確定の最新足を除去
if partial == False:
df_ohlcv = df_ohlcv.iloc[:-1]
# マージした結果、余分に取得している場合、古い足から除去
if len(df_ohlcv) > count:
df_ohlcv = df_ohlcv.iloc[len(df_ohlcv)-count:]
# index解除
df_ohlcv.reset_index(inplace=True)
# 平均足付加
if heikin == True:
__attach_ohlcv_heiken_oc(df_ohlcv)
# timestampを期間終わり時刻にするため、datetimeをシフト
df_ohlcv["datetime"] += timedelta(minutes=ALLOWED_PERIOD[period][2])
# timestamp変換
__convert_timestamp(df_ohlcv, tstype)
# datetime列を削除
df_ohlcv.drop("datetime", axis=1, inplace=True)
# 並び順を反転
if reverse == True:
df_ohlcv = df_ohlcv.iloc[::-1]
# indexリセット
df_ohlcv.reset_index(inplace=True, drop=True)
return df_ohlcv
# private
def __convert_timestamp(df_ohlcv, timestamp="UTMS"):
if timestamp == "UTS":
df_ohlcv["timestamp"] = pd.Series([int(dt.timestamp()) for dt in df_ohlcv["datetime"]])
elif timestamp == "UTMS":
df_ohlcv["timestamp"] = pd.Series([int(dt.timestamp()) * 1000 for dt in df_ohlcv["datetime"]])
elif timestamp == "DT":
df_ohlcv["timestamp"] = df_ohlcv["datetime"]
elif timestamp == "STS":
df_ohlcv["timestamp"] = pd.Series([dt.strftime("%Y-%m-%dT%H:%M:%S") for dt in df_ohlcv["datetime"]])
elif timestamp == "STMS":
df_ohlcv["timestamp"] = pd.Series([dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ") for dt in df_ohlcv["datetime"]])
else:
df_ohlcv["timestamp"] = df_ohlcv["datetime"]
def __get_ohlcv_paged(symbol="XBTUSD", period="1m", count=1000):
ohlcv_list = []
utc_now = datetime.now(pytz.utc)
to_time = int(utc_now.timestamp())
from_time = to_time - ALLOWED_PERIOD[period][2] * 60 * count
start = from_time
end = to_time
if count > 10000:
end = from_time + ALLOWED_PERIOD[period][2] * 60 * 10000
while start <= to_time:
ohlcv_list += __fetch_ohlcv_list(symbol=symbol, period=period, start=start, end=end)
start = end + ALLOWED_PERIOD[period][2] * 60
end = start + ALLOWED_PERIOD[period][2] * 60 * 10000
if end > to_time:
end = to_time
df_ohlcv = pd.DataFrame(ohlcv_list,
columns=["timestamp", "open", "high", "low", "close", "volume"])
df_ohlcv["datetime"] = pd.to_datetime(df_ohlcv["timestamp"], unit="s")
df_ohlcv = df_ohlcv.set_index("datetime")
df_ohlcv.index = df_ohlcv.index.tz_localize("UTC")
return df_ohlcv
def __fetch_ohlcv_list(symbol="XBTUSD", period="1m", start=0, end=0):
param = {"period": ALLOWED_PERIOD[period][2], "from": start, "to": end}
url = "https://www.bitmex.com/api/udf/history?symbol=XBTUSD&resolution={period}&from={from}&to={to}".format(**param)
res = requests.get(url)
data = res.json()
return [list(ohlcv) for ohlcv in zip(data["t"], data["o"], data["h"], data["l"], data["c"], data["v"])]
def __attach_ohlcv_heiken_oc(df_ohlcv):
# 始値 = (一本前の始値(平均足) + 一本前の終値(平均足)) / 2
# 終値 = (始値(ローソク足) + 高値(ローソク足) + 安値(ローソク足) + 終値(ローソク足)) / 4
d,t,o,h,l,c,v,ho,hc = range(9)
df_ohlcv["heikin_open"] = float(0.0)
df_ohlcv["heikin_close"] = float(0.0)
df_ohlcv.iloc[0, ho] = df_ohlcv.iloc[0, o]
df_ohlcv.iloc[0, hc] = df_ohlcv.iloc[0, c]
df_ohlcv.iloc[1, ho] = df_ohlcv.iloc[0, o:v].mean()
df_ohlcv.iloc[1, hc] = df_ohlcv.iloc[1, o:v].mean()
for i, ohlcv in df_ohlcv.iloc[2:].iterrows():
df_ohlcv.iloc[i, ho] = (df_ohlcv.iloc[i-1, ho] + df_ohlcv.iloc[i-1, hc]) / 2.0
df_ohlcv.iloc[i, hc] = ohlcv[o:v].mean()
#---------------------------------------------------------------------
# closeリストからRCI3Lineを期間分取得
#---------------------------------------------------------------------
# [@param]
# lst_ts timestamp(UnitxTime[秒])リスト(新->古)
# lst_close 終値リスト(新->古)
# term 計算する期間
# [return]
# RCI3Line(直近3期間) DataFrame
#---------------------------------------------------------------------
# [usage] rci = get_rci_df(lst_close, 100)
#---------------------------------------------------------------------
def get_rci_df(lst_ts, lst_close, term):
# 空のDataFrame作成
df = pd.DataFrame(index=[], columns=["timestamp", "RCI9", "RCI36", "RCI52"])
for i in range(term):
# RCIリスト作成
series = pd.Series([lst_ts[i],
get_rci(lst_close[i:], 9),
get_rci(lst_close[i:], 36),
get_rci(lst_close[i:], 52)], index=df.columns)
# RCIのDataFrameに追加
df = df.append(series, ignore_index=True)
# index反転(古->新)
df_rci = df.iloc[::-1]
# indexリセット
df_rci.reset_index(inplace=True, drop=True)
return df_rci
#---------------------------------------------------------------------
# RCI取得
#---------------------------------------------------------------------
# [@param]
# lst_close 終値リスト(新->古)
# period 計算する期間
# [return]
# RCI
#---------------------------------------------------------------------
# [usage] rci = get_rci(lst_close, period=9)
#---------------------------------------------------------------------
def get_rci(lst_close, period):
close_prices_sorted = lst_close[:period]
close_prices_sorted.sort(reverse=True)
d = 0.0
for i in range(period):
price = lst_close[i]
index = close_prices_sorted.index(price)
count = close_prices_sorted.count(price)
price_index = index + 1 + (count - 1) / 2.0 # if prices are duplicated
d += ( i + 1 - price_index ) ** 2
return round((1.0 - 6.0 * d / (period * (period ** 2 - 1.0))) * 100.0, 1)
if __name__ == "__main__":
main()
# interact object
barDropdown = Dropdown(options=["Candle", "Heikin"], value="Candle")
periodDropdown = Dropdown(options=ALLOWED_PERIOD.keys(), value="1h")
barsSlider = IntSlider(min=60, max=300, step=20, value=100)
sma1Slider = IntSlider(min=1, max=100, step=1, value=9)
sma2Slider = IntSlider(min=1, max=100, step=1, value=36)
sma3Slider = IntSlider(min=1, max=100, step=1, value=52)
# interactイベント
@interact(Bar=barDropdown,
Period=periodDropdown,
Bars=barsSlider,
SMA1=sma1Slider,
SMA2=sma2Slider,
SMA3=sma3Slider)
def interact_event(Bar, Period, Bars, SMA1, SMA2, SMA3):
global bar
global period
global chart_bars
global sma1_term
global sma2_term
global sma3_term
global df_base
global fig
update_mode = 0
if sma1_term != SMA1:
sma1_term = SMA1
update_mode = 1
if sma2_term != SMA2:
sma2_term = SMA2
update_mode = 1
if sma3_term != SMA3:
sma3_term = SMA3
update_mode = 1
if bar != Bar:
bar = Bar
update_mode = 2
if period != Period:
period = Period
update_mode = 2
if chart_bars != Bars:
chart_bars = Bars
update_mode = 2
if update_mode == 0:
return
if update_mode > 1:
# OHLCVデータ+平均足取得
df_base = fetch_ohlcv_df(period=period, count=360, reverse=False, partial=True, \
tstype="UTS", heikin=True)
# index設定
df_base = df_base.set_index("timestamp")
# RCI取得
attachRCI()
if update_mode > 0:
# SMA取得
attachSMA()
# チャート更新
updateChart()
# チャート表示
display(fig)
Failed to display Jupyter Widget of type interactive
.
If you're reading this message in the Jupyter Notebook or JupyterLab Notebook, it may mean that the widgets JavaScript is still loading. If this message persists, it likely means that the widgets JavaScript library is either not installed or not enabled. See the Jupyter Widgets Documentation for setup instructions.
If you're reading this message in another frontend (for example, a static rendering on GitHub or NBViewer), it may mean that your frontend doesn't currently support widgets.