LoginSignup
20
16

actchain:非同期・疎結合仮想通貨取引bot実装ツール

Last updated at Posted at 2023-11-10

(2023/11/29 actchainkitおよび三角裁定サンプルコードを追記)

actchainの背景

非同期・疎結合なボットを作るためのライブラリを作り始めました。

Asynchrnous Crypto Trading Chain: actchainということで、「えいしーてぃーちぇいん」とか「あくとちぇいん」とかって呼んでいます。

Chainという名前からも連想できるように、このライブラリでは各処理を独立した疎結合モジュール(例 板情報取得・特徴量計算・注文処理)として実装して、それを鎖のように繋げて全体の処理を構成します(例 板情報取得→特徴量計算→注文処理)。
”サーバーレス・マイクロサービスアーキテクチャとそのメリット”を仮想通貨取引botの実装の世界に持ってこようというモチベーションです。

サーバーレス・マイクロサービスアーキテクチャについてはここでは立ち入りませんが、actchainにおいて特に達成したいと思っているのは以下の二つです:

  • 拡張性の向上
  • テスタビリティの向上

仮想通貨取引botはその性質上コードが複雑化しがちです。さらに、多くの処理が状態(例 ポジション)に依存しており、複雑化したコードの上で状態をモックしてテストを書くのが困難になりやすいです(心当たりが100万個くらいある)。

各処理を独立したモジュールとして疎結合に定義し組み合わせて全体の処理を構成するようにすれば、この辺りの問題が自然と解決されていくはず、というのが狙いです。

actchainを使ったマーケットメイクbot

本記事ではレポジトリに同梱されているマーケットメイクのサンプルコードを取り上げて、actchainを使ってどのように非同期で疎結合なbotを作るかを紹介できたらと思っています。

取引アルゴ

サンプルコードは(かの有名な?)杉原論文 で説明されているAvellaneda and Stoikov [2008]の最適指値価格の求め方を適当にアレンジしたもので以下のようなロジックになっています。

  1. 板の厚みから買い圧力・売り圧力を求める
  2. ローソク足から直近のボラティリティを求める
  3. ポジション量と買い・売り圧力を加味して仲値を調整
  4. 調整仲直±ボラティリティのところに指値を出し続ける

変哲もないふーんという感じの中身ですが、内部の処理としてはざっくりこんなものが必要になってきます。

  • 板の取得・加工・売買圧力の計算
  • ローソク足の取得・加工・ボラティリティの計算
  • ポジションの取得・加工
  • 仲値の調整・指値の算出
  • 新規注文の発注・既存注文の取消

そして以下のような依存関係があります:

(これに加えて、実際には発注通知・特徴量のログ・例外発生時のハンドリングなど細かな実装が必要になってきますね。)

actchainで書くと・・・?

さて、これをactchainで書くとこんな感じになります:

