初めに
pybottersに馴染みのない方は先にまちゅけんさんの記事や本家githubを一読するのをお勧めします! 参考一覧
2022-12-04時点でpybotters-wrapperはα版であり仕様は予告なく変更される可能性があります。本記事の内容はv0.1.0-alpha時点のcommitを元に書いています。
pybottersのラッパーを書きました!
本ラッパーの究極的な目標はpybottersを用いた非同期bot開発の参入障壁を下げることです。
pybottersの各種機能及びそのベースとなっているasyncioの非同期プログラムを使いやすい形にラップして提供することで、pybottersコミュニティの活性化に貢献したいと思っています(pybotters・asyncio愛用者同士でワイワイ非同期botを作りたい)。
早速ですが、詳しい中身の解説の前に、まずはpybotters-wrapperを用いた秒速スイングbotのサンプル実装を紹介します:
import asyncio
from argparse import ArgumentParser
import pandas_ta  # noqa
import pybotters_wrapper as pbw
def get_feature(df, length=14, th=20):
    """rsi計算用関数"""
    rsi = df.ta.rsi(length=length).values[-1]
    if rsi > (100 - th):
        return -1, rsi
    elif rsi < th:
        return 1, rsi
    else:
        return 0, rsi
async def main(args):
    # loggerの設定
    logdir = pbw.utils.init_logdir(
        args.exchange,
        args.symbol,
        f"bar-{args.bar_seconds}_rsi-{args.rsi_th}-{args.rsi_length}",
    )
    logger = pbw.utils.init_logger(f"{logdir}/log.txt")
    async with pbw.create_client(apis=args.api) as client:
        # ストアの設定
        store = pbw.create_store(args.exchange)
        # APIの設定 (verbose=Trueでリクエストごとにログ出力する)
        api = pbw.create_api(args.exchange, client, verbose=True)
        # timebarの設定
        tbar = pbw.plugins.timebar(store, seconds=args.bar_seconds)
        # 更新のたびに最新のDataFrameを取得するためのqueueを発行
        df_queue = tbar.register_queue()
        # 約定履歴とBarの書き出しを設定
        (
            pbw.plugins.watch_csvwriter(
                store, "execution", f"{logdir}/execution.csv", per_day=True, flush=True
            ),
            pbw.plugins.bar_csvwriter(
                tbar, f"{logdir}/bar.csv", per_day=True, flush=True
            ),
        )
        await store.subscribe("all", symbol=args.symbol).connect(
            client, auto_reconnect=True, waits=["trades"]
        )
        while True:
            # barの更新があるまで待機
            df = await df_queue.get()
            # rsiを計算
            trend, rsi = get_feature(df, args.rsi_length, args.rsi_th)
            logger.info(f"rsi={rsi} trend={trend} position={store.position.summary()}")
            if trend == 1:
                if store.position.size("BUY") == 0:
                    # long entry (short玉を持っていれば決済)
                    size = args.size + store.position.size("SELL")
                    resp = await api.market_order(args.symbol, "BUY", size)
                    logger.info(f"entry buy: {resp.order_id}")
            elif trend == -1:
                if store.position.size("SELL") == 0:
                    # short entry (long玉を持っていれば決済)
                    size = args.size + store.position.size("BUY")
                    resp = await api.market_order(args.symbol, "SELL", size)
                    logger.info(f"entry sell: {resp.order_id}")
if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("--api", required=True)
    parser.add_argument("--exchange", default="bitflyer")
    parser.add_argument("--symbol", default="FX_BTC_JPY")
    parser.add_argument("--size", default=0.01, type=float)
    parser.add_argument("--bar_seconds", default=5, type=int)
    parser.add_argument("--rsi_th", default=30, type=float)
    parser.add_argument("--rsi_length", default=14, type=int)
    args = parser.parse_args()
    try:
        asyncio.run(main(args))
    except KeyboardInterrupt:
        pass
