#概要
[Python Websocket JSON-RPC OHLCV] bitFlyer のTicker情報からオレオレOHLCデータを作成する
https://note.mu/17num/n/n6417eb8157bc
[仮想通貨取引所]BitlflyerのRealtimeAPIをPythonで呼び出す
https://qiita.com/Avocado/items/55a0dd6aed0d2bbf7b94
ほぼ上記サイトをあわせただけですがコピペのみでは動かなかったため防備録としてメモを残す
#pip install
pip install websocket-client==0.47.0 --user
を実行。最新バージョンだと動かないようです。
("pip install websocket-client"だけでは動かない)
上記サイトのままだとなぜか(バージョンの問題?)
TypeError: 'module' object is not callable
が出たので、名前空間を解決
#実装
import json
import websocket
from datetime import datetime, timedelta
import dateutil.parser
from time import sleep
from logging import getLogger,INFO,StreamHandler
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(INFO)
logger.setLevel(INFO)
logger.addHandler(handler)
def get_timestamp(d):
timestamp = d["timestamp"].replace('T', ' ')[:-1]
return dateutil.parser.parse(timestamp) + timedelta(hours=9)
"""
This program calls Bitflyer real time API JSON-RPC2.0 over Websocket
"""
class RealtimeAPI(object):
def __init__(self, url, channel):
self.url = url
self.channel = channel
self.ohlc = {}
self.ohlc["date"] = ""
#Define Websocket
self.ws = websocket.WebSocketApp(self.url, header=None, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
websocket.enableTrace(True)
def run(self):
#ws has loop. To break this press ctrl + c to occur Keyboard Interruption Exception.
self.ws.run_forever()
logger.info('Web Socket process ended.')
"""
Below are callback functions of websocket.
"""
# when we get message
def on_message(self, ws, message):
output = json.loads(message)['params']
now = datetime.now()
ts = get_timestamp(output["message"])
# 約定データが現在時刻より過去の場合は捨てる
if(datetime(ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second) < datetime(now.year, now.month, now.day, now.hour, now.minute, now.second)):
return
price = float(output["message"]["ltp"])
# OHLC データの時刻が更新された場合
if(self.ohlc["date"] != ts.strftime("%Y-%m-%d %H:%M:%S")):
if(self.ohlc["date"] != ""):
logger.info("{}, {}, {}, {}, {}, {}".format(now, self.ohlc["date"], self.ohlc["open"], self.ohlc["high"], self.ohlc["low"], self.ohlc["close"]))
self.ohlc["date"] = ts.strftime("%Y-%m-%d %H:%M:00")
self.ohlc["open"] = price
self.ohlc["high"] = price
self.ohlc["low"] = price
self.ohlc["close"] = price
# OHLC データの時刻が同じ場合
else:
self.ohlc["high"] = max(self.ohlc["high"], price)
self.ohlc["low"] = min(self.ohlc["low"], price)
self.ohlc["close"] = price
# when error occurs
def on_error(self, ws, error):
logger.error(error)
# when websocket closed.
def on_close(self, ws):
logger.info('disconnected streaming server')
# when websocket opened.
def on_open(self, ws):
logger.info('connected streaming server')
output_json = json.dumps(
{'method' : 'subscribe',
'params' : {'channel' : self.channel}
}
)
ws.send(output_json)
if __name__ == '__main__':
#API endpoint
url = 'wss://ws.lightstream.bitflyer.com/json-rpc'
channel = 'lightning_ticker_FX_BTC_JPY' # Ticker
json_rpc = RealtimeAPI(url=url, channel=channel)
#ctrl + cで終了
json_rpc.run()