async def main() -> None:
    # 状態フロー
    # ポジション状態を取得・加工・配信するフロー
    flow_position_status = (
        actchain.Flow("positions")
        .add(PositionStatusLoop().as_chain("position"))
        .add(ExtendPositionStatusFunction().as_chain("position_feature"))
    )

    # 注文状態を取得・配信するフロー
    flow_order_status = actchain.Flow("orders").add(OrderStatusLoop().as_chain("order"))

    # OHLCVを取得・加工・配信するフロー
    flow_ohlcv = (
        actchain.Flow("ohlcv")
        .add(OHLCVLoop(config.ohlcv_interval).as_chain("ohlcv"))
        .add(ExtendOHLCVFunction().as_chain("ohlcv_feature"))
    )

    # 板情報を取得・加工・配信するフロー
    flow_orderbook = (
        actchain.Flow("orderbook")
        .add(OrderbookLoop().as_chain("orderbook"))
        .add(ExtendOrderbookFunction().as_chain("extend_orderbook"))
    )

    # フィーチャー作成・状態統合するフロー
    flow_feature = (
        actchain.Flow("feature")
        .add(flow_orderbook)
        .add(BuySellRatioEstimator(config.k).as_chain("buy_sell_ratio_estimator"))
        .add(actchain.AccompanyChain("position_status", flow_position_status))
        .add(actchain.AccompanyChain("order_status", flow_order_status))
        .add(actchain.AccompanyChain("ohlcv", flow_ohlcv))
    )

    # 注文フロー
    # 指値価格を計算するフロー
    flow_order_pricer = (
        actchain.Flow("order_pricer")
        .add(flow_feature)
        .add(OrderPricer(config.max_position_size).as_chain("order_pricer"))
    )

    # 指値注文を出すフロー
    flow_limit_order = (
        actchain.Flow("order")
        .add(flow_order_pricer)
        .add(
            LimitOrderCommander(
                config.order_size,
                config.max_position_size,
                config.reorder_price_diff,
            ).as_chain("order_command")
        )
        .add(
            OrderRequester().as_chain("order_request", chain_type="exclusive"),
        )
    )

    # 取消注文を出すフロー
    flow_cancel_order = (
        actchain.Flow("cancel")
        .add(flow_order_pricer)
        .add(
            CancelOrderCommander(
                config.max_position_size, config.reorder_price_diff
            ).as_chain("cancel_order_command")
        )
        .add(OrderCanceler().as_chain("cancel_order_request", chain_type="exclusive"))
    )

    # 各フローとその中を構成するchainableを実行する
    await actchain.run(
        flow_position_status,
        flow_order_status,
        flow_orderbook,
        flow_ohlcv,
        flow_feature,
        flow_order_pricer,
        flow_limit_order,
        flow_cancel_order,
        run_forever=config.run_forever,
    )

各処理・各オブジェクトの内部がどうなっているかは一旦置いておいて、オブジェクトを連ねることで処理の流れを記述している様子がわかるかと思います。

まさしく上に書いた図における”箱”を独立して定義・実装し、それを組み立てることでマーケットメイクbot全体の処理を構成しています。

そして各箱の実装において気をつけることは「何を入力として何を出力とするか」です。

これを意識することによって拡張性・テスタビリティの高いbotが自然と作られていきます。

処理の中身はどうなっている?

具体例として幾つかの処理の中身を見ていきます。

状態フロー

flow_position_statusを例に取り上げて、その中身を見ていきます。

flow_position_status = (
    actchain.Flow("positions")
    .add(PositionStatusLoop().as_chain("position"))
    .add(ExtendPositionStatusFunction().as_chain("position_feature"))
)

まずはPositionStatusLoopですね。

PositionStatusLoopactchain.Loopの子クラスでloopメソッドがPositionStatusDataというデータを非同期ジェネレーターで生成する、という処理になっています。

from typing import AsyncGenerator, TypedDict

import pybotters_wrapper as pbw
from pybotters_wrapper.core.typedefs import PositionItem

import actchain


class PositionStatusData(TypedDict):
    positions: list[PositionItem]


class PositionStatusLoop(actchain.Loop[PositionStatusData]):
    def __init__(self, exchange: str = "bitflyer", symbol: str = "FX_BTC_JPY"):
        super(PositionStatusLoop, self).__init__()
        self._exchange = exchange
        self._symbol = symbol

    async def loop(self) -> AsyncGenerator[PositionStatusData, None]:
        async with pbw.create_client() as client:
            store = pbw.create_store(self._exchange)
            await store.initialize_position(client, product_code=self._symbol)
            await store.subscribe("position", self._symbol).connect(
                client, auto_reconnect=True
            )
            while True:
                yield {"positions": store.position.find({"symbol": self._symbol})}
                await store.position.wait()

次に ExtendPositionStatusFunctionの中身を覗きます。

こちらはhandleメソッドでeventオブジェクトを受け取り、イベントオブジェクトが.dataアトリビュートで持つPositionStatusDataPositionStatusLoopが生成するもの)を加工して、ExtendedPositionStatusDataとして返しています。

class ExtendedPositionStatusData(TypedDict):
    positions: list[PositionItem]
    buy_position: float
    sell_position: float
    net_position: float