pybottersが低レベル機能(ソケット接続・API認証・DataStoreのCRUD)の提供に努めることでシンプルさと高いカスタマイズ性を実現しているのに対して、pybotters-wrapperは高機能API(pbw.create_*やpbw.plugins.*など)を提供することでユーザーの実装コスト削減に注力しています。以下こちらのサンプルで使用しているpybotters-wrapperのAPIの概略です(使用順)。
- 
pbw.utils.init_logdir(...): ログ用のディレクトリを作成
- 
pbw.utils.init_logger(...): loguruベースのloggerを初期化(デフォルトは10MBでファイル更新、3世代分のログを残す)
- 
pbw.create_client(...):pybotters.Clientを作成する
- 
pbw.create_store(...):DataStoreWrapper(後述)を作成する
- 
store.subscribe(...): 購読チャンネルの登録
- 
store.connect(...): ウェブソケットの接続(接続待機+再接続)
- 
pbw.plugins.timebar(...): バーの作成(足はウェブソケットで配信される約定履歴から随時作成される)
- 
tbar.register_queue(...): バー更新時に最新のDataFrameが配信されるqueueを取得
- 
pbw.plugins.watch_csvwriter(...): DataStore.watchで流れてくるアイテムをcsvに書き出すプラグイン
- 
pbw.plugins.bar_csvwriter(...): バーをcsvに書き出すプラグイン
- 
store.position.size(...): ポジション保有量を取得
- 
api.market_order(...):成行注文
自前で書こうとするとなかなかの量になりそうですが、その多くがpybotters-wrapperの中に実装されています(内部で余計なことを勝手に色々やっています)。また、察しの良い方はお気づきかもしれませんが、storeやapi 周りは取引所間の差分吸収がなされます。
インストール方法は
# この記事執筆時点のもの
pip install git+https://github.com/ko0hi/pybotters-wrapper@v0.1.0-alpha
# 最新
pip install git+https://github.com/ko0hi/pybotters-wrapper
です。
レポジトリには他にもサンプルが上がっているのでそちらも参照いただけたら幸いです。
(※いずれのサンプルも売買ロジックではなくコードの共有が目的となっている点にご注意ください)
スイングボット(上のもの)
マーケットメイキングボット
ブレイクアウトボット
髭取りボット
複数取引所監視ボット
その他スニペットサンプルたち
pybotters-wrapper
ここからはpybotters-wrapperの詳細を記していきます。
目標
究極的な目標はpybottersを用いた非同期bot開発の参入障壁を下げることであり、そのアプローチとしてpybotters及びasyncioのラッパーの提供を採用しています。
その上でどのようなラッパーがあればpybotters/asyncioベースのbot開発に取り組みやすくなるのか?コミュニティでの開発が盛り上がるのか? を考えた結果、以下の2点に焦点を当てることにしました。
(1)pybotters上での取引所間の差分を吸収する
(2)拡張用の口(plugin)を提供する
一つ目の「pybotters上での取引所間の差分」というのは主にDataStoreのことを指しています。現状pybottersの仕様としてDataStoreに関して取引所間の差分吸収は行わず、ユーザーは取引所別の仕様を理解する必要があり、実装も取引所別に行う必要があります。これに対して、約定履歴・注文履歴・ポジション状態などに共通のインターフェースを被せることができれば、その上の処理(初期化・ソケット接続など)やモジュール(Bar構築・在庫処理・ロギングなど)を統一化することができます。統一化ができるようになると流用・共有が容易になるので、それがコミュニティ単位での開発に繋がってほしいと考えました。
二つ目は(1)の上に成り立つ拡張機能、いわゆるプラグインです。(1)で述べた”コミュニティ単位での開発”を促進する手段として、プラグイン用のインターフェース・utilityといった開発ツールを提供します。そして「pybottersには色々な人が作った様々な便利プラグインがあるから使ってみよう」「俺の最強機能をpybottersコミュニティに提供したい!」となって、pybottersコミュニティとして非同期bot開発が盛り上がるのを目論んでいます。
以降ではこれらを達成するための各種機能を紹介していきます。
3つの機能紹介
ここでは大きく3つの機能を紹介します:
① データストアの共通化:取引所間で同じデータフォーマットを持とう!
② 注文APIの共通化:取引所間で同じ注文パラメーターを持とう!
③ プラグインによる拡張性:hookを用いて拡張性を持とう!
①データストアの共通化
pybotters-warpperにおいてデータストアの共通化を担うのがDataStoreWrapperと NormalizedDataStoreという二つのクラスです。
DataStoreWrapperはpybotters-wrapperの中核であり、pybotters.store.DataStoreManagerをラップする合成クラスです。1
NormalizedDataStoreはpybotters.store.DataStoreを継承したストアクラスであり、共通化の実処理を担うクラスです。
この2クラスで行う共通化は以下の3つです:
(1) データストア
(2) 初期化
(3) ソケット接続
(1)ストアの共通化が最も重要で(2)・(3)は(1)の実現によって自然と実装されます。
データストア
さて、データストアの共通化において最初に行う必要があったのが実装するストアの取捨選択でした。
pybotters-wrapperでは6つのストアを以下のように共通化することにしました。
| 名前 | 概要 | 要素 | 認証 | 
|---|---|---|---|
| Ticker | ティック | symbol・price | public | 
| Trades | 取引 | symbol・side・price・size・timestamp・id | public | 
| Orderbook | 板 | symbol・side・price・size | public | 
| Execution | 約定 | symbol・side・price・size・timestamp・id | private | 
| Order | 注文 | symbol・side・price・size・type・id | private | 
| Position | ポジション | symbol・side・price・size | private | 
各要素のタイプはこちらです。
symbol: str
side: str  # ("BUY" or "SELL")
price: float
size: float
timestamp: datetime
type: str  # ("LIMIT"・"STOP"など)
id: str
bitflyerのTrading Viewと対応づけるとこのような形になります。
この6つによってトレードに必要な基本的な情報のほとんどがカバーされていそうです
(最も目立つチャートを含めなかったのは、(1)バーをソケットで配信していない取引所がちらほらある、(2)チャート情報は取引履歴から作成できる、の二つの理由からです)。
さて、これら6つのストアの取引所間共通化実装を担うのがNormalizedDataStoreであると述べました。
触りのところを見てみます。NormalizedDataStoreはpybottersのDataStore(データを保持する簡易インメモリDBクラス)を継承した子クラスであり、かつ、DataStoreのインスタンスを内部に持つ合成クラスでもあります。そして、受け取ったDataStoreインスタンスのwatchやwaitを監視して、正規化したアイテムを自身に挿入していくのが仕事です。
class NormalizedDataStore(DataStore):
    _AVAILABLE_OPERATIONS = ("_insert", "_update", "_delete")
    def __init__(self, store: DataStore = None, auto_cast=False):
        super(NormalizedDataStore, self).__init__(auto_cast=auto_cast)
        self._store = store
        if self._store is not None:
            self._wait_task = asyncio.create_task(self._wait_store())
            self._watch_task = asyncio.create_task(self._watch_store())
        else:
            self._wait_task = None
            self._watch_task = None
    ...
    async def _wait_store(self):
        while True:
            await self._store.wait()
            # この中で正規化を行う!
            self._on_wait()
    async def _watch_store(self):
        with self._store.watch() as stream:
            async for change in stream:
                # この中で正規化を行う!
                self._on_watch(change)
NormalizedDataStoreをベースにして先ほど定めた6つのデータストア用のベースクラスが定義されています:
- TickerDataStore
- TradesDataStore
- OrderbookDataStore
- ExecutionDataStore
- OrderDataStore
- PositionDataStore
各取引所のモジュールはこれらを継承して実装クラスを用意し、その中で差分吸収ロジックを書きます。
例としてTickerDataStoreを見てみます。
class TickerItem(TypedDict):
    symbol: str
    price: float
class TickerStore(NormalizedDataStore):
    _KEYS = ["symbol"]
    def _normalize(self, d: dict, op: str) -> "TickerItem":
        raise NotImplementedError
    def _itemize(self, symbol: str, price: float, **kwargs):
        return TickerItem(symbol=symbol, price=price, **kwargs)
