0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

RisingWave と Streaming SQL を用いたリアルタイム PnL エンジンの構築

Posted at

金融取引において、リアルタイムで損益(PnL, Profit and Loss)を計算することは、リスク管理、パフォーマンス監視、アルゴリズムによる意思決定にとって不可欠な要件です。従来のバッチ処理ベースのシステムは、現代市場の高速度データストリームに追随することが困難です。本技術ガイドでは、ストリーミングデータベースと単一の SQL クエリセットを用いて、堅牢なリアルタイム PnL 計算エンジンを構築するための包括的なプレイブックを提供します。

ここでは、生の取引データや価格データの取り込みから、リアルタイムでの実現損益および未実現損益の計算、さらに遅延データや複数通貨換算といった実運用で直面する課題への対応までをカバーします。

学べること(TL;DR)

  • リアルタイム PnL 維持: 単一のストリーミング SQL クエリセットを使用し、ポジション、平均コスト、実現損益および未実現損益を継続的に維持。
  • 実データ対応: ウォーターマークを用いて順序乱れや遅延イベントを正しく処理し、取引手数料を換算し、複数通貨建ての取引を USD などの共通通貨に正規化。
  • 増分的かつ効率的: マテリアライズドビューの階層化(MV-on-MV)アーキテクチャを活用して増分維持を行い、サブ秒更新を実現し、状態のリプレイや再構築を容易にする。

データモデルと入力

PnL 計算パイプラインは、2 つの主要なデータストリームに依存します。1 つは個々の取引イベントのストリーム、もう 1 つは通貨換算に利用するリアルタイム市場価格のストリームです。

取引イベントストリーム

このストリームには、トランザクションごとの生取引データが含まれます。

フィールド(例):

  • account_id, symbol, ts(イベント時刻)
  • sideBUY / SELL)、qtyprice(見積通貨または USD 建て)
  • fee, fee_ccy(手数料金額とその通貨)