class ExtendPositionStatusFunction(
    actchain.Function[PositionStatusData, ExtendedPositionStatusData]
):
    async def handle(
        self, event: actchain.Event[PositionStatusData]
    ) -> ExtendedPositionStatusData:
        return self.to_extend_position_status_data(event.data["positions"])

    @classmethod
    def to_extend_position_status_data(
        cls, positions: list[PositionItem]
    ) -> ExtendedPositionStatusData:
        buy_posi = sum([o["size"] for o in positions if o["side"] == "BUY"])
        sell_posi = sum([o["size"] for o in positions if o["side"] == "SELL"])
        net = buy_posi - sell_posi
        return {
            "positions": positions,
            "buy_position": buy_posi,
            "sell_position": sell_posi,
            "net_position": net,
        }

先ほどのコードに戻ります。

flow_position_status = (
    actchain.Flow("positions")
    .add(PositionStatusLoop().as_chain("position"))
    .add(ExtendPositionStatusFunction().as_chain("position_feature"))
)

上記の二つのクラスから生成されたオブジェクトは.as_chainを呼ばれた上でFlowに追加されています。

お察しの通り、ここでPositionStatusLoopExtendPositionStatusFunctionという処理の流れが作られているわけです。

そしてその全体をflow_position_statusとして宣言しているわけですね。

このように各処理を独立したモジュールとして実装し繋げるのがactchainの使い方です。

flow_order_statusflow_ohlcvflow_orderbookも仕組みとしては同じです。ぜひ中身を覗いてみてください。

閑話休題①: Chainableオブジェクト

actchainにおいて"箱"となるオブジェクトがChainableです。

FlowChainableオブジェクトですし、as_chainで返されるものもChainableオブジェクトです。

その名の通りあるChainableは他のChainableと繋げることができます。

Chainable一つ一つはそれぞれのループで動いており、前のChainableが返したものをEventとして次のChainableに渡すようになっています。

上のコードはその”連鎖”を構築しているわけでした。

特徴量算出フロー

以下はflow_featureの宣言部です。

# フィーチャー作成・状態統合するフロー
flow_feature = (
    actchain.Flow("feature")
    .add(flow_orderbook)
    .add(BuySellRatioEstimator(config.k).as_chain("buy_sell_ratio_estimator"))
    .add(actchain.AccompanyChain("position_status", flow_position_status))
    .add(actchain.AccompanyChain("order_status", flow_order_status))
    .add(actchain.AccompanyChain("ohlcv", flow_ohlcv))
)

まず前半の

flow_feature = (
    actchain.Flow("feature")
    .add(flow_orderbook)
    .add(BuySellRatioEstimator(config.k).as_chain("buy_sell_ratio_estimator"))
    ...
)

までは前の節と一緒です。FlowオブジェクトもChainableなのでこのように繋げることができます。

そしてBuySellRatioEstimatorが受け取るものはもちろんflow_orderbookの最後の出力です(これが何であるかはぜひ考えてみて・読んでみてください)。

問題はその次のAccompanyChainです。

    ...
    .add(actchain.AccompanyChain("position_status", flow_position_status))
    .add(actchain.AccompanyChain("order_status", flow_order_status))
    .add(actchain.AccompanyChain("ohlcv", flow_ohlcv))
)

その名の通り合流を行うためのChainableです。中の処理は以下の図の通りです。

本流にそれぞれのフローの出力合流させてます。

このようにChainable同士の”繋ぎ方”を定義するChainableactchainが提供します。

他にもJunctionChainPassThroughChainIntervalSamplingChainといったクラスが実装されており、様々な繋ぎ方をできるように今後拡充していく予定です。

注文フロー

最後に注文フローを見ていきます。

まずflow_order_pricerが宣言されています。

もう説明する必要もないですね。OrderPricerflow_featureの最終出力を受け取り、指値情報を返すクラスであることが見て取れますね。

flow_order_pricer = (
    actchain.Flow("order_pricer")
    .add(flow_feature)
    .add(OrderPricer(config.max_position_size).as_chain("order_pricer"))
)