ストアに格納される要素はTypedDictとして定義されています。
また_normalizeメソッドが実装必須になっていて、取引所ごとの実装クラスにおいてはこのメソッドをオーバーライドしてデータの変換処理を定義する必要があります。
いくつかの取引所のTickerStoreの実装を並べてみます:
class BinanceTickerStore(TickerStore):
    def _normalize(self, d: dict, op: str) -> "TickerItem":
        return self._itemize(d["s"], float(d["c"]))
class bitFlyerTickerStore(TickerStore):
    def _normalize(self, d: dict, op: str) -> "TickerItem":
        return self._itemize(d["product_code"], d["ltp"])
class BybitTickerStore(TickerStore):
    def _normalize(self, d: dict, op: str) -> "TickerItem":
        return self._itemize(d["symbol"], float(d["last_price"]))
class CoincheckTickerStore(TickerStore):
    def _normalize(self, d: dict, op: str) -> "TickerItem":
        return self._itemize(d["pair"], float(d["rate"]))
実際に_normalizeの実装が呼ばれるのは、先ほどのNormalizedDataStore._on_watchの中です。
    def _on_watch(self, change: "StoreChange"):
        op = self._get_operation(change)
        if op is not None:
            normalized_data = self._normalize({**change.data}, change.operation)
            item = self._make_item(normalized_data, change)
            self._check_operation(op)
            op_fn = getattr(self, op)
            op_fn([item])
_normalizeの他にもいくつかオーバーライドできそうな関数(hook)が呼ばれていますね。pybotters-wrapperの差分吸収では、抽象クラスの中で処理フローをテンプレート化して、各取引所実装クラス用にhookを提供します。実装クラスはhookをオーバーライドして、ミニマムなコードで要素の正規化を実現させます。hookをうまくオーバーライドしながら差分を吸収していくのがpybotters-wrapperの実装の醍醐味となります。後述するplugin機能も同じような仕組みで実装が行われています。
初期化
pybottersのデータストアはREST API経由で初期化を行う機能があります。pybotters-wrapperでもNormalizedDataStoreについては初期化用APIを提供しています。
exchange = "binanceusdsm"
symbol = "BTCUSDT"
async with pybotters.Client() as client:
    store = pbw.create_store(exchange)
    await store.initialize(
        [
            "token", 
            ("orderbook", {"symbol": symbol})
        ],
        client=client,
    )
初期化したい対象のエイリアス名と(必要であれば)対応するRESTエンドポイントのパラメターを渡してあげます。
上記のコードを素のpybottersで書くとこのようになります。
exchange = "binanceusdsm"
symbol = "BTCUSDT"
async with pybotters.Client() as client:
    store = pybotters.BinanceUSDSMDataStore()
    await store.initialize(
        client.post("https://fapi.binance.com/fapi/v1/listenKey"),
        client.get(f"https://fapi.binance.com/fapi/v1/depth?symbol={symbol}")
    )
行数的には変わりませんが、対応するエンドポイントを覚えておく必要がなくなっています。
initializeメソッドの第一引数はlist[Awaitable|str|tuple[str, dict]]です。
初期化がサポートされているエイリアス名は以下の8つです。
- token
- token_public
- token_private
- ticker
- trades
- orderbook
- execution
- order
- position
token・token_public・token_privateはBinance・GMOCoin・KuCoinで採用されているようなトークン形式のwebsocket接続のためのもので、その他は各種NormalizedDataStoreに対応します。
必要なパラメータを忘れた場合エラーが送出されます(Binanceのorderbookストア):
exchange = "binanceusdsm"
symbol = "BTCUSDT"
async with pybotters.Client() as client:
    store = pbw.create_store(exchange)
    await store.initialize(
        [
            "token", 
            "orderbook"  # symbolが必要
        ],
        client=client,
    )
# RuntimeError: Missing required parameters for initializing 'orderbook' of BinanceUSDSMDataStoreWrapper: ['symbol'] (HINT: store.initialize([('orderbook', {'symbol': '...'}), ...))
Awaitableを与えた場合の挙動はpybottersに準拠するので、NormalizedDataStore以外のストアも一緒に初期化できます。
exchange = "binanceusdsm"
symbol = "BTCUSDT"
async with pybotters.Client() as client:
    store = pbw.create_store(exchange)
    await store.initialize(
        [
            "token", 
            ("orderbook", {"symbol": symbol}),
            client.get(f"https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval=1m")
        ],
        client=client,
    )
正直なところ引数のタイプがぐちゃぐちゃで見栄えが非常に悪いのでこのAPIは仕様変更の可能性高いです。
ソケット接続
DataStoreWrapperはソケット接続のために二つのAPIを提供しています。
- store.subscribe
- store.connect
NormalizedDataStore用のチャンネルを購読するときは以下のようになります:
exchange = "bitflyer"
symbol = "FX_BTC_JPY"
async with pybotters.Client() as client:
    store = pbw.create_store(exchange)
    await store.subscribe(
        ["ticker", "trades", "orderbook", "execution", "order", "position"],
        symbol=symbol,  # 必要に応じてそれぞれのチャンネルの購読パラメーターに展開されます
    ).connect(client, auto_reconnect=True)
各ストア用のチャンネルはpybotters-wrapper側で持っていて、エイリアス名(と必要な引数)を渡すだけでよろしく購読してくれます。
また購読チャンネルのショートカットとして"all"・"public"・"private"が用意されています。
- 
"all"=["ticker", "trades", "orderbook", "execution", "order", "position"]
- 
"public"=["ticker", "trades", "orderbook"]
- 
"private"=["execution", "order", "position"]
exchange = "bitflyer"
symbol = "FX_BTC_JPY"
async with pybotters.Client() as client:
    store = pbw.create_store(exchange)
    await store.subscribe("all", symbol=symbol).connect(client, auto_reconnect=True)
