(2023/11/29 actchainkit
および三角裁定サンプルコードを追記)
actchain
の背景
非同期・疎結合なボットを作るためのライブラリを作り始めました。
Asynchrnous Crypto Trading Chain: actchain
ということで、「えいしーてぃーちぇいん」とか「あくとちぇいん」とかって呼んでいます。
Chainという名前からも連想できるように、このライブラリでは各処理を独立した疎結合モジュール(例 板情報取得・特徴量計算・注文処理)として実装して、それを鎖のように繋げて全体の処理を構成します(例 板情報取得→特徴量計算→注文処理)。
”サーバーレス・マイクロサービスアーキテクチャとそのメリット”を仮想通貨取引botの実装の世界に持ってこようというモチベーションです。
サーバーレス・マイクロサービスアーキテクチャについてはここでは立ち入りませんが、actchain
において特に達成したいと思っているのは以下の二つです:
- 拡張性の向上
- テスタビリティの向上
仮想通貨取引botはその性質上コードが複雑化しがちです。さらに、多くの処理が状態(例 ポジション)に依存しており、複雑化したコードの上で状態をモックしてテストを書くのが困難になりやすいです(心当たりが100万個くらいある)。
各処理を独立したモジュールとして疎結合に定義し組み合わせて全体の処理を構成するようにすれば、この辺りの問題が自然と解決されていくはず、というのが狙いです。
actchain
を使ったマーケットメイクbot
本記事ではレポジトリに同梱されているマーケットメイクのサンプルコードを取り上げて、actchain
を使ってどのように非同期で疎結合なbotを作るかを紹介できたらと思っています。
取引アルゴ
サンプルコードは(かの有名な?)杉原論文 で説明されているAvellaneda and Stoikov [2008]の最適指値価格の求め方を適当にアレンジしたもので以下のようなロジックになっています。
- 板の厚みから買い圧力・売り圧力を求める
- ローソク足から直近のボラティリティを求める
- ポジション量と買い・売り圧力を加味して仲値を調整
- 調整仲直±ボラティリティのところに指値を出し続ける
変哲もないふーんという感じの中身ですが、内部の処理としてはざっくりこんなものが必要になってきます。
- 板の取得・加工・売買圧力の計算
- ローソク足の取得・加工・ボラティリティの計算
- ポジションの取得・加工
- 仲値の調整・指値の算出
- 新規注文の発注・既存注文の取消
そして以下のような依存関係があります:
(これに加えて、実際には発注通知・特徴量のログ・例外発生時のハンドリングなど細かな実装が必要になってきますね。)
actchain
で書くと・・・?
さて、これをactchain
で書くとこんな感じになります:
async def main() -> None:
# 状態フロー
# ポジション状態を取得・加工・配信するフロー
flow_position_status = (
actchain.Flow("positions")
.add(PositionStatusLoop().as_chain("position"))
.add(ExtendPositionStatusFunction().as_chain("position_feature"))
)
# 注文状態を取得・配信するフロー
flow_order_status = actchain.Flow("orders").add(OrderStatusLoop().as_chain("order"))
# OHLCVを取得・加工・配信するフロー
flow_ohlcv = (
actchain.Flow("ohlcv")
.add(OHLCVLoop(config.ohlcv_interval).as_chain("ohlcv"))
.add(ExtendOHLCVFunction().as_chain("ohlcv_feature"))
)
# 板情報を取得・加工・配信するフロー
flow_orderbook = (
actchain.Flow("orderbook")
.add(OrderbookLoop().as_chain("orderbook"))
.add(ExtendOrderbookFunction().as_chain("extend_orderbook"))
)
# フィーチャー作成・状態統合するフロー
flow_feature = (
actchain.Flow("feature")
.add(flow_orderbook)
.add(BuySellRatioEstimator(config.k).as_chain("buy_sell_ratio_estimator"))
.add(actchain.AccompanyChain("position_status", flow_position_status))
.add(actchain.AccompanyChain("order_status", flow_order_status))
.add(actchain.AccompanyChain("ohlcv", flow_ohlcv))
)
# 注文フロー
# 指値価格を計算するフロー
flow_order_pricer = (
actchain.Flow("order_pricer")
.add(flow_feature)
.add(OrderPricer(config.max_position_size).as_chain("order_pricer"))
)
# 指値注文を出すフロー
flow_limit_order = (
actchain.Flow("order")
.add(flow_order_pricer)
.add(
LimitOrderCommander(
config.order_size,
config.max_position_size,
config.reorder_price_diff,
).as_chain("order_command")
)
.add(
OrderRequester().as_chain("order_request", chain_type="exclusive"),
)
)
# 取消注文を出すフロー
flow_cancel_order = (
actchain.Flow("cancel")
.add(flow_order_pricer)
.add(
CancelOrderCommander(
config.max_position_size, config.reorder_price_diff
).as_chain("cancel_order_command")
)
.add(OrderCanceler().as_chain("cancel_order_request", chain_type="exclusive"))
)
# 各フローとその中を構成するchainableを実行する
await actchain.run(
flow_position_status,
flow_order_status,
flow_orderbook,
flow_ohlcv,
flow_feature,
flow_order_pricer,
flow_limit_order,
flow_cancel_order,
run_forever=config.run_forever,
)
各処理・各オブジェクトの内部がどうなっているかは一旦置いておいて、オブジェクトを連ねることで処理の流れを記述している様子がわかるかと思います。
まさしく上に書いた図における”箱”を独立して定義・実装し、それを組み立てることでマーケットメイクbot全体の処理を構成しています。
そして各箱の実装において気をつけることは「何を入力として何を出力とするか」です。
これを意識することによって拡張性・テスタビリティの高いbotが自然と作られていきます。
処理の中身はどうなっている?
具体例として幾つかの処理の中身を見ていきます。
状態フロー
flow_position_status
を例に取り上げて、その中身を見ていきます。
flow_position_status = (
actchain.Flow("positions")
.add(PositionStatusLoop().as_chain("position"))
.add(ExtendPositionStatusFunction().as_chain("position_feature"))
)
まずはPositionStatusLoop
ですね。
PositionStatusLoop
はactchain.Loop
の子クラスでloop
メソッドがPositionStatusData
というデータを非同期ジェネレーターで生成する、という処理になっています。
from typing import AsyncGenerator, TypedDict
import pybotters_wrapper as pbw
from pybotters_wrapper.core.typedefs import PositionItem
import actchain
class PositionStatusData(TypedDict):
positions: list[PositionItem]
class PositionStatusLoop(actchain.Loop[PositionStatusData]):
def __init__(self, exchange: str = "bitflyer", symbol: str = "FX_BTC_JPY"):
super(PositionStatusLoop, self).__init__()
self._exchange = exchange
self._symbol = symbol
async def loop(self) -> AsyncGenerator[PositionStatusData, None]:
async with pbw.create_client() as client:
store = pbw.create_store(self._exchange)
await store.initialize_position(client, product_code=self._symbol)
await store.subscribe("position", self._symbol).connect(
client, auto_reconnect=True
)
while True:
yield {"positions": store.position.find({"symbol": self._symbol})}
await store.position.wait()
次に ExtendPositionStatusFunction
の中身を覗きます。
こちらはhandle
メソッドでevent
オブジェクトを受け取り、イベントオブジェクトが.data
アトリビュートで持つPositionStatusData
(PositionStatusLoop
が生成するもの)を加工して、ExtendedPositionStatusData
として返しています。
class ExtendedPositionStatusData(TypedDict):
positions: list[PositionItem]
buy_position: float
sell_position: float
net_position: float
class ExtendPositionStatusFunction(
actchain.Function[PositionStatusData, ExtendedPositionStatusData]
):
async def handle(
self, event: actchain.Event[PositionStatusData]
) -> ExtendedPositionStatusData:
return self.to_extend_position_status_data(event.data["positions"])
@classmethod
def to_extend_position_status_data(
cls, positions: list[PositionItem]
) -> ExtendedPositionStatusData:
buy_posi = sum([o["size"] for o in positions if o["side"] == "BUY"])
sell_posi = sum([o["size"] for o in positions if o["side"] == "SELL"])
net = buy_posi - sell_posi
return {
"positions": positions,
"buy_position": buy_posi,
"sell_position": sell_posi,
"net_position": net,
}
先ほどのコードに戻ります。
flow_position_status = (
actchain.Flow("positions")
.add(PositionStatusLoop().as_chain("position"))
.add(ExtendPositionStatusFunction().as_chain("position_feature"))
)
上記の二つのクラスから生成されたオブジェクトは.as_chain
を呼ばれた上でFlow
に追加されています。
お察しの通り、ここでPositionStatusLoop
→ExtendPositionStatusFunction
という処理の流れが作られているわけです。
そしてその全体をflow_position_status
として宣言しているわけですね。
このように各処理を独立したモジュールとして実装し繋げるのがactchain
の使い方です。
flow_order_status
・flow_ohlcv
・flow_orderbook
も仕組みとしては同じです。ぜひ中身を覗いてみてください。
閑話休題①: Chainable
オブジェクト
actchain
において"箱"となるオブジェクトがChainable
です。
Flow
もChainable
オブジェクトですし、as_chain
で返されるものもChainable
オブジェクトです。
その名の通りあるChainable
は他のChainable
と繋げることができます。
Chainable
一つ一つはそれぞれのループで動いており、前のChainable
が返したものをEvent
として次のChainable
に渡すようになっています。
上のコードはその”連鎖”を構築しているわけでした。
特徴量算出フロー
以下はflow_feature
の宣言部です。
# フィーチャー作成・状態統合するフロー
flow_feature = (
actchain.Flow("feature")
.add(flow_orderbook)
.add(BuySellRatioEstimator(config.k).as_chain("buy_sell_ratio_estimator"))
.add(actchain.AccompanyChain("position_status", flow_position_status))
.add(actchain.AccompanyChain("order_status", flow_order_status))
.add(actchain.AccompanyChain("ohlcv", flow_ohlcv))
)
まず前半の
flow_feature = (
actchain.Flow("feature")
.add(flow_orderbook)
.add(BuySellRatioEstimator(config.k).as_chain("buy_sell_ratio_estimator"))
...
)
までは前の節と一緒です。Flow
オブジェクトもChainable
なのでこのように繋げることができます。
そしてBuySellRatioEstimator
が受け取るものはもちろんflow_orderbook
の最後の出力です(これが何であるかはぜひ考えてみて・読んでみてください)。
問題はその次のAccompanyChain
です。
...
.add(actchain.AccompanyChain("position_status", flow_position_status))
.add(actchain.AccompanyChain("order_status", flow_order_status))
.add(actchain.AccompanyChain("ohlcv", flow_ohlcv))
)
その名の通り合流を行うためのChainable
です。中の処理は以下の図の通りです。
本流にそれぞれのフローの出力合流させてます。
このようにChainable
同士の”繋ぎ方”を定義するChainable
はactchain
が提供します。
他にもJunctionChain
・PassThroughChain
・IntervalSamplingChain
といったクラスが実装されており、様々な繋ぎ方をできるように今後拡充していく予定です。
注文フロー
最後に注文フローを見ていきます。
まずflow_order_pricer
が宣言されています。
もう説明する必要もないですね。OrderPricer
はflow_feature
の最終出力を受け取り、指値情報を返すクラスであることが見て取れますね。
flow_order_pricer = (
actchain.Flow("order_pricer")
.add(flow_feature)
.add(OrderPricer(config.max_position_size).as_chain("order_pricer"))
)
flow_order_pricer
はflow_limit_order
とflow_cancel_order
に繋がれています。両フローとも前段に注文内容を作成するChainable
がいて、後段に注文内容に従って発注を行うChainable
が設置されているわけですね。
flow_limit_order = (
actchain.Flow("order")
.add(flow_order_pricer)
.add(
LimitOrderCommander(
config.order_size,
config.max_position_size,
config.reorder_price_diff,
).as_chain("order_command")
)
.add(
OrderRequester().as_chain("order_request", chain_type="exclusive"),
)
)
flow_cancel_order = (
actchain.Flow("cancel")
.add(flow_order_pricer)
.add(
CancelOrderCommander(
config.max_position_size, config.reorder_price_diff
).as_chain("cancel_order_command")
)
.add(OrderCanceler().as_chain("cancel_order_request", chain_type="exclusive"))
)
軽くLimitOrderCommander
の中身を覗きます。
class OrderPricerReceiveData(
BuySellRatioEstimatorSendData,
ExtendedPositionStatusData,
OrderStatusData,
OHLCVData,
):
...
class LimitOrderCommand(TypedDict):
symbol: str
side: str
price: float
size: float
class LimitOrderCommanderSendData(OrderPricerSendData):
limit_order_commands: list[LimitOrderCommand]
class LimitOrderCommander(
actchain.Function[OrderPricerSendData, LimitOrderCommanderSendData]
):
def __init__(
self,
order_size: float,
max_position_size: float,
reorder_price_diff: float = 100,
symbol: str = "FX_BTC_JPY",
):
super(LimitOrderCommander, self).__init__()
self._symbol = symbol
self._order_size = order_size
self._max_position_size = max_position_size
self._reorder_price_diff = reorder_price_diff
async def handle(
self, event: actchain.Event[OrderPricerSendData]
) -> LimitOrderCommanderSendData | None:
limit_order_commands = self.create_commands(
event.data["orders"],
event.data["buy_price"],
event.data["sell_price"],
event.data["BUY"][0]["price"],
event.data["SELL"][0]["price"],
event.data["buy_position"],
event.data["sell_position"],
)
if len(limit_order_commands):
return LimitOrderCommanderSendData(
**{
**event.data,
"limit_order_commands": limit_order_commands,
}
)
else:
return None
def create_commands(
self,
orders: list[OrderItem],
buy_price: float,
sell_price: float,
best_ask: float,
best_bid: float,
buy_position: float,
sell_position: float,
) -> list[LimitOrderCommand]:
buy_orders = [
o for o in orders if o["side"] == "BUY" and o["symbol"] == self._symbol
]
sell_orders = [
o for o in orders if o["side"] == "SELL" and o["symbol"] == self._symbol
]
limit_order_commands: list[LimitOrderCommand] = []
keep_buy_orders, keep_sell_orders = [], []
for bo in buy_orders:
diff = abs(bo["price"] - buy_price)
if diff < self._reorder_price_diff:
keep_buy_orders.append(bo)
for so in sell_orders:
diff = abs(so["price"] - sell_price)
if diff < self._reorder_price_diff:
keep_sell_orders.append(so)
# 雑な部分約定清算処理
buy_hasu = float("0.00" + str(buy_position)[4:])
sell_hasu = float("0.00" + str(sell_position)[4:])
# new limit orders
ordered = False
if (
len(keep_buy_orders) == 0
and buy_position < self._max_position_size
and not RateLimitState().is_being_suspended
):
ordered = True
limit_order_commands.append(
{
"symbol": self._symbol,
"side": "BUY",
"price": min(best_bid, buy_price),
"size": self._order_size + sell_hasu,
}
)
if (
len(keep_sell_orders) == 0
and sell_position < self._max_position_size
and not RateLimitState().is_being_suspended
):
ordered = True
limit_order_commands.append(
{
"symbol": self._symbol,
"side": "SELL",
"price": max(best_ask, sell_price),
"size": self._order_size + buy_hasu,
}
)
if ordered:
RateLimitState.suspended()
return limit_order_commands
create_commands
という関数の中で状態に応じて発注すべき指値注文のリストを作っています。
このクラスは「与えられた状態に応じてあるべき注文リストを作る」それだけに集中すればいいわけで、「特徴量を計算する」だとか「APIのレスポンスを捌く」だとかはLimitOrderCommander
の仕事ではないわけです。
こうした責務の切り分けがテスタビリティを高く保つために有効になってきます(細かく切リすぎると繋げるのが大変だったりするので一概に細かければいいというわけではない点に注意です)。
閑話休題②: Chainable
のインアウトの型付け
actchain
はChainable
のイン・アウトの型付けをサポートしており(デフォルトはMapping[Any, Any]
)、TypedDict
などを使って各モジュールのイン・アウトを明示的に型付けすることを推奨しています。
型を付けることによって、IDEの補完が効いたり、バグを埋め込みにくくなったりといったメリットがあります。
pydantic
を用いることによってより厳密な型チェックを行うのも良いかもしれません。
閑話休題③: actchain.State
を使ったグローバル変数の利用
基本は疎結合にモジュールを作り、モジュールを繋げることで全体の処理を作るのがactchain
ですが、性質上グローバルに持ちたくなる値があります。
例えば、コンフィグレーションやAPI呼び出しリミット制限状態などは、全体の処理でどこからでも参照したくなやすいです。
それをサポートするのがactchain.State
クラスで、こちらはシングルトン実装がされています。
上のコードに出ていたRateLimitState
はactchain.State
を継承したシングルトンクラスで、このように使うことで安全にグローバル変数を任意のモジュールからアクセスできるようになります。
ただしグローバル変数を使いすぎると全体の管理が難しくなるので、使用はシンプルなものに留めることが推奨されます。
@dataclass
class RateLimitState(actchain.State):
last_order_ts: float = 0
last_penalized_ts: float = 0
@classmethod
def suspended(cls) -> None:
cls().last_order_ts = time.monotonic()
@classmethod
def penalized(cls) -> None:
cls().last_penalized_ts = time.monotonic()
logger.error(
f"Rate limit exceeded. No Order/Cancel request for next {config.sleep_at_api_limit} seconds."
)
@property
def is_being_penalized(self) -> bool:
return time.monotonic() - self.last_penalized_ts < config.sleep_at_api_limit
@property
def is_being_suspended(self) -> bool:
return time.monotonic() - self.last_order_ts < config.order_interval
ユニットテスト
さて、ここまできたらあとは疎結合な特性を活かしてサクッとユニットテストを書くだけです。
def test_extend_position_status_function() -> None:
positions = [
{
"symbol": "FX_BTC_JPY",
"side": "SELL",
"size": 0.01,
"price": 1000000,
},
{"symbol": "FX_BTC_JPY", "side": "SELL", "size": 0.01, "price": 1000000},
]
expected = {
"positions": positions,
"buy_position": 0.0,
"sell_position": 0.02,
"net_position": -0.02,
}
actual = ExtendPositionStatusFunction.to_extend_position_status_data(positions)
assert expected == actual
それぞれのモジュールのイン・アウトを定め、責務を限定できていれば、容易にテストをかけるようになっているはずです。
逆にテストが書きにくいとしたら、モジュールの設計がうまくできていない可能性があるので、モジュール設計から考え直すと良いかもしれません。
おわりに
このサンプルbotはbitflyer用に作りました。もっと頑張って欲しい。そんな気持ちです。
追記: actchainkit
仮想通貨トレードにおける高レベルなChainable
はこのレポにつっこんでいきたいなと思っています(今はほぼ空)。
便利なChainable
で溢れていってほしい。
三角裁定サンプルコード
せめてものお土産にということで三角裁定サンプルコードを同梱しています。