16
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

kabuステーション®APIのWebsocketをPythonで受ける

Last updated at Posted at 2020-09-08
  • '20.10.14 ライブラリをアップデートし、より簡易な方法で受けられるようになった。

kabuステーション®API - PUSH API用にPythonラッパーをアップデートした

概要

前回に引き続き、auカブコム証券が個人に提供するkabuステーションAPIをPythonから利用する。
今回は、銘柄のWebsocketによる配信をPythonで受ける。また同時に、銘柄の登録、登録解除、登録全解除のコードも紹介する。

環境

  • Windows 10
  • Python 3.8.5 ( Microsoft Store からインストール )

追加パッケージ

  • websockets
  • requests
  • pyyaml

コード

銘柄登録

import json
import requests
import yaml

# ---

def get_token():
    with open('auth.yaml', 'r') as yml:
        auth = yaml.safe_load(yml)

    url = 'http://localhost:18080/kabusapi/token'
    headers = {'content-type': 'application/json'}
    payload = json.dumps(
        {'APIPassword': auth['PASS'],}
        ).encode('utf8')

    response = requests.post(url, data=payload, headers=headers)

    return json.loads(response.text)['Token']

# ---

token = get_token()

EXCHANGES = {
    1: '東証',
    3: '名証',
    5: '福証',
    6: '札証',
}

payload = json.dumps({
    'Symbols': [
        {'Symbol': 8306 ,'Exchange': 1},  # MUFG
        {'Symbol': 9433 ,'Exchange': 1},  # KDDI
        # ... 50件まで登録可
    ],}).encode('utf8')

url = 'http://localhost:18080/kabusapi/register'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, payload, headers=headers)

regist_list = json.loads(response.text)

print('配信登録銘柄')
for regist in regist_list['RegistList']:
    print("{} {}".format(
        regist['Symbol'],
        EXCHANGES[regist['Exchange']]))

銘柄登録解除

URLの変更のみで対応する。表示部は流用可能である。

url = 'http://localhost:18080/kabusapi/unregister'

全登録銘柄解除

payload が不要となる。

url = 'http://localhost:18080/kabusapi/unregister/all'
headers = {'Content-Type': 'application/json', 'X-API-KEY': token,}
response = requests.put(url, headers=headers)

配信表示

登録した銘柄の配信を受け続ける。Ctrl+Cで終了する。
json.loads する response.text がつかないので注意のこと。

import asyncio
import json
import websockets

# ---

async def stream():
    uri = 'ws://localhost:18080/kabusapi/websocket'

    async with websockets.connect(uri, ping_timeout=None) as ws:
        while not ws.closed:
            response = await ws.recv()
            board = json.loads(response)
            print("{} {} {}".format(
                board['Symbol'],
                board['SymbolName'],
                board['CurrentPrice'],
            ))

loop = asyncio.get_event_loop()
loop.create_task(stream())
try:
    loop.run_forever()
except KeyboardInterrupt:
    exit()

なお、サーバ側がハートビート未実装のため websockets.connectping_timeout=None の引数が必要である。

【要望】WebSocket のping/pong対応
  Issue#8 https://github.com/kabucom/kabusapi/issues/8

関連記事

次回:kabuステーション®API - REST API用のPythonラッパーをつくってみた
前回:kabuステーション®APIをPythonから使う
番外編:localhostのみアクセスできるサービスをnginxで中継する

16
18
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?