flow_order_pricerflow_limit_orderflow_cancel_orderに繋がれています。両フローとも前段に注文内容を作成するChainableがいて、後段に注文内容に従って発注を行うChainableが設置されているわけですね。

flow_limit_order = (
    actchain.Flow("order")
    .add(flow_order_pricer)
    .add(
        LimitOrderCommander(
            config.order_size,
            config.max_position_size,
            config.reorder_price_diff,
        ).as_chain("order_command")
    )
    .add(
        OrderRequester().as_chain("order_request", chain_type="exclusive"),
    )
)

flow_cancel_order = (
    actchain.Flow("cancel")
    .add(flow_order_pricer)
    .add(
        CancelOrderCommander(
            config.max_position_size, config.reorder_price_diff
        ).as_chain("cancel_order_command")
    )
    .add(OrderCanceler().as_chain("cancel_order_request", chain_type="exclusive"))
)

軽くLimitOrderCommanderの中身を覗きます。

class OrderPricerReceiveData(
    BuySellRatioEstimatorSendData,
    ExtendedPositionStatusData,
    OrderStatusData,
    OHLCVData,
):
    ...

class LimitOrderCommand(TypedDict):
    symbol: str
    side: str
    price: float
    size: float


class LimitOrderCommanderSendData(OrderPricerSendData):
    limit_order_commands: list[LimitOrderCommand]


class LimitOrderCommander(
    actchain.Function[OrderPricerSendData, LimitOrderCommanderSendData]
):
    def __init__(
        self,
        order_size: float,
        max_position_size: float,
        reorder_price_diff: float = 100,
        symbol: str = "FX_BTC_JPY",
    ):
        super(LimitOrderCommander, self).__init__()
        self._symbol = symbol
        self._order_size = order_size
        self._max_position_size = max_position_size
        self._reorder_price_diff = reorder_price_diff

    async def handle(
        self, event: actchain.Event[OrderPricerSendData]
    ) -> LimitOrderCommanderSendData | None:
        limit_order_commands = self.create_commands(
            event.data["orders"],
            event.data["buy_price"],
            event.data["sell_price"],
            event.data["BUY"][0]["price"],
            event.data["SELL"][0]["price"],
            event.data["buy_position"],
            event.data["sell_position"],
        )

        if len(limit_order_commands):
            return LimitOrderCommanderSendData(
                **{
                    **event.data,
                    "limit_order_commands": limit_order_commands,
                }
            )
        else:
            return None

    def create_commands(
        self,
        orders: list[OrderItem],
        buy_price: float,
        sell_price: float,
        best_ask: float,
        best_bid: float,
        buy_position: float,
        sell_position: float,
    ) -> list[LimitOrderCommand]:
        buy_orders = [
            o for o in orders if o["side"] == "BUY" and o["symbol"] == self._symbol
        ]
        sell_orders = [
            o for o in orders if o["side"] == "SELL" and o["symbol"] == self._symbol
        ]

        limit_order_commands: list[LimitOrderCommand] = []
        keep_buy_orders, keep_sell_orders = [], []

        for bo in buy_orders:
            diff = abs(bo["price"] - buy_price)
            if diff < self._reorder_price_diff:
                keep_buy_orders.append(bo)

        for so in sell_orders:
            diff = abs(so["price"] - sell_price)
            if diff < self._reorder_price_diff:
                keep_sell_orders.append(so)

        # 雑な部分約定清算処理
        buy_hasu = float("0.00" + str(buy_position)[4:])
        sell_hasu = float("0.00" + str(sell_position)[4:])

        # new limit orders
        ordered = False
        if (
            len(keep_buy_orders) == 0
            and buy_position < self._max_position_size
            and not RateLimitState().is_being_suspended
        ):
            ordered = True
            limit_order_commands.append(
                {
                    "symbol": self._symbol,
                    "side": "BUY",
                    "price": min(best_bid, buy_price),
                    "size": self._order_size + sell_hasu,
                }
            )

        if (
            len(keep_sell_orders) == 0
            and sell_position < self._max_position_size
            and not RateLimitState().is_being_suspended
        ):
            ordered = True
            limit_order_commands.append(
                {
                    "symbol": self._symbol,
                    "side": "SELL",
                    "price": max(best_ask, sell_price),
                    "size": self._order_size + buy_hasu,
                }
            )

        if ordered:
            RateLimitState.suspended()

        return limit_order_commands