NormalizedDataStoreに関してはstore.subscribe("all", symbol=...)とすればOKです。exchangeとsymbolの変更でどのストアでも同じように購読できます(サンプル)。
比較のために同じコードを素のpybottersのみで書いてみます。
async with pybotters.Client():
    symbol = "FX_BTC_JPY"
    store = pybotters.bitFlyerDataStore()
    await client.ws_connect(
        "wss://ws.lightstream.bitflyer.com/json-rpc",
        send_json=[
            {
                "method": "subscribe",
                "params": {"channel": f"lightning_ticker_{symbol}"},
                "id": 1,
            },
            {
                "method": "subscribe",
                "params": {"channel": f"lightning_executions_{symbol}"},
                "id": 2,
            },
            {
                "method": "subscribe",
                "params": {"channel": f"lightning_board_snapshot_{symbol}"},
                "id": 3,
            },
            {
                "method": "subscribe",
                "params": {"channel": f"lightning_board_{symbol}"},
                "id": 4,
            },
            {
                "method": "subscribe",
                "params": {"channel": "child_order_events"},
                "id": 5,
            },
        ],
        hdlr_json=store.onmessage,
    )
NormalizedDataStoreで使うもの以外のチャンネルを購読したい場合、connectに直接エンドポイント(endpoint)とリクエスト(send)を渡します。
exchange = "binanceusdsm"
symbol = "btcusdt"
async with pybotters.Client() as client:
    store = pbw.create_store(exchange)
    await store.subscribe("public", symbol=symbol).connect(
        client,
        endpoint="wss://fstream.binance.com/ws",
        send=[
            {"method": "SUBSCRIBE", "params": ["btcusdt@kline_1m"], "id": 100}
        ]
    )
    while True:
        print(store.store.kline.find())
        await asyncio.sleep(1)
これでsubscribeしたものに加えて(上の例だと"public" = ["ticker", "trads", "orderbook"])、klineチャンネルも購読されます。
ラップしているオリジナルのDataStoreManager(この場合BinanceUSDSMDataStore)にはstore.storeでアクセスできます。よってklineデータストアへのアクセスはstore.store.klineとなります。このように素のpybottersでできることはpybotters-wrapperでも(できるだけ)簡潔にできるように設計しています。
閑話休題:チャンネルの購読設定・ウェブソケットの接続機能の切り出し
subscribe・connectメソッドのメインロジックはWebsocketChannels・WebsocketConnectionクラスによって実行されています。そのためこれらのクラスを直接呼び出すことによって、「チャンネルの購読設定」・「ウェブソケットの接続」といった機能を切り出して使うことができます。
以下はbitflyerのウェブソケットチャンネルの購読設定だけ切り出して使う例です。
import asyncio
import pybotters
import pybotters_wrapper as pbw
async def main():
    symbol = "FX_BTC_JPY"
    async with pybotters.Client() as client:
        channels = pbw.bitflyer.bitFlyerWebsocketChannels()
        # チェインパターンでかけます
        # subscribesはdict[str, list[dict]]で、endpointと購読チャンネルのリストが入っています
        subscribes = (
            channels.lightning_board(symbol)
            .lightning_board_snapshot(symbol)
            .lightning_executions(symbol)
            .lightning_ticker(symbol)
            .child_order_events()
            .parent_order_events()
            .get()
        )
        for endpoint, send in subscribes.items():
            await client.ws_connect(endpoint, send_json=send)
        while True:
            await asyncio.sleep(1)
if __name__ == "__main__":
    asyncio.run(main())
そしてこちらがウェブソケットの接続部分だけ切り出して使う例です。
import asyncio
import pybotters
import pybotters_wrapper as pbw
async def main():
    symbol = "FX_BTC_JPY"
    
    async with pybotters.Client() as client:
        endpoint = "wss://ws.lightstream.bitflyer.com/json-rpc"
        send = [
            {
                "method": "subscribe",
                "params": {"channel": f"lightning_ticker_{symbol}"},
                "id": 1,
            },
            {
                "method": "subscribe",
                "params": {"channel": f"lightning_executions_{symbol}"},
                "id": 2,
            },
            {
                "method": "subscribe",
                "params": {"channel": f"lightning_board_snapshot_{symbol}"},
                "id": 3,
            },
            {
                "method": "subscribe",
                "params": {"channel": f"lightning_board_{symbol}"},
                "id": 4,
            },
            {
                "method": "subscribe",
                "params": {"channel": "child_order_events"},
                "id": 5,
            },
        ]
        hdlr = lambda msg, ws: print(msg)
        connection = pbw.core.WebsocketConnection(endpoint, send, hdlr)
        await connection.connect(client, auto_reconnect=True)
        while True:
            await asyncio.sleep(1)
if __name__ == "__main__":
    asyncio.run(main())
pybotters-wrapperの各モジュールは可能な限り疎結合となるよう作られており、ユーザーは必要な機能だけ切り出してpybottersと組み合わせて使用できます。
②注文APIの共通化
注文API対応済みの取引所はbinancespot・binanceusdsm・binancecoinm・bitflyer・bybitusdt・bybitinverse・gmocoin・kucoinspot・kucoinfuturesのみです。他の取引所への拡張が進みにつれて仕様が変更になる可能性があります。
pybotter-wrapperではRESTリクエスト用モジュールとしてAPIクラスを持ち、 pbw.create_api(exchange, client)で作成できます。
APIクラスが持つ機能は以下の通りです。
① 成行注文・指値注文・キャンセル注文の取引所間差分吸収
② エンドポイントに応じたbase urlの自動付与・注文桁数の自動調整といった注文パラメータの整形
③ 同期リクエストのサポート(pybottersの認証部分だけ借用してasyncio・aiohttpを通さない)
①がメインの機能であり、下記のインターフェースで提供されます:
# インターフェースの見本用定義で実際の実装ではありません
def market_order(symbol: str, side: str, size: float) -> OrderResponse:
    ...
def limit_order(symbol: str, side: str, price: float, size: float) -> OrderResponse:
    ...
def cancel_order(symbol: str, order_id: str) -> OrderResponse:
    ...
返り値のOrderResponseはNamedTupleです。
class OrderResponse(NamedTuple):
    order_id: str
    resp: aiohttp.ClientResponse
    resp_data: any = None