CREATE SOURCE trades_src (
  account_id BIGINT,
  symbol     VARCHAR,
  ts         TIMESTAMP,
  side       VARCHAR,          -- 'BUY' / 'SELL'
  qty        DOUBLE PRECISION,
  price      DOUBLE PRECISION, -- USD 価格を前提とする。見積通貨の場合は後述の換算処理を行う。
  fee        DOUBLE PRECISION,
  fee_ccy    VARCHAR
) WITH (
  connector = 'kafka',
  topic = 'trades',
  properties.bootstrap.server = 'kafka:9092',
  scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON
  -- イベント時刻として `ts` を使用し、5 秒の順序乱れウィンドウを許容。
  WATERMARK FOR ts AS ts - INTERVAL '5 seconds';

価格および FX ストリーム

このストリームは、関連するすべての資産のリアルタイム価格を提供し、異なる通貨を USD に正規化するために使用されます。

CREATE SOURCE prices_src (
  symbol  VARCHAR,
  ts      TIMESTAMP,
  px_usd  DOUBLE PRECISION
) WITH (
  connector = 'kafka',
  topic = 'prices',
  properties.bootstrap.server = 'kafka:9092',
  scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON
  -- ウォーターマークにより、2 秒の順序乱れウィンドウを許容。
  WATERMARK FOR ts AS ts - INTERVAL '2 seconds';

-- 各シンボルの最新 USD 価格を常に保持するマテリアライズドビューを作成。
CREATE MATERIALIZED VIEW latest_price AS
SELECT
  symbol,
  MAX_BY(px_usd, ts) AS px_usd,
  MAX(ts)            AS ts
FROM prices_src
GROUP BY symbol;

通貨換算に関する注意: trades_src.price が USD ではなく見積通貨(例: USDT)の場合は、prices_src との時間結合を行うことで USD に換算できます。同様のロジックは手数料にも適用可能です。

取引の正規化と手数料換算

最初のステップは、取引の BUY / SELL サイドを数値的な符号(+1 / -1)に正規化し、すべての手数料を USD に換算することで、取引を統一ビューにまとめることです。これにより、後続の計算が単純化されます。

-- 時間結合(±3 秒)を使用して対応する FX レートを取得し、手数料通貨を USD に換算。
CREATE MATERIALIZED VIEW trades_enriched AS
SELECT
  t.account_id,
  t.symbol,
  t.ts,
  UPPER(t.side) AS side,
  CASE WHEN UPPER(t.side)='BUY'  THEN  1
       WHEN UPPER(t.side)='SELL' THEN -1 END      AS sign,
  t.qty,
  t.price,                         -- すでに USD 価格であると仮定。
  (t.qty * t.price)                AS notional_usd,
  t.fee,
  t.fee_ccy,
  -- 手数料を USD に換算: fee_ccy がすでに USD の場合は 1.0 を使用。それ以外は最新の px_usd を使用。
  t.fee * COALESCE(p.px_usd, 1.0)  AS fee_usd
FROM trades_src t
LEFT JOIN prices_src p
  ON p.symbol = t.fee_ccy
 AND p.ts BETWEEN t.ts - INTERVAL '3 seconds' AND t.ts + INTERVAL '3 seconds';

移動平均コストによるリアルタイム PnL 集計

PnL を計算するためには、売却資産の取得原価を算出する方法が必要です。FIFO/LIFO のような方法は正確ですが、リアルタイムストリーミング環境での実装は複雑です。移動平均コスト法は、増分的でステートフルなストリーム処理に適した方法です。

コアロジック(移動平均コスト法)

  • BUY の場合:
    • new_qty = old_qty + qty
    • avg_cost = (old_qty * avg_cost + qty * trade_price) / new_qty
  • SELL の場合:
    • realized_pnl += (trade_price - avg_cost) * qty - fee_usd
    • new_qty = old_qty - qtyavg_cost は変更なし)

このステートフルで経路依存的なロジックは、ユーザー定義集約関数(UDAF)にカプセル化できます。UDAF は、取引がストリームに流入するたびに、各キー(アカウント × シンボルの組み合わせ)の状態を維持します。

-- 疑似コード: ステートフル集約 UDAF を登録。
-- 実装は Python/Java/Rust などでエンジニアリングチームが提供する想定。
-- 入力: price, qty, side, fee_usd
-- 状態: pos_qty, avg_cost, realized_pnl, fee_cum
CREATE AGGREGATE pnl_avg_cost(
  price DOUBLE PRECISION,
  qty   DOUBLE PRECISION,
  side  VARCHAR,
  fee_usd DOUBLE PRECISION
) RETURNS JSON;  -- 戻り値はエンジンの機能に応じて STRUCT または JSON を利用可能。

ストリーミングエンジンが UDAF から STRUCT などの複合型を返すことをサポートしている場合、それがより効率的です。そうでない場合は、柔軟な代替手段として JSON を返すことが可能です。

アカウント・シンボルごとの PnL 状態計算

UDAF が定義できれば、各アカウント・シンボルに対する PnL 状態を計算するマテリアライズドビューを作成できます。

-- enriched view から取引をアカウント・シンボルごとに集約。
CREATE MATERIALIZED VIEW account_symbol_pnl_state AS
SELECT
  account_id,
  symbol,
  -- UDAF が状態(pos_qty, avg_cost, realized_pnl, fee_cum)を増分的に維持。
  pnl_avg_cost(price, qty, side, fee_usd) AS state_json
FROM trades_enriched
GROUP BY account_id, symbol;

UDAF 出力のパース

UDAF が JSON オブジェクトを返す場合、クエリを容易にするためにそれを列に分解するビューを作成します。

-- JSON オブジェクトを列に展開。
CREATE MATERIALIZED VIEW account_symbol_pnl AS
SELECT
  account_id,
  symbol,
  CAST(state_json->>'pos_qty'       AS DOUBLE PRECISION) AS pos_qty,
  CAST(state_json->>'avg_cost'      AS DOUBLE PRECISION) AS avg_cost,
  CAST(state_json->>'realized_pnl'  AS DOUBLE PRECISION) AS realized_pnl_usd,
  CAST(state_json->>'fee_cum'       AS DOUBLE PRECISION) AS fee_usd
FROM account_symbol_pnl_state;

未実現損益と総損益(最新価格との結合)

実現損益と現在のポジション状態を計算したら、次は latest_price ビューとの結合により未実現損益を算出できます。総損益は実現損益と未実現損益の合計です。

CREATE MATERIALIZED VIEW account_symbol_pnl_live AS
SELECT
  s.account_id,
  s.symbol,
  s.pos_qty,
  s.avg_cost,
  s.realized_pnl_usd,
  s.fee_usd,
  lp.px_usd AS px_now,
  -- 未実現損益 = ポジション数量 × (現在価格 - 平均コスト)
  s.pos_qty * (lp.px_usd - s.avg_cost) AS unrealized_pnl_usd,
  -- 総損益 = 実現損益 + 未実現損益
  s.realized_pnl_usd + s.pos_qty * (lp.px_usd - s.avg_cost) AS total_pnl_usd
FROM account_symbol_pnl s
LEFT JOIN latest_price lp
  ON lp.symbol = s.symbol;

時間ウィンドウ付きビュー(24 時間 / 7 日 / 30 日)

よくある要件として、直近 24 時間の PnL 変化など特定の時間ウィンドウで損益を表示することがあります。これは複雑ですが、未実現損益の変化を推定し、そのウィンドウ内の実現損益と組み合わせることで効果的に対応できます。

過去価格スナップショットの保存

まず、参照点として利用できる価格の定期スナップショットを作成する必要があります。

CREATE MATERIALIZED VIEW price_snap_1h AS
SELECT
  symbol,
  DATE_TRUNC('hour', ts) AS snap_ts,
  MAX_BY(px_usd, ts)     AS px_usd
FROM prices_src
GROUP BY symbol, DATE_TRUNC('hour', ts);

24 時間 PnL 変化の推定

現在の PnL 状態を 24 時間前の価格スナップショットと結合することで、PnL の変化を近似できます。

-- 24 時間前の最も近い時間の価格スナップショットを参照価格として取得。
CREATE MATERIALIZED VIEW account_symbol_pnl_24h AS
SELECT
  s.account_id,
  s.symbol,
  s.pos_qty,
  s.avg_cost,
  s.realized_pnl_usd,
  s.fee_usd,
  lp.px_usd     AS px_now,
  ref.px_usd    AS px_24h_ago,
  -- 直近 24h の未実現損益変化 ≈ pos_qty * (px_now - px_24h_ago)
  s.pos_qty * (lp.px_usd - ref.px_usd) AS unrealized_delta_24h,
  -- 総 24h PnL を得るには、UDAF を拡張し、そのウィンドウでの実現損益を出力する必要がある。
  NULL::DOUBLE PRECISION AS realized_24h,
  NULL::DOUBLE PRECISION AS total_24h
FROM account_symbol_pnl s
LEFT JOIN latest_price lp
  ON lp.symbol = s.symbol
LEFT JOIN LATERAL (
  SELECT px_usd
  FROM price_snap_1h
  WHERE symbol = s.symbol
    AND snap_ts <= NOW() - INTERVAL '24 hours'
  ORDER BY snap_ts DESC
  LIMIT 1
) ref ON TRUE;

正確なウィンドウ単位の実現損益を求める場合は、UDAF を拡張し、タンブリングまたはホッピングウィンドウ集約をサポートさせ、各アクティブウィンドウ(例: 24h, 7d)の実現損益を維持する必要があります。

結果の配信(シンク)

最後に、リアルタイム PnL の結果をダッシュボード、API、データウェアハウスなどの下流システムに SINK を使って送信できます。

CREATE SINK pnl_live_sink
FROM account_symbol_pnl_live
WITH (
  connector = 'kafka',
  topic = 'pnl_live',
  properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

正確性とエンジニアリング上の考慮事項

  • ウォーターマーク: イベント遅延の 95 パーセンタイルまたは 99 パーセンタイルに小さなマージンを加えた値を設定。これにより、正確性とエンドツーエンドのレイテンシのバランスを取れる。
  • 遅延イベント: ウォーターマーク後に到着する非常に遅いイベントは「デッドレタキュー」に送ってオフライン監査するか、UDAF や下流システムが修正をサポートしていればリトラクションで処理可能。
  • 手数料と資金調達: ファンディングレートや手数料などの追加コストは別ストリームとしてモデリングし、USD に換算して UDAF 内で PnL 計算に組み込むことが可能。
  • ポジションのクローズ: ポジション数量がゼロに戻った場合、浮動小数点誤差の蓄積を防ぐために avg_cost を UDAF 内でゼロにリセットする必要がある。
  • FIFO/LIFO: 厳密な FIFO/LIFO 会計がビジネス要件である場合、UDAF ロジックは取得価格と数量の順序付きキューを維持する必要があり、移動平均コスト法よりもはるかに複雑になる。
  • リプレイ可能性: 上流データソース(Kafka, CDC ログなど)が十分なデータ保持を行っていることを確認する。これにより、スキーマ変更やロジック更新時に PnL 状態をゼロから再構築でき、完全な監査可能性が担保される。

エンドツーエンド MVP(サマリー SQL)

以下の SQL をコピーし、トピックのスキーマにフィールド名を合わせれば、最小限の実用的なプロダクトを素早く立ち上げられます。(この例では trades.price がすでに USD であると仮定)

-- 1) ソース
CREATE SOURCE trades_src (..., WATERMARK FOR ts AS ts - INTERVAL '5 seconds');
CREATE SOURCE prices_src (..., WATERMARK FOR ts AS ts - INTERVAL '2 seconds');

-- 2) 最新価格
CREATE MATERIALIZED VIEW latest_price AS
SELECT symbol, MAX_BY(px_usd, ts) AS px_usd, MAX(ts) AS ts
FROM prices_src GROUP BY symbol;

