BTC取引所のwebsocket経由の約定履歴取得コードです。
この記事はGithubで公開しているコードの一部を切り出して、読みやすく書き換えたものです。
on_messageの中身を書き換えて使ってください。
一応、unix時間(秒)、価格、枚数を表示 するコードを書いています。
sizeはテイカ−が買なら正、売りなら負です。
このクラスはwebsocket専用のスレッドを作ってデーモンとして実行します。
デーモンとして実行されたスレッドはデーモン以外のスレッドがすべてなくなると自動的に終了します。
なので動作確認用のコードではinput()で入力待ちにすることでメインスレッドを終了させずに待機させています。(enterもしくはctrl+C等で終了)
base.py
import websocket
import threading
import datetime
def unix_time_from_ISO8601Z(date):
td = datetime.datetime.strptime(date[:19], '%Y-%m-%dT%H:%M:%S')
td = td.replace(tzinfo=datetime.timezone.utc)
ts = td.timestamp()
ts += float('0.' + date[20:-1])
return ts
class Trade:
def __init__(self, url):
self.url = url
self.thread = threading.Thread(target=lambda: self.run())
self.thread.daemon = True
self.thread.start()
def run(self):
while True:
# websocket.enableTrace(True)
self.ws = websocket.WebSocketApp(
self.url,
on_open=lambda ws: self.on_open(ws),
on_close=lambda ws: self.on_close(ws),
on_message=lambda ws, msg: self.on_message(ws, msg),
on_error=lambda ws, err: self.on_error(ws, err))
self.ws.run_forever()
def on_open(self, ws):
pass
def on_close(self, ws):
pass
def on_message(self, ws, msg):
pass
def on_error(self, ws, err):
pass
bitflyer.py
import json
from base import Trade, unix_time_from_ISO8601Z
class BitflyerTrade(Trade):
ENDPOINT = 'wss://ws.lightstream.bitflyer.com/json-rpc'
SYMBOL = 'FX_BTC_JPY'
def __init__(self):
super().__init__(self.ENDPOINT)
self.ch = f'lightning_executions_{self.SYMBOL}'
def on_open(self, ws):
ws.send(json.dumps({
'method': 'subscribe',
'params': {'channel': self.ch},
}))
def on_message(self, ws, msg):
msg = json.loads(msg)
if msg['method'] != 'channelMessage':
return
p = msg['params']
ch = p['channel']
m = p['message']
for t in m:
ts = unix_time_from_ISO8601Z(t['exec_date'])
price = t['price']
size = t['size']
side = t['side']
if side == 'SELL':
size *= -1
# buy_id = t['buy_child_order_acceptance_id']
# sell_id = t['sell_child_order_acceptance_id']
if ch == self.ch:
print(ts, price, size)
if __name__ == '__main__':
trade = BitflyerTrade()
input()
bitfinex.py
import json
from base import Trade
class BitfinexTrade(Trade):
ENDPOINT = 'wss://api.bitfinex.com/ws/2'
SYMBOL = 'tBTCUSD'
def __init__(self):
super().__init__(self.ENDPOINT)
def on_open(self, ws):
ws.send(json.dumps({
'event': 'subscribe',
'channel': 'trades',
'symbol': self.SYMBOL,
}))
def on_message(self, ws, msg):
msg = json.loads(msg)
if type(msg) == list and len(msg) == 3 and msg[1] == 'te':
t = msg[2]
ts = t[1]/1000
price = t[3]
size = t[2]
print(ts, price, size)
if __name__ == '__main__':
trade = BitfinexTrade()
input()
bitmex.py
import json
from base import Trade, unix_time_from_ISO8601Z
class BitmexTrade(Trade):
ENDPOINT = 'wss://www.bitmex.com/realtime'
SYMBOL = 'XBTUSD'
def __init__(self):
super().__init__(f'{self.ENDPOINT}?subscribe=trade:{self.SYMBOL}')
def on_message(self, ws, msg):
msg = json.loads(msg)
if 'table' in msg and msg['table'] == 'trade' and \
msg['action'] == 'insert':
ts = unix_time_from_ISO8601Z(msg['data'][0]['timestamp'])
for t in msg['data']:
price = t['price']
size = round(t['size'] / price, 8)
if t['side'] == 'Sell':
size *= -1
print(ts, price, size)
if __name__ == '__main__':
trade = BitmexTrade()
input()
binance.py
import json
from base import Trade, unix_time_from_ISO8601Z
class BinanceTrade(Trade):
ENDPOINT = 'wss://stream.binance.com:9443/ws'
SYMBOL = 'btcusdt'
def __init__(self):
super().__init__(f'{self.ENDPOINT}/{self.SYMBOL}@trade')
def on_message(self, ws, msg):
msg = json.loads(msg)
if msg['e'] == 'trade' and msg['M']:
ts = msg['T']/1000
price = float(msg['p'])
size = float(msg['q'])
if msg['m']:
size *= -1
print(ts, price, size)
if __name__ == '__main__':
trade = BinanceTrade()
input()
実行方法
base.pyと取引所毎のファイルを同じディレクトリに置いて実行してください。
$ ls
binance.py bitfinex.py bitflyer.py bitmex.py exchange_base.py
$ python3 bitflyer.py
更新履歴
・2019/05/29 スレッドをデーモンに変更。
・2019/07/14 スレッドの実行方法を変更。
・2019/07/16 Liquid追加。
・2019/09/01 一部ファイル名、クラス名を変更。
・2019/09/03 python3.7依存のコードを除去。関連する注意書きを削除。
・2019/09/03 命名規則の統一。
・2019/10/27 bitfinexが抜けていたので修正。
・2020/02/22 記事をnoteからqiitaに移動。