
More than 5 years have passed since last update.

Displaying OHLC from bitFlyer Ticker data using WebSocket


Overview

[Python Websocket JSON-RPC OHLCV] Building homemade OHLC data from bitFlyer Ticker information
https://note.mu/17num/n/n6417eb8157bc

[Cryptocurrency exchange] Calling Bitlflyer's RealtimeAPI from Python
https://qiita.com/Avocado/items/55a0dd6aed0d2bbf7b94

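This is mostly a combination of the two articles above, but copying and pasting alone did not run, so I am leaving these notes as a reminder for myself.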

pip install

pip install websocket-client==0.47.0 --user
Run the command above. The code does not seem to work with the latest version
(a plain "pip install websocket-client" is not enough).
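To confirm which version actually got installed, something like the following can help. This is only a sketch: the helper name `installed_version` and the `PINNED` constant are my own and appear nowhere in the original code, and `importlib.metadata` needs Python 3.8 or later.

```python
from importlib import metadata

PINNED = "0.47.0"  # the version pinned in the pip command above


def installed_version(package):
    """Return the installed version string of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version("websocket-client")
    if found != PINNED:
        print("websocket-client: found {}, expected {}".format(found, PINNED))
```

If the script prints nothing, the pinned version is the one in use.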

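With the code from the articles above used as-is, for some reason (a version issue?) I got
TypeError: 'module' object is not callable
so I fixed how the names are resolved (the namespace).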
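For reference, this error typically appears when a module is called as if it were the class or function of the same name inside it. The sketch below reproduces it with `datetime`. Choosing `datetime` is an assumption for illustration, since these notes do not record which name was involved, and the function names are hypothetical.

```python
import datetime as datetime_module   # the module itself
from datetime import datetime        # the class defined inside the module


def call_module_by_mistake():
    """Call the module as if it were the class and return the error text."""
    try:
        datetime_module(2018, 1, 1)   # a module is not callable
    except TypeError as e:
        return str(e)


def call_class():
    """Call the class, which is what was intended."""
    return datetime(2018, 1, 1)


print(call_module_by_mistake())
print(call_class())
```

Importing the class explicitly (`from datetime import datetime`) or writing the fully qualified name (`datetime.datetime(...)`) avoids the mix-up.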

Implementation

import json
import websocket
from datetime import datetime, timedelta
import dateutil.parser
from time import sleep
from logging import getLogger,INFO,StreamHandler
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(INFO)
logger.setLevel(INFO)
logger.addHandler(handler)

def get_timestamp(d):
  timestamp = d["timestamp"].replace('T', ' ')[:-1]
  return  dateutil.parser.parse(timestamp) + timedelta(hours=9)

"""
This program calls Bitflyer real time API JSON-RPC2.0 over Websocket
"""
class RealtimeAPI(object):
  def __init__(self, url, channel):
    self.url = url
    self.channel = channel
    self.ohlc = {}
    self.ohlc["date"] = ""

    #Define Websocket
    self.ws = websocket.WebSocketApp(self.url, header=None, on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
    websocket.enableTrace(True)

  def run(self):
    # run_forever() blocks in a loop; press Ctrl+C (KeyboardInterrupt) to stop it.
    self.ws.run_forever()
    logger.info('Web Socket process ended.')

  """
  Below are callback functions of websocket.
  """
  # when we get message
  def on_message(self, ws, message):
    output = json.loads(message)['params']
    now = datetime.now()
    ts = get_timestamp(output["message"])
    # Discard ticks whose timestamp (to the second) is older than the current local time
    if ts.replace(microsecond=0) < now.replace(microsecond=0):
      return

    price = float(output["message"]["ltp"])
    # When the minute of the OHLC bar has changed
    minute = ts.strftime("%Y-%m-%d %H:%M:00")
    # Compare at minute resolution; comparing with seconds would reset the bar on almost every tick
    if(self.ohlc["date"] != minute):
      if(self.ohlc["date"] != ""):
        logger.info("{}, {}, {}, {}, {}, {}".format(now, self.ohlc["date"], self.ohlc["open"], self.ohlc["high"], self.ohlc["low"], self.ohlc["close"]))
      self.ohlc["date"] = minute
      self.ohlc["open"] = price
      self.ohlc["high"] = price
      self.ohlc["low"] = price
      self.ohlc["close"] = price
    # When the tick belongs to the same OHLC bar
    else:
      self.ohlc["high"] = max(self.ohlc["high"], price)
      self.ohlc["low"] = min(self.ohlc["low"], price)
      self.ohlc["close"] = price

  # when error occurs
  def on_error(self, ws, error):
    logger.error(error)

  # when websocket closed.
  def on_close(self, ws):
    logger.info('disconnected streaming server')

  # when websocket opened.
  def on_open(self, ws):
    logger.info('connected streaming server')
    output_json = json.dumps(
      {'method' : 'subscribe',
      'params' : {'channel' : self.channel}
      }
    )
    ws.send(output_json)

if __name__ == '__main__':
  #API endpoint
  url = 'wss://ws.lightstream.bitflyer.com/json-rpc'
  channel = 'lightning_ticker_FX_BTC_JPY' # Ticker
  json_rpc = RealtimeAPI(url=url, channel=channel)
  # Press Ctrl+C to exit
  json_rpc.run()
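
The aggregation done in on_message can also be tried offline, without a WebSocket connection. The following is a minimal sketch under a few assumptions: the function names `to_jst` and `aggregate` and the sample ticks are made up for illustration, the timestamps are assumed to be UTC strings ending in "Z" as handled by get_timestamp above, and only the standard library is used instead of dateutil.

```python
from datetime import datetime, timedelta, timezone

JST = timezone(timedelta(hours=9))


def to_jst(timestamp):
    """Parse a UTC timestamp such as '2018-05-10T00:00:01.123Z' and convert it to JST."""
    # Only the first 19 characters (whole seconds) are parsed;
    # fractional seconds are not needed for 1-minute bars.
    utc = datetime.strptime(timestamp[:19], "%Y-%m-%dT%H:%M:%S")
    return utc.replace(tzinfo=timezone.utc).astimezone(JST)


def aggregate(ticks):
    """Fold (timestamp, price) pairs, in time order, into a list of 1-minute OHLC dicts."""
    bars = []
    for timestamp, price in ticks:
        minute = to_jst(timestamp).strftime("%Y-%m-%d %H:%M:00")
        if not bars or bars[-1]["date"] != minute:
            # First tick of a new minute: it sets open, high, low and close.
            bars.append({"date": minute, "open": price, "high": price,
                         "low": price, "close": price})
        else:
            # Same minute: widen high/low and move close to the latest price.
            bar = bars[-1]
            bar["high"] = max(bar["high"], price)
            bar["low"] = min(bar["low"], price)
            bar["close"] = price
    return bars


# Hypothetical sample ticks, not real market data
sample = [
    ("2018-05-10T00:00:01.123Z", 100.0),
    ("2018-05-10T00:00:30.5Z", 105.0),
    ("2018-05-10T00:00:59Z", 98.0),
    ("2018-05-10T00:01:02.0000001Z", 101.0),
]
bars = aggregate(sample)
for bar in bars:
    print(bar)
```

Keeping the aggregation in a plain function like this makes the bar logic easy to check with fixed input before wiring it to the live Ticker stream.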