-- 3) 取引の拡張(手数料を USD に換算)
CREATE MATERIALIZED VIEW trades_enriched AS
SELECT t.*, t.fee * COALESCE(p.px_usd, 1.0) AS fee_usd
FROM trades_src t
LEFT JOIN prices_src p
  ON p.symbol = t.fee_ccy
 AND p.ts BETWEEN t.ts - INTERVAL '3 seconds' AND t.ts + INTERVAL '3 seconds';

-- 4) UDAF: pnl_avg_cost(...) RETURNS JSON(エンジニアリングで実装する想定)
CREATE MATERIALIZED VIEW account_symbol_pnl_state AS
SELECT account_id, symbol,
       pnl_avg_cost(price, qty, side, fee_usd) AS state_json
FROM trades_enriched
GROUP BY account_id, symbol;

CREATE MATERIALIZED VIEW account_symbol_pnl AS
SELECT account_id, symbol,
  CAST(state_json->>'pos_qty'       AS DOUBLE PRECISION) AS pos_qty,
  CAST(state_json->>'avg_cost'      AS DOUBLE PRECISION) AS avg_cost,
  CAST(state_json->>'realized_pnl'  AS DOUBLE PRECISION) AS realized_pnl_usd,
  CAST(state_json->>'fee_cum'       AS DOUBLE PRECISION) AS fee_usd
