0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

リアルタイム暗号通貨アービトラージシステムを構築しました ― ミリ秒の価格ギャップをリアル利益に変える

Posted at

リアルタイム暗号通貨アービトラージシステムを構築しました ― ミリ秒の価格ギャップをリアル利益に変える

アービトラージはとてもシンプルな戦略です。価格が安い取引所で資産を買い、価格が高い別の取引所で売るだけです。

しかし暗号資産市場では、このような価格差(スプレッド)はミリ秒単位で出現しては消えていきます。もしあなたのデータパイプラインが、価格のバッチを処理するのに 5 秒かかるようであれば、そのころにはチャンスはもう消えているでしょう。

この記事では、オープンソースのリアルタイムイベントストリーミング基盤である RisingWave を使い、標準的な SQL だけで サブ秒レイテンシでアービトラージ機会を検知する方法 を紹介します。

エンジニアリング上のボトルネック

アービトラージを行うには、BinanceCoinbaseOKX、そして DEX といった複数の取引所に分散した流動性を同時に監視する必要があります。
その結果、エンジニアリング的には次の 3 つの課題が立ちはだかります。

  1. 速度(Velocity): ボラティリティが高い局面では、1 秒あたり 10,000 件を超えるティックデータが流入する可能性があります。
  2. 同期(Synchronization): Binance の 10:00:01 の価格と Coinbase の 10:00:05 の価格を比較しても意味はありません。比較対象となる価格は、時間的にきちんと揃っていなければならない からです。
  3. レイテンシ(Latency): 利益を得られる可能性は、数百ミリ秒のうちにほぼゼロまで減衰してしまいます。

従来型の ETL パイプラインでは、この条件を満たすには遅すぎます。必要なのは、バッチ単位ではなく、データが到着した瞬間に処理していくシステム です。

RisingWave が適している理由

RisingWave は、まさにこのような高スループットなストリームを処理するために設計されたストリーミングデータベースです。
従来の Postgres におけるマテリアライズドビューが「ある時点の静的なスナップショット」であるのに対し、RisingWave のマテリアライズドビューは継続的に更新され続けます

  • Kafka や WebSocket などのイベントストリームとネイティブに接続できる
  • 新しく到着したデータだけをインクリメンタルに処理し、結果を更新できる
  • 時間ウィンドウに関するロジックをネイティブに扱える
  • すべて SQL で記述できるため、トレーディング戦略を定義するのに複雑な Java や Flink のジョブを書く必要がない

監視システムの構築方法

1. 市場データの取り込み

まず、市場データが Kafka のようなメッセージキューに流れ込んでいる状況を想定します。
ここでは、Binance 用と Coinbase 用の 2 つのソースを定義します。

Kafka のトピックをテーブルのように扱い、RisingWave が JSON イベントをそのまま取り込みます。

