- '20.10.14 ライブラリをアップデートし、より簡易な方法で受けられるようになった。
→ kabuステーション®API - PUSH API用にPythonラッパーをアップデートした
概要
前回に引き続き、auカブコム証券が個人に提供するkabuステーションAPIをPythonから利用する。
今回は、銘柄のWebsocketによる配信をPythonで受ける。また同時に、銘柄の登録、登録解除、登録全解除のコードも紹介する。
環境
- Windows 10
- Python 3.8.5 ( Microsoft Store からインストール )
追加パッケージ
- websockets
- requests
- pyyaml
コード
銘柄登録
import json
import requests
import yaml
# ---
def get_token():
with open('auth.yaml', 'r') as yml:
auth = yaml.safe_load(yml)
url = 'http://localhost:18080/kabusapi/token'
headers = {'content-type': 'application/json'}
payload = json.dumps(
{'APIPassword': auth['PASS'],}
).encode('utf8')
response = requests.post(url, data=payload, headers=headers)
return json.loads(response.text)['Token']
# ---
token = get_token()
EXCHANGES = {
1: '東証',
3: '名証',
5: '福証',
6: '札証',
}
payload = json.dumps({
'Symbols': [
{'Symbol': 8306 ,'Exchange': 1}, # MUFG
{'Symbol': 9433 ,'Exchange': 1}, # KDDI
# ... 50件まで登録可
],}).encode('utf8')
url = 'http://localhost:18080/kabusapi/register'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, payload, headers=headers)
regist_list = json.loads(response.text)
print('配信登録銘柄')
for regist in regist_list['RegistList']:
print("{} {}".format(
regist['Symbol'],
EXCHANGES[regist['Exchange']]))
銘柄登録解除
URLの変更のみで対応する。表示部は流用可能である。
url = 'http://localhost:18080/kabusapi/unregister'
全登録銘柄解除
payload
が不要となる。
url = 'http://localhost:18080/kabusapi/unregister/all'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, headers=headers)
配信表示
登録した銘柄の配信を受け続ける。Ctrl+Cで終了する。
json.loads
する response
に .text
がつかないので注意のこと。
import asyncio
import json
import websockets
# ---
async def stream():
uri = 'ws://localhost:18080/kabusapi/websocket'
async with websockets.connect(uri, ping_timeout=None) as ws:
while not ws.closed:
response = await ws.recv()
board = json.loads(response)
print("{} {} {}".format(
board['Symbol'],
board['SymbolName'],
board['CurrentPrice'],
))
loop = asyncio.get_event_loop()
loop.create_task(stream())
try:
loop.run_forever()
except KeyboardInterrupt:
exit()
なお、サーバ側がハートビート未実装のため websockets.connect
に ping_timeout=None
の引数が必要である。
【要望】WebSocket のping/pong対応
Issue#8 https://github.com/kabucom/kabusapi/issues/8
関連記事
次回:kabuステーション®API - REST API用のPythonラッパーをつくってみた
前回:kabuステーション®APIをPythonから使う
番外編:localhostのみアクセスできるサービスをnginxで中継する