逆にいうとこちらのインターフェースで対応できるエンドポイントがない取引所は注文API共通化の対象外とします。2
共通化の実装を軽く覗いてみます。以下はAPIクラスのlimit_orderの処理フロー部分で、 _make_limit_endpoint・_make_limit_order_paramteterといったhookで構成されています。例に倣って各取引所のAPI実装クラスはこれらのhookを適切にオーバーライドすることで差分を吸収する仕様です。
async def limit_order(
        self,
        symbol: str,
        side: str,
        price: float,
        size: float,
        request_params: dict = None,
        order_id_key: str = None,
        **kwargs,
) -> "OrderResponse":
    request_params = request_params or {}
    endpoint = self._make_limit_endpoint(symbol, side, price, size, **kwargs)
    p = self._make_limit_order_parameter(endpoint, symbol, side, price, size)
    p_w_kwargs = self._add_kwargs_to_data(p, **kwargs)
    self.log(f"limit order request: {p_w_kwargs}", verbose=self._verbose)
    resp, resp_data = await self._make_limit_request(
        endpoint, p_w_kwargs, **request_params
    )
    self.log(f"limit order response: {resp} {resp_data}", verbose=self._verbose)
    order_id = self._make_limit_order_id(resp, resp_data, p, order_id_key)
    wrapped_resp = self._make_limit_order_response(resp, resp_data, order_id)
    return wrapped_resp
以下がBinanceの実装の抜粋です。このように取引所のRESTの仕様に合わせて各パラメータを加工しています。
class BinanceAPIBase(API):
    ...
    def _make_limit_order_parameter(
            self,
            endpoint: str,
            symbol: str,
            side: str,
            price: float,
            size: float,
    ) -> dict:
        return {
            "symbol": symbol.upper(),
            "side": side.upper(),
            "type": "LIMIT",
            "quantity": self.format_size(symbol, size),
            "price": self.format_price(symbol, price),
            "timeInForce": "GTC",
        }
インターフェースに存在しない取引所固有のパラメータを与えることも可能です。例えば以下はBinanceのSTOPオーダーを使用する例です。
api = pbw.create_api("binanceusdsm", client)
await api.limit_order(
    symbol,
    side,
    price,
    size,
    stopPrice=stop_price,
    type="STOP",
)
当該エンドポイントでサポートされている任意のパラメータをkwargs引数として渡すことができます。 ただしその場合、同じコードを異なる取引所で使いまわせなくなる点には注意してください。3
APIクラスはリクエストメソッドもラップしているので、そちらを用いればpybotters.Clientを直接使うのと同様に任意のエンドポイントにリクエストを送ることができます。
api = pbw.create_api("binanceusdsm", client)
# base_url("https://fapi.binance.com")は内部で自動付与されます
resp = await api.get("/fapi/v1/openOrders")
data = await resp.json()
さてここまで紹介した共通化によって、データ取得から発注というbotの要となる部分のインターフェースができました。
以下は
- ストアを初期化
- チャンネル購読
- Tickを取得
- 指値注文
- 注文を確認
- 注文をキャンセル
- キャンセルを確認
というシンプルな動作確認用サンプルコードです。単純な例ではありますが処理ロジックから取引所別の設定を排除することができました。
import asyncio
import pybotters_wrapper as pbw
async def main(exchange="bitflyer"):
    # 取引所個別の設定
    configs = {
        "binancespot": {"symbol": "BTCUSDT", "size": 0.001},
        "binanceusdsm": {"symbol": "BTCUSDT", "size": 0.001},
        "bitflyer": {"symbol": "FX_BTC_JPY", "size": 0.01},
        "bybitusdt": {"symbol": "BTCUSDT", "size": 0.001},
        "bybitinverse": {"symbol": "BTCUSD", "size": 1},
        "gmocoin": {"symbol": "BTC_JPY", "size": 0.01},
        "kucoinspot": {"symbol": "BTC-USDT", "size": 0.001},
        "kucoinfutures": {"symbol": "XBTUSDTM", "size": 1.1},
    }
    assert exchange in configs
    # 取引所独立のロジック
    async with pbw.create_client() as client:
        symbol = configs[exchange]["symbol"]
        size = configs[exchange]["size"]
        api = pbw.create_api(exchange, client, verbose=True)
        store = pbw.create_store(exchange)
        if initialize_config := configs[exchange].get("initialize", None):
            await store.initialize(initialize_config, client=client)
        await store.subscribe(["ticker", "order"], symbol=symbol).connect(
            client, waits=["ticker"]
        )
        # ltpから10%下がったところに買い指値
        price = store.ticker.find()[0]["price"] * 0.9
        # 指値注文
        new_resp = await api.limit_order(symbol, "BUY", price, size)
        # 注文情報受信待機
        await asyncio.sleep(3)
        # 注文状態確認
        orders1 = store.order.find({"id": new_resp.order_id})
        # チェック
        assert len(orders1) == 1
        print(orders1)
        # 注文キャンセル
        await api.cancel_order(symbol, new_resp.order_id)
        # 注文情報受信待機
        await asyncio.sleep(5)
        orders2 = store.order.find({"id": new_resp.order_id})
        # チェック
        assert len(orders2) == 0
        print(orders2)
if __name__ == "__main__":
    asyncio.run(main())
閑話休題:同期リクエスト
pybottersはasyncioベースのためリクエスト用メソッドもasync関数になっています。そのためasync関数内でしか呼び出せません。 例えば、「__init__内でレストを叩いて初期値を与えたい」といったことはできないので、別途async関数を用意してあげる必要があります(__init__でhttpリクエストを投げるのが良いか悪いかは置いておく)。
class Hoge:
    def __init__(self, client: pybotters.Client):
        # エラー!
        resp = await client.get("...")
class Hoge:
    def __init__(self):
        self._to_be_initalized = None
    async def init(self, client: pybotters.Client):
        # OK!
        resp = await client.get("...")
pybotters.[request|get|post|put|delete]はsync関数(i.e., asyncがついてない)ですが、内部的にはasync関数を呼び出しているだけなので、別のasync loopの中で使おうとするとエラーが出ます(pybottersでbotを書いたらほぼ確実に別のasync loopの中で呼び出すことになります)。
class Hoge:
    def __init__(self):
        # RuntimeError: This event loop is already running
        resp = pybotters.get("...")