FROM account_symbol_pnl_state;

-- 5) リアルタイム PnL
CREATE MATERIALIZED VIEW account_symbol_pnl_live AS
SELECT s.account_id, s.symbol, s.pos_qty, s.avg_cost, s.realized_pnl_usd, s.fee_usd,
       lp.px_usd AS px_now,
       s.pos_qty * (lp.px_usd - s.avg_cost) AS unrealized_pnl_usd,
       s.realized_pnl_usd
       + s.pos_qty * (lp.px_usd - s.avg_cost) AS total_pnl_usd
FROM account_symbol_pnl s
LEFT JOIN latest_price lp ON lp.symbol = s.symbol;

-- 6) シンク
CREATE SINK pnl_live_sink
FROM account_symbol_pnl_live
WITH (
    connector='kafka',
    topic='pnl_live',
    properties.bootstrap.server='kafka:9092'
) FORMAT PLAIN ENCODE JSON;

まとめと次のステップ

本ガイドでは、移動平均コスト法、ステートフルなユーザー定義集約関数(UDAF)、マテリアライズドビューの連鎖を活用し、強力なリアルタイム PnL トラッキングエンジンを構築する方法を示しました。このアプローチは効率的でスケーラブル、かつ現代の高スループット金融アプリケーションに適しています。

ここからさらにパイプラインを拡張できます:

  • UDAF を拡張し、24h・7d・30d といった特定ウィンドウにおける実現損益を正確に計算。
  • より多くの金融データを取り込む(ファンディングレート、利息支払い、マージンコールなど)ことで包括的なリスク管理システムを構築。
  • 結果を配信し、低レイテンシの WebSocket API を介してフロントエンドアプリに提供したり、ClickHouse のようなデータウェアハウスにシンクして履歴分析やバックテストに利用。

pnl_avg_cost UDAF の擬似コード実装(状態遷移ロジックやリトラクション対応を含む)が必要であれば、エンジニアリングチームの立ち上げを支援するために提供可能です。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?