CREATE SOURCE binance_prices (
    symbol VARCHAR,
    price DECIMAL,
    ts TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'binance.ticker',
    properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE JSON;

CREATE SOURCE coinbase_prices (
    symbol VARCHAR,
    price DECIMAL,
    ts TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'coinbase.ticker',
    properties.bootstrap.server = 'localhost:9092'
) FORMAT PLAIN ENCODE JSON;

2. コアロジック:スプレッドの検知

次に、同じシンボルに対して価格差が一定の利益閾値(例:0.5%)を超えた瞬間を見つける必要があります。

ここで重要なのは、ほぼ同じタイミングで存在していた価格同士を比較すること です。
数秒ずれたデータポイント同士の価格差は、有効なアービトラージ機会とは言えません。

そのために、以下のようなインターバル結合条件を使います:
ABS(EXTRACT(EPOCH FROM (b.ts - c.ts))) < 1
この条件によって、「1 秒以内に発生したティック同士」に比較対象を絞り込みます。

CREATE MATERIALIZED VIEW arbitrage_opportunities AS
SELECT
    b.symbol,
    b.price AS binance_price,
    c.price AS coinbase_price,
    (c.price - b.price) / b.price * 100 AS spread_percent,
    b.ts
FROM binance_prices b
JOIN coinbase_prices c
  ON b.symbol = c.symbol
AND ABS(EXTRACT(EPOCH FROM (b.ts - c.ts))) < 1  *-- 1 秒以内のティックのみを整列して比較*WHERE ABS((c.price - b.price) / b.price * 100) > 0.5;  *-- 閾値 0.5% を超えたスプレッドのみを抽出*

新しいティックが到着するたびに、RisingWave はこのビューを即座に更新します。
スプレッドが発生したタイミングでは、その場でこのビューにレコードとして現れます。

3. 市場全体のアグリゲーション

市場の分断状況を俯瞰するために、すべての取引所を跨いだグローバルな「Best Bid and Offer(BBO)」を見たい、というケースもあるはずです。

ここでは UNION ALL を使って複数のストリームを結合し、HOP によるホッピングウィンドウを適用して、短い時間間隔ごとのスプレッド統計を計算します。

CREATE MATERIALIZED VIEW best_bid_offer AS
WITH t AS (SELECT symbol, price, ts FROM binance_prices
           UNION ALL
           SELECT symbol, price, ts FROM coinbase_prices)
SELECT
    symbol,
    MIN(price) AS best_bid,
    MAX(price) AS best_offer,
    MAX(price) - MIN(price) AS spread
FROM HOP(t, ts, INTERVAL '1 second', INTERVAL '5 seconds') 
GROUP BY symbol, window_start;

このようにして、「ある 5 秒間の中で、そのシンボルにどれくらいの価格乖離が生じていたのか」を継続的に可視化できます。

4. 実行可能なアウトプット(Subscriptions と Sinks)

スプレッドを検知できても、それだけではまだ半分です。
トレーディングボットを即座にトリガーできる仕組み が必要になります。

これまでであれば、このデータを Redis や Kafka にシンクし、ボット側がそれを購読・ポーリングする構成を取るのが一般的でした。
RisingWave には、これをよりシンプルにしてくれる Subscriptions という機能があります。

Subscription を使うと、アプリケーションはマテリアライズドビューの更新を 直接ストリームとして購読 できます。
トレーディングボットは RisingWave に接続し、新しいアービトラージ機会が生成されると、その更新を即時に受け取ることができます。
中間にメッセージキューを挟む必要がなくなり、レイテンシとインフラ構成の両方を抑えられます。

CREATE SUBSCRIPTION arb_subscription
FROM arbitrage_opportunities
WITH (
    retention = '1h'
);

一方で、バックテスト用にデータを長期保存したい場合や、既存システムと連携したい場合は、Sinks を使うこともできます。
たとえば S3 にシンクしてアーカイブしたり、複数の外部サービスが同じ状態を参照する必要があるなら Redis にシンクする、といった構成が考えられます。

CREATE SINK arb_signals
FROM arbitrage_opportunities
WITH (
    connector = 'redis',
    primary_key = 'arb_signals',
    redis.url= 'redis://localhost:6379/'
) FORMAT PLAIN ENCODE JSON (
    force_append_only='true'
);

戦略の拡張

一度パイプラインが動き始めてしまえば、あとは SQL を書き換えていくだけで、より複雑な戦略も素早く試すことができます。

  • 三角アービトラージ:
    同一取引所内で BTC/USDT・ETH/BTC・ETH/USDT の 3 つのペアを結合し、レートの歪みを検知する。
  • 資金調達率アービトラージ:
    現物価格とパーペチュアルスワップのファンディングレートの乖離を監視し、その差を取りにいく。
  • レイテンシ監視:
    取引所 API が返すタイムスタンプと、自分のシステムでの取り込み時刻を比較し、ネットワーク遅延を継続的にモニタリングする。

いずれも「データストリームに対してどういうクエリを書くか」の問題に落とし込めるため、戦略の試行錯誤がしやすくなります。

まとめ

暗号資産トレーディングにおいて、スピードは最大の競争優位 です。

処理ロジックをアプリケーション側ではなく RisingWave に寄せることで、複雑なアプリケーションコードを書くことなく、サブ秒レイテンシで市場の異常(= アービトラージ機会)を検知できるようになります。

ウィンドウ処理や結合の状態管理はデータベース側に任せられるため、トレーディング戦略は 純粋な SQL だけで記述できる ようになります。
その結果、インフラの複雑さを抑えつつ、「アイデアをすぐクエリとして試す」ことに集中できるようになります。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?