Help us understand the problem. What is going on with this article?

kabuステーション®APIのWebsocketをPythonで受ける

  • '20.10.14 ライブラリをアップデートし、より簡易な方法で受けられるようになった。

kabuステーション®API - PUSH API用にPythonラッパーをアップデートした

概要

前回に引き続き、auカブコム証券が個人に提供するkabuステーションAPIをPythonから利用する。
今回は、銘柄のWebsocketによる配信をPythonで受ける。また同時に、銘柄の登録、登録解除、登録全解除のコードも紹介する。

環境

  • Windows 10
  • Python 3.8.5 ( Microsoft Store からインストール )

追加パッケージ

  • websockets
  • requests
  • pyyaml

コード

銘柄登録

import json
import requests
import yaml

# ---

def get_token():
    with open('auth.yaml', 'r') as yml:
        auth = yaml.safe_load(yml)

    url = 'http://localhost:18080/kabusapi/token'
    headers = {'content-type': 'application/json'}
    payload = json.dumps(
        {'APIPassword': auth['PASS'],}
        ).encode('utf8')

    response = requests.post(url, data=payload, headers=headers)

    return json.loads(response.text)['Token']

# ---

token = get_token()

EXCHANGES = {
    1: '東証',
    3: '名証',
    5: '福証',
    6: '札証',
}

payload = json.dumps({
    'Symbols': [
        {'Symbol': 8306 ,'Exchange': 1},  # MUFG
        {'Symbol': 9433 ,'Exchange': 1},  # KDDI
        # ... 50件まで登録可
    ],}).encode('utf8')

url = 'http://localhost:18080/kabusapi/register'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, payload, headers=headers)

regist_list = json.loads(response.text)

print('配信登録銘柄')
for regist in regist_list['RegistList']:
    print("{} {}".format(
        regist['Symbol'],
        EXCHANGES[regist['Exchange']]))

銘柄登録解除

URLの変更のみで対応する。表示部は流用可能である。

url = 'http://localhost:18080/kabusapi/unregister'

全登録銘柄解除

payload が不要となる。

url = 'http://localhost:18080/kabusapi/unregister/all'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, headers=headers)

配信表示

登録した銘柄の配信を受け続ける。Ctrl+Cで終了する。
json.loads する response.text がつかないので注意のこと。

import asyncio
import json
import websockets

# ---

async def stream():
    uri = 'ws://localhost:18080/kabusapi/websocket'

    async with websockets.connect(uri, ping_timeout=None) as ws:
        while not ws.closed:
            response = await ws.recv()
            board = json.loads(response)
            print("{} {} {}".format(
                board['Symbol'],
                board['SymbolName'],
                board['CurrentPrice'],
            ))

loop = asyncio.get_event_loop()
loop.create_task(stream())
try:
    loop.run_forever()
except KeyboardInterrupt:
    exit()

なお、サーバ側がハートビート未実装のため websockets.connectping_timeout=None の引数が必要である。

【要望】WebSocket のping/pong対応
  Issue#8 https://github.com/kabucom/kabusapi/issues/8

shirasublue
うなぎのぼりを夢見て
https://shirasu.blue
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away