上は極端な例ですが、「同期関数の中でHTTPリクエストを投げたい、とはいえ、他の処理は同期だからasyncつけたくない」というケース4はそれなりにあると思っているので同期メソッドを用意しました。
中身としてはpybotters.auth.Authとaiohttp.ClientSessionの一部を借用して認証を行い、同期リクエスト用ライブラリのrequestsに渡す、という実装にしました。
class API:
    ....
    def srequest(
        self, method, url, *, params=None, data=None, **kwargs
    ) -> requests.Response:
        # TODO: 網羅的なテスト
        # aiohttp.ClientSession._requestをpybotters.Clientから呼び出した時の処理を抜き出している
        sess: aiohttp.ClientSession = self._client._session
        req = sess._request_class(
            method,
            sess._build_url(self._attach_base_url(url)),
            params=params,
            data=data,
            headers=sess._prepare_headers([]),
            session=sess,
            auth=pybotters.auth.Auth,
        )
        headers = {str(k): str(v) for (k, v) in dict(req.headers).items()}
        if isinstance(req.body, aiohttp.payload.BytesPayload):
            data = req.body._value
        else:
            data = req.body
        # paramsはurlに埋め込まれている
        return requests.request(
            method=req.method, url=str(req.url), data=data, headers=headers, **kwargs
        )
    def sget(self, url, *, params=None, data=None, **kwargs) -> requests.Response:
        return self.srequest("GET", url, params=params, data=data, **kwargs)
    def spost(self, url, *, params=None, data=None, **kwargs) -> requests.Response:
        return self.srequest("POST", url, params=params, data=data, **kwargs)
    def sput(self, url, *, params=None, data=None, **kwargs) -> requests.Response:
        return self.srequest("PUT", url, params=params, data=data, **kwargs)
    def sdelete(self, url, *, params=None, data=None, **kwargs) -> requests.Response:
        return self.srequest("DELETE", url, params=params, data=data, **kwargs)
 pybotters-wrapperのAPIクラスにはs[request|get|post|put|delete]というようにsをプレフィックスに持つメソッドが用意されています。まだ網羅的にテストしたわけではないので、使いながら整備していく予定です。
実際にpybotters-wrapperの中でも以下のような使われ方をしています。
class _KuCoinDataStoreWrapper(DataStoreWrapper[pybotters.KuCoinDataStore]):
    ...
    def _parse_connect_endpoint(self, endpoint: str, client: pybotters.Client) -> str:
        try:
            return self.endpoint
        except RuntimeError:
            import pybotters_wrapper as pbw
            api = pbw.create_api(self.exchange, client)
            url = self._INITIALIZE_CONFIG["token"][1]
            resp = api.spost(url)
            data = resp.json()
            self.store._endpoint = self.store._create_endpoint(data["data"])
            self.log("Websocket token got automatically initialized", "warning")
            return self.endpoint
KuCoinはソケット接続のためにstore.initializeでエンドポイントを初期化してあげる必要があるのですが、ユーザーが初期化を忘れた際に同期メソッドを使って取得・設定をしています。ここで_parse_connect_endpointは同期関数のhookであり、呼び出し元はDataStoreWrapper.connectです。仮にKuCoin実装のオーバーライドでasyncをつけると元の呼び出し側でasync関数か否かによる条件分岐が発生してしまうので、それを避けるためにも同期メソッドの使用で対応しています。
hookを多用するpybotters-wrapperでは実装サイドによってsyncだったり・asyncだったりする状況はできるだけ避けたいところなのでした。
③プラグインによる拡張性
ここからはpybotters-wrapperのもう一つのテーマである拡張性の提供についてです。pybotters_wrapper.pluginsパッケージが当該箇所であり、データの加工・ログ・特殊注文など DataStoreWrapper・APIインターフェースではカバーできない部分がこの中に実装されていく予定です。
pluginパッケージはその特性上インターフェースは緩やかに設定され、比較的自由な書き方が許される予定です。誰でも気軽にオレオレ最強プラグインPRを飛ばしてください!
最初のサンプルでも使っていた秒速足のプラグインを追っていきながら、pybotters-wrapperのプラグインの仕様について紹介したいと思います。
早速ですが初期化部分はこのようになっていました:
tbar = pbw.plugins.timebar(store, seconds=args.bar_seconds)
pbw.plugins.関数名はプラグインのファクトリーメソッドになります。pybotters-wrapperビルトインのプラグインは、全てファクトリーメソッドが定義してありそれを使って初期化する仕様です。
timebar関数の中身はこんな具合です。プラグインの実装クラス(TimeBarStreamDataFrame)を初期化して返します。 _maybe_override_by_exchangeはもし取引所固有の実装があればそちらをimportして使うようにするパッチです(サラッと流します)。
def timebar(
    store: DataStoreWrapper,
    *,
    seconds: int,
    maxlen: int = 9999,
    df: pd.DataFrame = None,
    callback: Callable[[pd.DataFrame], dict] = None,
    message_delay: int = 2,
) -> TimeBarStreamDataFrame:
    return _maybe_override_by_exchange(
        store,
        "timebar",
        default_cls=TimeBarStreamDataFrame,
        seconds=seconds,
        maxlen=maxlen,
        df=df,
        callback=callback,
        message_delay=message_delay,
    )
TimeBarStreamDataFrameはBarStreamDataFrameの子クラスであり、略している部分では取引履歴を使ってバーを逐次構築していく処理が書かれています。
ここで重要な箇所は
- 
BarStreamDataFrameは_on_watchメソッドでバーの更新ロジックを走らせている
- 親クラスは DataStorePlugin
- 親クラスにstore.trades(Trades用のNormalizedDataStore)を渡している
です。
class TimeBarStreamDataFrame(BarStreamDataFrame):
   ...
class BarStreamDataFrame(DataStorePlugin):
    def __init__(
            self,
            store: 'DataStoreWrapper',
            *,
            maxlen: int = 9999,
            df: pd.DataFrame = None,
            callback: Callable[[pd.DataFrame], any] = None,
    ):
        super(BarStreamDataFrame, self).__init__(store.trades)
        ...
    def _on_watch(self, d: dict, op: str):
        if op == "insert":
            if self._is_new_bar(d, op):
                self._next_bar(d)
            else:
                self._current_bar(d)
