近年、企業は、何十年にもわたって行われてきたバッチ処理ではなく、リアルタイムでデータを処理することの価値を認識するようになりました。
しかし、資本市場は他の業界と異なり、特にパフォーマンス要件、膨大なリアルタイムデータの取り扱い、信頼性と冗長性の確保、そして規制当局によるセキュリティおよびガバナンスの監視といった点で独自の課題を抱えています。したがって、資本市場でリアルタイムデータを配信・処理するためのツールは、高いパフォーマンス、セキュリティ、信頼性、冗長性を兼ね備えている必要があります。
資本市場におけるリアルタイムデータの主なユースケース:
- Refinitiv や Bloomberg などのベンダーからの大量の市場データ配信
- 市場データのリアルタイム分析と、HLOC(高値・安値・始値・終値)、VWAP(出来高加重平均価格)、TWAP(時間加重平均価格) などの重要な指標の計算
- 取引所で執行される予定の取引注文の配信
- 取引が執行された後の取引注文の更新情報の配信・記録
- 取引の トランザクションコスト分析(TCA) を実施し、執行状況を評価
- トレーダー UI へのライブデータ(市場データ、取引注文)の配信
ニューヨーク、ロンドン、シンガポールなどの主要な取引拠点で大量のデータをリアルタイムに配信するため、企業は Solace PubSub+ Platform のような強力なツールを利用し、データセンターやクラウドプロバイダー間でデータを効率的に配信します。このデータは、RisingWave のような強力なリアルタイム分析ツールによって処理され、即時のインサイトを導き出します。
Solace の概要
Solace PubSub+ Platform は、リアルタイム企業向けの完全なイベント駆動型管理およびストリーミングプラットフォームです。このプラットフォームは、企業がハイブリッドクラウド、マルチクラウド、IoT 環境全体でイベント駆動型アーキテクチャ(EDA)を設計、展開、統合、管理できるよう支援し、より統合されたイベント駆動型システムの構築を可能にします。
Solace PubSub+ Platform は、航空業界や小売業界をはじめとするさまざまな業界に導入されていますが、資本市場の企業でも長年にわたってミッションクリティカルな市場データおよび取引注文の配信に活用されています。
トップクラスの投資銀行やヘッジファンドは、Solace の PubSub+ ブローカーを活用して、市場データ、ティックデータ、OMS(オーダーマネジメントシステム)、EMS(執行管理システム)、PNL(損益計算システム)、トレーダー UI など、さまざまなアプリケーション間でデータを配信しています。Solace は AMQP、MQTT、REST、WebSockets などのオープンプロトコルをネイティブサポートしているため、企業内のさまざまなアプリケーションがプロトコル変換を気にせずにデータの発行・消費を容易に行えます。また、バックエンドの市場データフィードハンドラーは Solace 独自のプロトコル SMF を使用して市場データを PubSub+ に発行でき、同じデータをトレーダー UI は WebSockets 経由で、PNL アプリケーションは AMQP 経由で消費できます。
RisingWave の概要
RisingWave は、PostgreSQL 互換の SQL データプラットフォームであり、最新のデータ駆動型アプリケーションのニーズに対応するよう設計されています。このプラットフォームは、イベント駆動型アプリケーションの構築、リアルタイム ETL パイプライン、継続的な分析サービス、および AI アプリケーションのフィーチャーストアを実装するために最適化されています。RisingWave は、リアルタイムイベントストリーム、データベース変更データキャプチャ(CDC)、時系列データなど、さまざまなデータソースから最新かつ一貫したインサイトを抽出することに優れています。
高性能アーキテクチャにより、RisingWave はサブ秒レベルでのインサイト抽出を実現し、速度が重要となるユースケースに最適です。さらに、ストリーミング処理とバッチ処理を統合することで、ライブデータと履歴データの両方をシームレスに取り込み、結合、分析することが可能です。また、クラウドスケールでの運用を容易にする設計が施されており、高い信頼性とスケーラビリティを確保しています。
資本市場における RisingWave の活用
RisingWave は資本市場に革命をもたらし、ミリ秒単位の精度でリアルタイムのポートフォリオ監視とリスク管理を実現します。これにより、金融機関はポートフォリオのエクスポージャーを動的に評価し、規制上の制限に対応し、急速に変化する取引環境でシステミックリスクを軽減できます。RisingWave は、ライブの取引フロー、市場価格、履歴データをシームレスに統合し、ポートフォリオマネージャーが市場の変化を先取りできるよう支援します。
RisingWave の マテリアライズドビュー(MV) は、エクスポージャーメトリクスやリスク閾値を自動的に更新し、タイムリーなアラートを提供するとともに、自動調整を可能にします。これにより、コンプライアンスと運用効率が大幅に向上します。
さらに、RisingWave は、現代の取引システムにおける膨大なデータ量の処理において、比類ないスケーラビリティと信頼性を発揮します。Kafka や Solace などのデータストリーミングプラットフォームとネイティブに統合されており、スライディングウィンドウ分析を活用することで、ポートフォリオの評価や取引メトリクスの追跡といった複雑なワークフローを簡素化します。SQL ベースのクエリを用いたリアルタイムデータ処理により、エラープローンなバッチパイプラインの必要性を排除し、金融専門家が重要な場面でミリ秒単位の意思決定を行えるよう支援します。
技術スタック
このブログ記事では、Solace PubSub+ Platform と RisingWave の両方を活用して、リアルタイム市場データを取り込み、分析する方法をデモンストレーションします。
Solace のセットアップ
Solace PubSub+ イベントブローカーを利用するには、以下の 2 つのオプションがあります。
- 無料のソフトウェア版を Docker でセットアップする
- Solace PubSub+ Cloud を利用する
手軽に試せる PubSub+ Cloud の無料トライアル を利用することを推奨します。
データの生成
ここでは、Solace の Himanshu Gupta 氏が開発した*市場データシミュレーター* を使用します。このシミュレーターは、市場データフィードハンドラーを模倣し、L1(レベル 1)の気配値および取引データを Solace PubSub+ ブローカー に発行します。実際の環境では、L1 の気配値データと取引データは別々のフィードとして提供されますが、シミュレーションの目的上、統合された形で提供されます。
これらのフィードは、以下のような形で提供されることが一般的です。
- NYSE や NASDAQ などの取引所からの直接フィード
- Refinitiv、Bloomberg、ICE、FactSet などの市場データベンダーからの統合フィード
シミュレーターのコードおよび実行方法については、GitHub を参照してください。
Solace トピックへのデータ発行
シミュレーターは、JSON 形式のサンプルレコードを Solace PubSub+ の階層型ダイナミックトピックに発行します。トピックのフォーマットは次のようになっています。
<資産クラス>/MARKETDATA/v1/<国>/<取引所>/<ティッカー>
例えば、Apple(AAPL)の場合、次のようなトピックにデータが発行されます。
EQ/MARKETDATA/v1/US/NASDAQ/AAPL
発行されるサンプルデータは以下のようになります。
{
"symbol":"AAPL",
"askPrice":250.3121,
"bidSize":630,
"tradeSize":180,
"exchange":"NASDAQ",
"currency":"USD",
"tradePrice":249.9996,
"askSize":140,
"bidPrice":249.6871,
"date":"2020-03-23",
"time":"09:32:10.610764-04:00"
}
また、異なる銘柄のデータは次のように発行されます。
EQ/MARKETDATA/v1/UK/LSE/VOD
このようなトピック階層を使用することで、Solace PubSub+ の強力な ワイルドカードサポート および 高度なフィルタリング機能 を活用できます。
例えば、Kafka のようなログベースのストリーミングツールでは フラットなトピック構造 を採用するのが一般的ですが、Solace は 階層型トピック を利用することで、より柔軟なフィルタリングを可能にしています。この階層構造を活かすことで、コンシューマー(データの受信者)は、必要なデータストリームのみを取得できるようになります。
例えば、以下のようにトピックのサブスクリプションを設定できます。
-
EQ/>
→ 全ての株式データを購読 -
EQ/*/*/NYSE/>
→ NYSE の全ての銘柄を購読 -
EQ/MARKETDATA/v1/US/>
→ 米国株の市場データを購読
Solace のワイルドカードの詳細については、こちら を参照してください。
この仕組みは特に資本市場で有効です。例えば、市場データは PNL、Tick Data、TCA、OMS/EMS、リスク管理、トレーダー UI など、複数のシステムで消費される必要があります。しかし、もしすべてのアプリケーションに対して 未加工データ をブロードキャストしてしまうと、各アプリケーションが独自にフィルタリングを実装する必要があり、非効率的です。
Solace のブローカーがフィルタリングを サーバーサイドで実行 することで、個々のアプリケーションの負担を軽減し、より効率的なデータ配信を実現します。特に ハイブリッドクラウド や マルチクラウド環境 では、クラウドへの データ転送コスト や 計算コスト が膨らみがちですが、Solace の トピックフィルタリング機能 を活用すれば、必要なデータのみをクラウドに送信し、コストを削減できます。
さらに、Solace では 複数のトピックにまたがるデータの順序保証 が可能です。他のブローカーでは通常、データの順序は 単一のトピックパーティション 内でのみ保証されますが、これは資本市場のユースケースには不向きです。Solace の階層型トピックは、このような制約を克服し、より柔軟なデータ配信を可能にします。
データの取り込みプロセス
ここでは、RisingWave を使用して PubSub+ ブローカー から発行されたイベントを取り込み、分析します。この例では、RisingWave Cloud を利用し、リアルタイムな資本市場データ分析のためのクラスタをセットアップします。RisingWave Cloud は、使いやすいインターフェースとシンプルな運用管理を提供し、データ分析プロセスをスムーズにします。
このユースケースのために RisingWave Cloud にクラスタをセットアップする方法については、こちらのドキュメント を参照してください。
ソーステーブルの作成
RisingWave クラスタをデプロイしたら、ワークスペース に移動し、次の SQL クエリを使用してソーステーブルを作成します。
このクエリでは、RisingWave に market_data
というテーブルを作成し、PubSub+ ブローカー からリアルタイム市場データを取り込むよう設定します。このテーブルは、以下の条件で MQTT ブローカーと接続します。
-
EQ/MARKETDATA/v1/#
というトピックに接続 - 指定されたブローカー URL に接続
-
demo
ユーザー名とパスワードで認証 - at_least_once(少なくとも 1 回の配送保証)を QoS として設定
- JSON 形式 のデータを取り込み
CREATE TABLE market_data (
date DATE,
symbol VARCHAR,
askPrice NUMERIC,
bidSize INT,
tradeSize INT,
exchange VARCHAR,
currency VARCHAR,
time TIMESTAMP,
tradePrice NUMERIC,
askSize INT,
bidPrice NUMERIC
)
WITH (
connector = 'mqtt',
topic = 'EQ/MARKETDATA/v1/#',
url = 'tcp://mr-connection-xrkpmgiz510.messaging.solace.cloud:1883',
username='demo',
password='demo',
qos = 'at_least_once'
) FORMAT PLAIN ENCODE JSON;
マテリアライズドビュー(MV):概要とメリット
マテリアライズドビュー(MV) は、複雑なクエリの事前計算結果を保持し、クエリのパフォーマンスを向上させ、計算負荷を軽減するための仕組みです。通常のビューとは異なり、マテリアライズドビュー はクエリの実行ごとにデータを再計算するのではなく、静的なスナップショット を保持し、増分的に更新されます。
この特性により、マテリアライズドビューは次のようなユースケースに最適です。
- 更新頻度が少ないが、クエリ負荷が高いデータセット
- 計算コストが高い複雑なクエリ
- 頻繁にアクセスされるデータ
マテリアライズドビューを使用することで、クエリのパフォーマンスを向上させ、計算コストを削減し、リアルタイムデータ分析の効率を大幅に向上させることができます。
マテリアライズドビューの活用と RisingWave のイノベーション
マテリアライズドビューは、多くのデータベースでサポートされていますが、実装の詳細はシステムによって異なります。例えば、PostgreSQL、Redshift、Oracle では、手動または自動更新のオプションが提供されています。一方、RisingWave は、ストリーミングワークロード向けに最適化された リアルタイムのインクリメンタル更新 を特徴としています。
RisingWave のマテリアライズドビューの主な特徴は以下のとおりです。
- リアルタイム更新:最新データを即座に反映
- 高可用性と整合性:データの一貫性を維持
- 並行処理の最適化:複数のクエリを高速処理
- 時間ウィンドウとウォーターマークのサポート:高度なストリーム処理を実現
- リソース分離:ストリーム処理の負荷を適切に管理
これらの機能により、RisingWave はストリーミングデータベースのパフォーマンスを最適化し、現代のデータパイプラインにおいて不可欠な要素 となっています。
リアルタイムデータ分析のためのマテリアライズドビューの作成
次の SQL クエリは、market_data テーブルを基に enriched_market_data という マテリアライズドビュー を作成します。このビューでは、date
と time
を組み合わせて event_timestamp
を生成し、主要な市場データを整理します。
CREATE MATERIALIZED VIEW enriched_market_data AS
SELECT
("date" || ' ' || TO_CHAR("time", 'HH24:MI:SS.MS' ))::timestamptz AS event_timestamp,
symbol AS symbol,
askPrice AS ask_price,
bidPrice AS bid_price,
askSize AS ask_size,
bidSize AS bid_size,
tradePrice AS trade_price,
tradeSize AS trade_size,
exchange AS exchange,
currency AS currency
FROM
market_data;
このマテリアライズドビューをクエリすると、最新のデータを取得できます。
SELECT * FROM enriched_market_data LIMIT 5;
このクエリの結果は、以下のようになります。
event_timestamp | symbol | ask_price | bid_price | ask_size | bid_size | trade_price | trade_size | exchange | currency
---------------------+--------+------------+------------+----------+----------+-------------+------------+----------+---------
2024-11-22T08:42:56Z| FB | 94.50678 | 92.17329 | 780 | 40 | 93.340034 | 470 | NASDAQ | USD
2024-11-22T08:42:56Z| INTC | 10.377003 | 10.351093 | 630 | 50 | 10.364048 | 10 | NASDAQ | USD
2024-11-22T08:42:56Z| UOB | 97.710526 | 96.25578 | 410 | 520 | 96.983154 | 40 | SGX | SGD
2024-11-22T08:42:56Z| BARC | 1746.387 | 1703.2664 | 770 | 50 | 1724.8267 | 70 | LSE | GBP
2024-11-22T08:42:56Z| XOM | 36.714695 | 35.987675 | 770 | 310 | 36.351185 | 390 | NYSE | USD
(5 rows)
1. 分単位の市場統計集計
このマテリアライズドビュー minutely_market_stats_mv
は、1 分間のウィンドウ内で各銘柄の主要な取引指標を計算します。具体的には、以下の項目を算出します。
- 価格レンジ(最大値 - 最小値)
- 平均価格(avg)
- 価格の変動性(標準偏差)
- 出来高加重平均価格(VWAP)
- 最高値(high)
- 最安値(low)
CREATE MATERIALIZED VIEW minutely_market_stats_mv AS
SELECT
symbol,
MAX(trade_price) - MIN(trade_price) AS price_range,
AVG(trade_price) AS avg_price,
STDDEV_POP(trade_price) AS std_price,
SUM(trade_price * trade_size) / SUM(trade_size) AS vwap,
MAX(trade_price) AS high_price,
MIN(trade_price) AS low_price,
window_start,
window_end
FROM TUMBLE (market_data_enhanced, event_timestamp, INTERVAL '1 MINUTES')
GROUP BY window_start, window_end, symbol;
このビューをクエリすると、以下のような結果が得られます。
symbol | price_range | avg_price | std_price | vwap | high_price | low_price | window_start | window_end
-------+-------------+-----------+-----------+----------+------------+-----------+---------------------+---------------------
AAPL | 82.7294 | 321.8184 | 16.2368 | 322.1932 | 351.6677 | 268.9384 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
(5 rows)
2. 異常に大きな取引の検出
このマテリアライズドビュー outlier_detection_mv
は、異常値をリアルタイムで検出 するために設計されています。取引サイズ trade_size
が、過去 5 分間の平均値 + 2 標準偏差を超える場合に異常値としてフラグを立てます。
CREATE MATERIALIZED VIEW outlier_detection_mv AS
WITH outlier_detection_data AS (
SELECT
symbol,
trade_size,
AVG(trade_size) OVER (PARTITION BY symbol ORDER BY event_timestamp RANGE BETWEEN INTERVAL '5' MINUTE PRECEDING AND CURRENT ROW) AS avg_trade_size,
STDDEV_POP(trade_size) OVER (PARTITION BY symbol ORDER BY event_timestamp RANGE BETWEEN INTERVAL '5' MINUTE PRECEDING AND CURRENT ROW) AS stddev_trade_size,
window_start,
window_end
FROM TUMBLE(enriched_market_data, event_timestamp, INTERVAL '1' MINUTE)
)
SELECT *
FROM outlier_detection_data
WHERE trade_size > avg_trade_size + 2 * stddev_trade_size;
結果例:
symbol | trade_size | avg_trade_size | stddev_trade_size | window_start | window_end
-------+------------+----------------+-------------------+---------------------+---------------------
AAPL | 600 | 312.7890 | 212.3456 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
(5 rows)
3. VWAP に基づく高パフォーマンス銘柄の特定
このマテリアライズドビュー vwap_minutely_mv
は、出来高加重平均価格(VWAP) を計算し、各銘柄の 1 分間ごとのパフォーマンスをランク付けします。
CREATE MATERIALIZED VIEW vwap_minutely_mv AS
SELECT
symbol,
SUM(trade_price * trade_size) / SUM(trade_size) AS vwap,
window_start,
window_end
FROM TUMBLE (enriched_market_data, event_timestamp, INTERVAL '1 MINUTES')
GROUP BY window_start, window_end, symbol;
結果例:
symbol | vwap | window_start | window_end
-------+-----------+---------------------+---------------------
AAPL | 322.1932 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
(5 rows)
4. 取引所単位の統計集計
このマテリアライズドビュー exchange_stats_minutely_mv
は、取引所ごとの価格統計を 1 分単位で集計 します。
CREATE MATERIALIZED VIEW exchange_stats_minutely_mv AS
SELECT
exchange,
MAX(trade_price) AS high_price,
MIN(trade_price) AS low_price,
AVG(trade_price) AS avg_price,
SUM(trade_size) AS total_volume,
window_start,
window_end
FROM TUMBLE (enriched_market_data, event_timestamp, INTERVAL '1 MINUTES')
GROUP BY window_start, window_end, exchange;
結果例:
exchange | high_price | low_price | avg_price | total_volume | window_start | window_end
---------+------------+-----------+------------+--------------+---------------------+---------------------
NASDAQ | 351.6677 | 9.4449 | 140.6360 | 239870 | 2024-11-22T08:42:00Z | 2024-11-22T08:43:00Z
(5 rows)
RisingWave から Solace へデータを送信するためのシンクの作成
RisingWave では、外部システムにデータを送信するための シンク(sink) を作成できます。以下のように、シンクを定義することで、Solace へのデータ配信を行えます。
CREATE SINK market_stats_mqtt_sink
FROM minutely_market_stats_mv
WITH (
connector = 'MQTT',
topic = 'market_stats/symbol',
url = 'tcp://mr-connection-xrkpmgiz510.messaging.solace.cloud:1883',
username ='demo',
password ='demo',
qos = 'at_least_once'
) FORMAT PLAIN ENCODE JSON;
この方法を利用すれば、リアルタイムで処理したデータを Solace のトピックに直接ストリーミング できます。
結論
本記事では、資本市場におけるリアルタイムデータ配信のユースケース を紹介し、Solace PubSub+ を活用した 効率的なデータ配信 と RisingWave を用いた リアルタイム分析 の方法について説明しました。
- Solace は、多様なオープンプロトコルと API をサポートし、階層型トピックを活用した動的なルーティングとフィルタリング、堅牢なアーキテクチャ、そして既存の大規模な投資銀行・ヘッジファンドでの導入実績を持つ、資本市場におけるリアルタイムデータ配信の理想的な選択肢です。
- RisingWave は、様々なデータソースとシンクをサポートし、ミリ秒単位の精度でのデータ処理とリアルタイム分析を可能にする強力なプラットフォームです。
本記事を通じて、Solace PubSub+ Platform と RisingWave を組み合わせることで、資本市場におけるリアルタイムデータ分析がどれほど強力になるかを示しました。