create_commandsという関数の中で状態に応じて発注すべき指値注文のリストを作っています。

このクラスは「与えられた状態に応じてあるべき注文リストを作る」それだけに集中すればいいわけで、「特徴量を計算する」だとか「APIのレスポンスを捌く」だとかはLimitOrderCommanderの仕事ではないわけです。

こうした責務の切り分けがテスタビリティを高く保つために有効になってきます(細かく切リすぎると繋げるのが大変だったりするので一概に細かければいいというわけではない点に注意です)。

閑話休題②: Chainableのインアウトの型付け

actchainChainableのイン・アウトの型付けをサポートしており(デフォルトはMapping[Any, Any])、TypedDictなどを使って各モジュールのイン・アウトを明示的に型付けすることを推奨しています。

型を付けることによって、IDEの補完が効いたり、バグを埋め込みにくくなったりといったメリットがあります。

pydanticを用いることによってより厳密な型チェックを行うのも良いかもしれません。

閑話休題③: actchain.Stateを使ったグローバル変数の利用

基本は疎結合にモジュールを作り、モジュールを繋げることで全体の処理を作るのがactchainですが、性質上グローバルに持ちたくなる値があります。
例えば、コンフィグレーションやAPI呼び出しリミット制限状態などは、全体の処理でどこからでも参照したくなやすいです。

それをサポートするのがactchain.Stateクラスで、こちらはシングルトン実装がされています。

上のコードに出ていたRateLimitStateactchain.Stateを継承したシングルトンクラスで、このように使うことで安全にグローバル変数を任意のモジュールからアクセスできるようになります。

ただしグローバル変数を使いすぎると全体の管理が難しくなるので、使用はシンプルなものに留めることが推奨されます。

@dataclass
class RateLimitState(actchain.State):
    last_order_ts: float = 0
    last_penalized_ts: float = 0

    @classmethod
    def suspended(cls) -> None:
        cls().last_order_ts = time.monotonic()

    @classmethod
    def penalized(cls) -> None:
        cls().last_penalized_ts = time.monotonic()
        logger.error(
            f"Rate limit exceeded. No Order/Cancel request for next {config.sleep_at_api_limit} seconds."
        )

    @property
    def is_being_penalized(self) -> bool:
        return time.monotonic() - self.last_penalized_ts < config.sleep_at_api_limit

    @property
    def is_being_suspended(self) -> bool:
        return time.monotonic() - self.last_order_ts < config.order_interval

ユニットテスト

さて、ここまできたらあとは疎結合な特性を活かしてサクッとユニットテストを書くだけです。

def test_extend_position_status_function() -> None:
    positions = [
        {
            "symbol": "FX_BTC_JPY",
            "side": "SELL",
            "size": 0.01,
            "price": 1000000,
        },
        {"symbol": "FX_BTC_JPY", "side": "SELL", "size": 0.01, "price": 1000000},
    ]

    expected = {
        "positions": positions,
        "buy_position": 0.0,
        "sell_position": 0.02,
        "net_position": -0.02,
    }

    actual = ExtendPositionStatusFunction.to_extend_position_status_data(positions)

    assert expected == actual

それぞれのモジュールのイン・アウトを定め、責務を限定できていれば、容易にテストをかけるようになっているはずです。
逆にテストが書きにくいとしたら、モジュールの設計がうまくできていない可能性があるので、モジュール設計から考え直すと良いかもしれません。

おわりに

このサンプルbotはbitflyer用に作りました。もっと頑張って欲しい。そんな気持ちです。

追記: actchainkit

仮想通貨トレードにおける高レベルなChainableはこのレポにつっこんでいきたいなと思っています(今はほぼ空)。

便利なChainableで溢れていってほしい。

三角裁定サンプルコード

せめてものお土産にということで三角裁定サンプルコードを同梱しています。

20
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
20
16