もうお気づきだと思いますが、_on_watchはDataStorePluginが提供するhookです。DataStorePluginの実装を覗いてみます。__init__でpybotters.store.DataStoreをメンバ変数として受け取りそのwait/watchループタスクをスケジューリングしています。そしてそれぞれのループの中にhookが埋め込んであります(is_aw_*・_run_hookは同期関数・非同期関数の条件分岐を行うためのヘルパーです)。
class DataStorePlugin(Plugin):
    def __init__(self, store: DataStore):
        self._store = store
        self._wait_task = asyncio.create_task(self._run_wait_task())
        self._watch_task = asyncio.create_task(self._run_watch_task())
        self._queues = []
    async def _run_wait_task(self):
        is_aw_on_before = _is_aw(self._on_wait_before)
        is_aw_on_wait = _is_aw(self._on_wait)
        is_aw_on_after = _is_aw(self._on_wait_after)
        is_aw_on_is_stop = _is_aw(self._on_wait_is_stop)
        while True:
            await self._store.wait()
            await _run_hook(is_aw_on_before, self._on_wait_before)
            await _run_hook(is_aw_on_wait, self._on_wait)
            await _run_hook(is_aw_on_after, self._on_wait_after)
            is_stop = await _run_hook(is_aw_on_is_stop, self._on_wait_is_stop)
            if is_stop:
                break
    async def _run_watch_task(self):
        is_aw_on_before = _is_aw(self._on_watch_before)
        is_aw_on_transform = _is_aw(self._on_watch_transform)
        is_aw_on_watch = _is_aw(self._on_watch)
        is_aw_on_after = _is_aw(self._on_watch_after)
        is_aw_on_is_stop = _is_aw(self._on_watch_is_stop)
        with self._store.watch() as stream:
            async for change in stream:
                await _run_hook(is_aw_on_before, self._on_watch_before, change)
                transformed = await _run_hook(
                    is_aw_on_transform,
                    self._on_watch_transform,
                    {**change.data},
                    change.operation,
                )
                await _run_hook(
                    is_aw_on_watch, self._on_watch, transformed, change.operation
                )
                await _run_hook(
                    is_aw_on_after, self._on_watch_after, transformed, change.operation
                )
                is_stop = await _run_hook(
                    is_aw_on_is_stop,
                    self._on_watch_is_stop,
                    transformed,
                    change.operation,
                )
                if is_stop:
                    break
BarStreamDataFrameを含む各種プラグインはこれらのhookをオーバーライドすることで拡張機能の実装を行います。BarStreamDataFrameの_on_watchには取引履歴が流れてきて、その中身に応じてバーの更新を行なっているのですね。
別の例として、こちらは約定監視用のプラグインです。
class ExecutionWatcher(DataStorePlugin):
    def __init__(self, store: "DataStoreWrapper"):
        # ExecutionDataStore(自身の約定履歴)を監視
        super(ExecutionWatcher, self).__init__(store.execution)
        self._order_id = None
        self._item = None
        self._done = None
        self._event = asyncio.Event()
    def set(self, order_id: str) -> ExecutionWatcher:
        """監視対象の注文IDをセット"""
        if self._order_id is not None:
            raise RuntimeError(
                f"ExecutionWatcher must not be 'reused', create a new instance instead."
            )
        self._order_id = order_id
        self._event.set()
        return self
    async def _on_watch(self, d: ExecutionItem, op: str):
        """流れてきた約定情報が監視中の注文に関するものであるかをチェック
        ExecutionStoreを監視してるので、流れてくるdictはExecutionItem
        """
        if not self._event.is_set():
            # order_idがsetされるまで待機
            # 注文が即約定した時にsocket messageがresのresponseより早く到達するケースがあるので、
            # order_idがセットされるまでメッセージをここで待機させておく
            await self._event.wait()
        if d["id"] == self._order_id:
            self._done = True
            self._item = d
    def _on_watch_is_stop(self, d: dict, op: str) -> bool:
        """監視終了判定"""
        return self._done
    def done(self) -> bool:
        return self._done
    def result(self) -> ExecutionItem:
        return self._item
    async def wait(self):
        return await self._watch_task
    @property
    def order_id(self) -> str:
        return self._order_id
このようにテンプレート(i.e., DataStorePlugin)を用いhookをオーバーライドすることによって、自然と規格化されたコードになり共有・流用・拡張が容易になります。最初のうちはhookの種類や呼ばれるタイミング・順序などを覚えるコストがかかるかもしれませんが、覚えてしまえば様々なプラグインを少ないコードで書けるようになります。
現状DataStoreの上に作るプラグインのインターフェースしか用意されていませんが、APIやそれ以外のモジュール用のプラグインも構想にあります。便利なインターフェースを用意することで、多様なプラグインの作成を支援することがpybotters-wrapperの目指すこところとなります。
閑話休題: ビルトインプラグインの紹介
BinningBook
orderbookの価格を任意の単位で丸めた板にします。内部はnumpy配列で管理されていて、(条件によりますが)DataStoreからナイーブに丸め板を作るよりも10〜100倍高速です。NormalizedDataStoreを用いるので、OrderbookDataStoreの実装がある全取引所で使えます。
import numpy as np
import pybotters_wrapper as pbw
from scipy.spatial.distance import cosine
async def main():
    async with pbw.create_client() as client:
        exchange = "coincheck"
        symbol = "btc_jpy"
        store = pbw.create_store(exchange)
        # 最小値1000000/最大値4000000/1000円単位で丸め
        binning_book = pbw.plugins.binningbook(
            store,
            min_bin=1000000,
            max_bin=4000000,
            pips=1000,
        )
        await store.subscribe(["orderbook"], symbol=symbol).connect(client)
        while True:
            await store.orderbook.wait()
            # 10pips分取得
            binned_asks_price, binned_asks_size = binning_book.asks(
                n=10, non_zero_only=False
            )
            binned_bids_price, binned_bids_size = binning_book.bids(
                n=10, non_zero_only=False
            )
            # 累積注文量を計算
            # numpy.ndarrayに格納されているので、numpyの計算utilityをそのまま使用できます
            asks_cumsum = binned_asks_size.cumsum()
            bids_cumsum = binned_bids_size.cumsum()
            # 累積注文量ベクトルの内積(≒類似度)
            sim = np.dot(
                asks_cumsum / np.linalg.norm(asks_cumsum),
                bids_cumsum / np.linalg.norm(bids_cumsum),
            )
            # scipyのようなnumpy-friendlyなライブラリを使うこともできますね
            sim_scipy = 1 - cosine(asks_cumsum, bids_cumsum)
            print(asks_cumsum, bids_cumsum, sim, sim_scipy)
