2018/5/15 掲載コードの余分な部分を消去しました
2018/8/19 パッケージのバージョンにより正常に動作しないケースについて追記しました。
はじめに
Bitflyerとは仮想通貨取引所の一つです。
現在 Bitflyerは、APIとして大きく分けてHTTP APIとRealtime APIの二種類を公開しています。
これにより、外部からのプログラミングによる仮想通貨取引情報の取得、仮想通貨の取引を可能にしています。
HTTP APIは板情報の取得から、注文まで幅広い機能をカバーしています。
しかし、機能を使う際には、その都度呼び出す必要があり、呼び出し回数には制限があります。
サーバーの負荷も考慮して呼び出す必要があります。
一方、Realtime APIの機能は絞られているものの、
その都度APIを呼び出さず、情報を都度受信できるようになっています。
サーバーへの負荷も考えられている仕様でもあるため、
「板情報・Ticker・約定情報のリアルタイム受信」には、HTTP APIより適しています。
特に板情報の差分を受信すると、そのリアルタイム性はLightning取引所ページの板情報並みであり、
感動します。
この記事は、Realtime APIの仕様の一つである、
JSON-RPC 2.0 over WebSocketのAPI呼び出しをPythonで行ってみました。
投稿時、日本語ではRealtime APIについてのPythonの記事は少なく、
あっても廃止予定のPubNub仕様での記事が多少見られる程度であったため、
普及も兼ねて投稿してみます。
公式ページのRubyサンプルプログラムを参考に
後に掲載するプログラムは、Bitflyer公式のサンプルプログラムを参考にしています。
これは、ビットコイン現物の板情報を定期的に受信するものです。
bitFlyer Lightning Realtime API Playground
require 'rubygems'
require 'websocket-client-simple'
require 'json'
channelName = "lightning_board_snapshot_BTC_JPY"
ws = WebSocket::Client::Simple.connect 'wss://ws.lightstream.bitflyer.com/json-rpc'
ws.on :open do
puts "open"
json = JSON.generate({:method => :subscribe, :params => { :channel => channelName }, :id => nil})
ws.send(json)
end
ws.on :message do |msg|
data = JSON.parse(msg.data)
if data["method"] == "channelMessage" then
p data["params"]
end
end
ws.on :close do |e|
p e
exit 1
end
ws.on :error do |e|
p e
end
print "Please press key to exit."
STDIN.getc
サンプルプログラムはシンプルにできていることがわかります。
Websocketで接続を行い、コールバック(open,message,close,error)を受けた場合、各所で処理しているのみです。
PythonではWebsocketのライブラリがありますので、同様の構造で移植をすることができます。
作成したコード
同様に、ビットコイン現物の板情報を定期的に受信するものです。
import json
import websocket
from time import sleep
from logging import getLogger,INFO,StreamHandler
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(INFO)
logger.setLevel(INFO)
logger.addHandler(handler)
"""
This program calls Bitflyer real time API JSON-RPC2.0 over Websocket
"""
class RealtimeAPI(object):
def __init__(self, url, channel):
self.url = url
self.channel = channel
#Define Websocket
self.ws = websocket.WebSocketApp(self.url,header=None,on_open=self.on_open, on_message=self.on_message, on_error=self.on_error, on_close=self.on_close)
websocket.enableTrace(True)
def run(self):
#ws has loop. To break this press ctrl + c to occur Keyboard Interruption Exception.
self.ws.run_forever()
logger.info('Web Socket process ended.')
"""
Below are callback functions of websocket.
"""
# when we get message
def on_message(self, ws, message):
output = json.loads(message)['params']
logger.info(output)
# when error occurs
def on_error(self, ws, error):
logger.error(error)
# when websocket closed.
def on_close(self, ws):
logger.info('disconnected streaming server')
# when websocket opened.
def on_open(self, ws):
logger.info('connected streaming server')
output_json = json.dumps(
{'method' : 'subscribe',
'params' : {'channel' : self.channel}
}
)
ws.send(output_json)
if __name__ == '__main__':
#API endpoint
url = 'wss://ws.lightstream.bitflyer.com/json-rpc'
channel = 'lightning_board_snapshot_BTC_JPY'
json_rpc = RealtimeAPI(url=url, channel=channel)
#ctrl + cで終了
json_rpc.run()
コードはGitHubでも公開しています。
必須ライブラリ
Websocket clientライブラリをpipでインストールしておきます。
(2018/08/19 ライブラリ側のバグのためか最新バージョンの0.50.0では正常に動作しないので、0.47.0を使用します)
pip install websocket-client=0.47.0
基本動作
JSON-RPC 2.0 over WebSocketというAPIの規格名通り、
RealtimeAPIクラスインスタンス作成時にWebsocketでの接続を試みます。
接続後、on_open関数がコールバック関数として必ず呼ばれます。
その際に、購読するチャンネル(後述)をJSON形式で送ることで接続を確立します。(JSON-RPC部分)。
これにより、APIから情報が送られ、その都度on_messageが呼ばれます。
受信した情報はJSON形式のため、ここで処理をすることになります。
各関数が呼ばれた際は、メッセージを出力しこれをloggerが標準出力しています。
購読チャンネルの概要
詳細は公式のAPI仕様ページをご覧ください。
購読チャンネルとは、情報の種類(板情報・板情報差分・Ticker・約定情報)と
取り扱い商品(現物・FX・先物)によって定義される文字列です。
例えば、「現物の板情報」を取得したい場合のチャンネルは
lightning_board_snapshot_BTC_JPY
よりリアルタイム性に優れた「現物の板の差分情報」を取得したい場合は
lightning_board_BTC_JPY
となります。