# -*- coding: utf-8 -*-
%matplotlib notebook
from matplotlib import pyplot as plt
from matplotlib import animation
import numpy as np
import aiohttp
import asyncio
from aiohttp import WSMsgType
import json
from collections import deque
import traceback
from datetime import datetime
from pytz import timezone, utc
class RealTimeChart:
def __init__(self):
self.length = 105 # チャートの期間
# キューの宣言
self.dt = deque(maxlen=self.length) # x軸(時刻)
self.ltp = deque(maxlen=self.length) # y軸(LTP)
self.ma = deque(maxlen=self.length) # y軸(移動平均)
self.bid = deque(maxlen=self.length) # y軸(最良気配値BID)
self.ask = deque(maxlen=self.length) # y軸(最良気配値ASK)
# Figure生成
self.fig, self.ax = plt.subplots(1,1)
self.np_x = np.array([])
# Line更新
self.im_ltp = None # LTP
self.im_bid = None # 最良気配値BID
self.im_ask = None # 最良気配値ASK
self.im_ma = None # 移動平均
# アニメーション生成
ani = animation.FuncAnimation(self.fig, self.update_plot, init_func=self.init_plot,
interval=20, blit=True)
# イベントループ
loop = asyncio.get_event_loop()
tasks = asyncio.wait([self.receive_from_rpc()]) # タスクの設定
try:
loop.run_until_complete(tasks) # イベントループ開始
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
# plot表示
plt.show()
# ティッカーの購読[Twitter→@crptoresearcherさんの関数を引用]
async def receive_from_rpc(self):
# URL クエリパラメータの設定
uri = 'wss://ws.lightstream.bitflyer.com/json-rpc'
query = {'method': 'subscribe',
'params': {'channel': 'lightning_ticker_FX_BTC_JPY'}}
while True:
try:
async with aiohttp.ClientSession() as session:
async with session.ws_connect(uri,
receive_timeout=10) as client:
await asyncio.wait([client.send_str(json.dumps(query))])
async for response in client:
if response.type != WSMsgType.TEXT:
print('response:' + str(response))
break
data = json.loads(response[1])['params']['message']
# レスポンスデータをキューへ格納
self.dt.append(data['timestamp']) # 時刻
self.ltp.append(int(data['ltp'])) # LTP
self.bid.append(int(data['best_bid'])) # 最良気配値BID
self.ask.append(int(data['best_ask'])) # 最良気配値ASK
self.ma.append(int(np.mean(self.ltp))) # 移動平均
self.np_x = np.array(range(len(self.ltp)))
except Exception as e:
traceback.print_exc()
# アニメーション前処理
def init_plot(self):
plt.ion()
plt.cla() # チャートを初期化
plt.title('FXBTC/JPY') # グラフタイトル
plt.subplots_adjust(left=0.1, right=0.95,
bottom=0.1, top=0.95) # スペース
self.ax.tick_params(labelsize=10) # 軸目盛ラベルサイズ
self.ax.xaxis.grid(True, which = 'major', linestyle = '-', color = '#CFCFCF') # x軸グリッド
self.ax.yaxis.grid(True, which = 'major', linestyle = '-', color = '#CFCFCF') # y軸グリッド
self.ax.set_axisbelow(True) # グラフのグリッドがプロットした点や線の下に隠れる
# 空のplot生成
self.im_ltp, = self.ax.plot([], [], color='blue',
linewidth = 2.0, linestyle='solid') # LTP
self.im_bid, = self.ax.plot([], [], color='magenta',
linewidth = 1.0, linestyle='dashed') # 最良気配値BID
self.im_ask, = self.ax.plot([], [], color='green',
linewidth = 1.0, linestyle='dashed') # 最良気配値ASK
self.im_ma, = self.ax.plot([], [], color='red',
linewidth = 1.0, linestyle='solid') # 移動平均
# アニメーション描画更新
def update_plot(self, i):
# plotデータ更新
self.im_ltp.set_data(self.np_x, self.ltp) # LTP
self.im_bid.set_data(self.np_x, self.bid) # 最良気配値BID
self.im_ask.set_data(self.np_x, self.ask) # 最良気配値ASK
self.im_ma.set_data(self.np_x, self.ma) # 移動平均
self.ax.relim()
self.ax.autoscale_view()
# 区切り間隔からX軸目盛設定
if len(self.dt) == self.length:
lst_dt = list(self.dt)
unit = 20 # 目盛間隔
lst_dt = lst_dt[::unit]
plt.xticks(range(0, self.length, unit), # 位置配列
[self.str_to_datetime(x[:19]).strftime("%H:%M:%S") for x in lst_dt]) # ラベル配列
# 日付文字列->datetime(JST)
def str_to_datetime(self, str_dt):
dt = None
try:
dt = datetime.strptime(str_dt, "%Y-%m-%dT%H:%M:%S.%f")
dt_utc = utc.localize(dt)
dt_jst = dt_utc.astimezone(timezone("Asia/Tokyo"))
except ValueError:
try:
dt = datetime.strptime(str_dt, "%Y-%m-%dT%H:%M:%S")
dt_utc = utc.localize(dt)
dt_jst = dt_utc.astimezone(timezone("Asia/Tokyo"))
except ValueError:
print("timestamp str convert error!")
return None
return dt_jst
if __name__ == '__main__':
RealTimeChart()