BookTicker
色々入ってるTickerです。tickerとかtradesとかorderbookとかわかれてるとめんどくさいから全部まとめてくださいって時に便利です。
import asyncio
import pybotters_wrapper as pbw
async def main():
    exchange = "kucoinspot"
    symbol = "BTC-USDT"
    async with pbw.create_client() as client:
        store = pbw.create_store(exchange)
        book_ticker = pbw.plugins.bookticker(store)
        await store.subscribe("public", symbol=symbol).connect(client)
        while True:
            await asyncio.sleep(1)
            print("LTP", book_ticker.price)
            print("BEST_ASK", book_ticker.best_ask, book_ticker.best_ask_size)
            print("BEST_BID", book_ticker.best_bid, book_ticker.best_bid_size)
            print("MID", book_ticker.mid)
            print("SPREAD", book_ticker.spread)
            print("ASKS", book_ticker.asks[:5])
            print("BIDS", book_ticker.bids[:5])
VolumeBarStreamDataFrame
ボリューム単位のローソク足です。OrderFlow系なども実装していけたらと思ってます。timebar・volumebarはcallbackの口があって、新規足追加時にトリガーされ、その時に処理を挟むことができます。
import asyncio
import pybotters_wrapper as pbw
def open_double_cb(df):
    return {"open2": df.open.values[-1] * 2}
async def main():
    exchange = "bybitusdt"
    symbol = "BTCUSDT"
    async with pbw.create_client() as client:
        store = pbw.create_store(exchange)
        vbar = pbw.plugins.volumebar(store, volume_unit=5, callback=[open_double_cb])
        queue = vbar.register_queue()
        await store.subscribe("trades", symbol=symbol).connect(client)
        while True:
            df = await queue.get()
            print(df[["close", "size", "buy_size", "sell_size"]].tail())
            # callbackの返り値はアンスコからアクセスできます
            print(vbar._)
            """
            ユニット量に対して大きい注文が入るとユニット量を大きく超える場合があります
                     close   size   buy_size  sell_size
              9994  16977.5  8.258     8.258      0.000
              9995  16974.5  5.003     0.719      4.284
              9996  16974.5  5.046     0.716      4.330
              9997  16973.0  5.044     2.182      2.862
              9998  16974.5  5.037     3.030      2.007
            {'open2': 33947.0}
            """
DataStoreWatchCSVWriter
DataStoreのwatchで流れてくるアイテムをCSVに書き出すプラグインです。以下はひたすら取引履歴を書き出すサンプルです。hookを活用して、AWSにアップロードしたり、MongoDBに格納したり、InfluxDB + grafanaに書き出したり、など色々なタイプのwriterをゆくゆくは提供できたらと思います(PR待ってます)。
import asyncio
import pybotters_wrapper as pbw
async def main():
    exchange = "binanceusdsm"
    symbol = "BTCUSDT"
    async with pbw.create_client() as client:
        store = pbw.create_store(exchange)
        writer = pbw.plugins.watch_csvwriter(
            store, "trades", f"{exchange}-{symbol}-trades.csv", per_day=True, flush=True
        )
        await store.subscribe("trades", symbol=symbol).connect(client)
        while True:
            await asyncio.sleep(1)
if __name__ == "__main__":
    asyncio.run(main())
いかがでしょうか。pluginによる拡張機能を活用して、もっとpybottersを・もっとasyncioを一緒に楽しめたらと思っています。
pybotters-wrapperがしないこと・向かないこと
ここではpybotters-wrapperの範疇外のことについて触れてみます。
過度な取引所間の差分吸収
取引所間の差分吸収はpybotters-wrapperの課題の一つとして設定していますが、過度な差分吸収は開発・運用コストを高めるのでそこはバランスを見つつ決めています。
現状の実装における差分吸収しない例をいくつか挙げます(今後変更される可能性はあります):
- REST APIの網羅的なサポートはしません
- 
NormalizedDataStoreの要素のうちsideについてはBUY・SELLの二種の文字列に正規化しますが、symbolについては正規化は行わず元の文字列をそのまま挿入します
- 
OCOやIFDOCOといった特殊注文はインターフェースとして提供しません
- キャンドル・清算といった二次的な情報のデータストアはNormalizedDataStoreとして提供しません
同じスクリプトをそのまま別取引所で扱える!というのは目指しません。pybotters-wrapperの運用においては、基本的に取引所別にスクリプトを別個用意するか、条件分岐などが入ることが想定されます。
とはいえ、これまでの例で見てきたように、各種モジュールは共通インターフェースを持っているため、シンプルなストラテジーであれば複数取引所対応のスクリプトを書くことは可能です。実際、用意したサンプルは複数の取引所でそのまま使い回すことができます。
速度重視の実装
pybotters-wrapperは内部で勝手に色々なことをやります。基本的に追加の処理時間がそこかしこにかかるため、速度を追求するロジックには向いていません(とはいえこれが重大な問題になるのは数msを競うようなbotだと思います)。
おわりに
pybotters最高!asyncio最高!
参考
- 
bitflyerDataStoreやBinanceSpotDataStoreがpybotters.store.DataStoreManagerになります。DataStoreManagerWrapperとすべきか悩んだんですが、DataStoreWrapperの方が素直なのでそうしました ↩
- 
例えばGMOの個別決済は他の取引所とはかなり異なる仕様であり、どのように組み込むべきか試行錯誤中です ↩ 
- 
ちなみに market_order・limit_order・cancel_orderで使うエンドポイントは取引所のAPI実装クラスのクラス変数として定義されています。 ↩
- 
ソケットのメッセージのパーズ時にで不整合があった際にRESTで問い合わせる、など。 ↩ 
