課題:リアルタイム物流における正確な ETA の維持
現代の物流およびサプライチェーンシステムにおいて、出荷状況のリアルタイム可視化はもはや「あれば便利」な機能ではなく、業務の中核を成す要件です。配送車両は常時 GPS 位置情報をストリーミングし、交通状況は一日の中で絶えず変化し、すべての注文には必ず守るべき配送期限が設定されています。課題は単に荷物が今どこにあるかを把握することではありません。より難しい問い――「予定どおり到着するのか」「遅延リスクをどれだけ早く検知できるのか」――に継続的に答え続けることです。
従来のアプローチには大きく二つの問題があります。まず、バッチパイプラインでは避けられないレイテンシが発生します。ETA は数分後に計算され、その間に車両はすでに移動し、交通状況も変化しています。一方で、ストリーミングシステムをゼロから構築するには、複雑な状態管理や複数のライブデータソース間の高度な結合処理、さらに大きな運用コストが伴います。その結果、重厚なストリーミング基盤を採用せずにリアルタイム精度を実現したいチームとの間にギャップが生じます。
本デモは、まさにそのギャップを埋めるものです。複雑なストリーム処理コードを書くことなく、継続的に変化する物流ストリームを、実用的なリアルタイム ETA および遅延インサイトへと変換します。
ソリューション:RisingWave による SQL ネイティブなリアルタイム物流パイプライン
本デモでは、データ取り込みに Kafka、継続的な処理および結果提供に RisingWave を組み合わせたエンドツーエンドの構成を構築しました。RisingWave はパイプラインの中心に位置し、複数のストリームを取り込み、マテリアライズドビューを通じて常に最新の状態を維持し、その結果を軽量なバックエンドに公開してダッシュボードを支えます。
設計の基本原則はシンプルです。現実世界の各シグナルをストリームとして扱い、それらをクエリ可能なテーブルへ変換し、RisingWave によって各出荷に関する「現在の真実」――最新位置、現在の交通状況、予測到着時刻、遅延ステータス――を継続的に維持させます。
仕組み:データパイプラインを段階的に解説
全体アーキテクチャ:
ステップ 1:Kafka から 3 つのリアルタイムストリームを取り込む
まず、3 つの Kafka トピックを RisingWave にストリームテーブルとして取り込みます。
gps_stream は車両から送信される生の GPS 更新データを表します。各メッセージにはタイムスタンプ、車両 ID、座標、瞬間速度が含まれます。
-- 1) GPS stream (JSON)
CREATE TABLE sc.gps_raw (
event_ts TIMESTAMPTZ,
vehicle_id VARCHAR,
lat DOUBLE PRECISION,
lon DOUBLE PRECISION,
speed_kmh DOUBLE PRECISION
) WITH (
connector = 'kafka',
topic = 'gps_stream',
properties.bootstrap.server = '127.0.0.1:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
traffic_stream は交通状況のスナップショットを表します。各イベントは region_id をキーとし、バウンディングボックス、混雑度、走行速度を含みます。
-- 2) Traffic stream (JSON)
CREATE TABLE sc.traffic_raw (
event_ts TIMESTAMPTZ,
region_id VARCHAR,
min_lat DOUBLE PRECISION,
min_lon DOUBLE PRECISION,
max_lat DOUBLE PRECISION,
max_lon DOUBLE PRECISION,
congestion DOUBLE PRECISION,
traffic_speed_kmh DOUBLE PRECISION
) WITH (
connector = 'kafka',
topic = 'traffic_stream',
properties.bootstrap.server = '127.0.0.1:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
order_stream は注文に関する運用更新データです。配送先、約束された配送時刻、現在のステータス、更新時刻が含まれます。このストリームによって、車両の移動が「出荷トラッキング」という問題へと結び付けられます。各注文は SLA と配送先に紐づけられます。
-- 3) Order updates stream (JSON)
CREATE TABLE sc.order_updates_raw (
order_id VARCHAR,
vehicle_id VARCHAR,
dest_lat DOUBLE PRECISION,
dest_lon DOUBLE PRECISION,
promised_ts TIMESTAMPTZ,
status VARCHAR,
updated_at TIMESTAMPTZ
) WITH (
connector = 'kafka',
topic = 'order_stream',
properties.bootstrap.server = '127.0.0.1:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
ステップ 2:型付きイベントと共通空間キーへの正規化
生ストリームはすでに構造化されていますが、パイプラインではさらに型付きイベントビューへ正規化し、重要なフィールド grid_id を追加します。
GPS イベントでは、緯度・経度を 0.002 度単位のグリッドにスナップし、そのバケット化された値を文字列連結して grid_id を生成します。交通イベントでは、バウンディングボックスの中心点を同じルールでスナップして grid_id を計算します。これにより、高コストな空間条件ではなく、単純な等価キーで GPS と交通データを結合できます。
-- GPS events (typed)
CREATE MATERIALIZED VIEW sc.gps_events AS
SELECT
event_ts,
vehicle_id,
lat,
lon,
speed_kmh,
(
CAST(FLOOR(lat / 0.002) AS BIGINT)::TEXT
|| '_' ||
CAST(FLOOR(lon / 0.002) AS BIGINT)::TEXT
) AS grid_id
FROM sc.gps_raw;
-- Traffic events (typed)
CREATE MATERIALIZED VIEW sc.traffic_events AS
SELECT
event_ts,
region_id,
min_lat,
min_lon,
max_lat,
max_lon,
congestion,
traffic_speed_kmh,
(
CAST(FLOOR(((min_lat + max_lat) / 2) / 0.002) AS BIGINT)::TEXT
|| '_' ||
CAST(FLOOR(((min_lon + max_lon) / 2) / 0.002) AS BIGINT)::TEXT
) AS grid_id
FROM sc.traffic_raw;
ステップ 3:イベントストリームを最新状態スナップショットへ変換
リアルタイムダッシュボードや ETA 計算に必要なのは、追記型の履歴ではなく「現在の状態」です。そのため、時系列順に row_number() を付与し、各キーにつき 1 行のみ保持するスナップショットを作成します。
-- Latest order snapshot (one row per order_id), e.g., order snapshot
CREATE MATERIALIZED VIEW sc.orders AS
SELECT
order_id,
vehicle_id,
dest_lat,
dest_lon,
promised_ts,
status,
updated_at
FROM (
SELECT *, row_number() OVER (PARTITION BY order_id ORDER BY updated_at DESC) AS rn
FROM sc.order_updates
) t
WHERE rn = 1;
ここで重要なのが RisingWave の特長です。マテリアライズドビューはこれらの「最新」スナップショットを継続的かつ増分的に維持します。Kafka に新しいメッセージが到着すると、影響を受けるキーのみが更新され、下流の結合処理に常に最新の状態が提供されます。
ステップ 4:注文・車両・交通を 1 行のライブ運用データに結合
最新状態テーブルが揃ったら、vehicle_id と grid_id を使って 3 つのテーブルを結合します。交通データが存在しない場合でも ETA パイプラインが安定して動作するよう、2 つのデフォルト値を定義しています。congestion は 0.0 にフォールバックし、traffic_speed_kmh は車両自身の速度にフォールバックします。
WITH joined AS (
SELECT
o.order_id,
o.vehicle_id,
o.status,
o.dest_lat,
o.dest_lon,
o.promised_ts,
o.updated_at,
v.lat AS vehicle_lat,
v.lon AS vehicle_lon,
v.speed_kmh AS vehicle_speed_kmh,
v.gps_ts,
v.grid_id,
t.region_id,
coalesce(t.congestion, 0.0) AS congestion,
coalesce(t.traffic_speed_kmh, v.speed_kmh) AS traffic_speed_kmh
FROM sc.orders o
JOIN sc.vehicle_latest v ON o.vehicle_id = v.vehicle_id
LEFT JOIN sc.traffic_latest t ON v.grid_id = t.grid_id
)
ステップ 5:距離・実効速度・ETA・到着予測時刻・遅延を計算
calc_dist では、車両の現在位置と配送先との間のハーサイン距離(km)を計算し、distance_km を算出します。
calc_dist AS (
SELECT
*,
-- Haversine distance (km)
(2 * 6371 * asin(
sqrt(
power(sin(radians(dest_lat - vehicle_lat)/2), 2) +
cos(radians(vehicle_lat)) * cos(radians(dest_lat)) *
power(sin(radians(dest_lon - vehicle_lon)/2), 2)
)
)) AS distance_km
FROM joined
)
calc_speed では、生の速度シグナルから実効速度を算出します。車両速度と交通速度の小さい方を取り、混雑ペナルティ $1 - congestion \times 0.6$ を適用し、極端に低速な場合でも ETA が不安定にならないよう下限を 5.0 km/h に設定します。これにより effective_speed_kmh を得ます。
calc_speed AS (
SELECT
*,
-- Effective speed (km/h), floor at 5
greatest(
5.0,
least(vehicle_speed_kmh, traffic_speed_kmh) * (1 - congestion * 0.6)
) AS effective_speed_kmh
FROM calc_dist
)
calc_eta では、$\frac{distance_km}{effective_speed_kmh} \times 60.0$ により移動時間(分)を計算し、eta_minutes を出力します。
calc_eta AS (
SELECT
*,
-- Travel time (minutes)
(distance_km / effective_speed_kmh * 60.0) AS eta_minutes
FROM calc_speed
)
最終的な SELECT では、ビジネス上重要な 2 つの時刻を算出します。予測到着時刻 eta_ts は $gps_ts + eta_minutes \times 1\ minute$ で計算され、最新の GPS イベント時刻に基づいています。遅延は $delay_interval = eta_ts - promised_ts$ として計算され、正の値であれば約束時刻を超過する見込みであることを意味します。
SELECT
order_id,
vehicle_id,
status,
dest_lat, dest_lon,
promised_ts,
updated_at,
vehicle_lat, vehicle_lon,
vehicle_speed_kmh,
gps_ts,
grid_id,
region_id,
congestion,
traffic_speed_kmh,
distance_km,
effective_speed_kmh,
eta_minutes,
-- ETA timestamp based on event time
(gps_ts + (eta_minutes * INTERVAL '1 minute')) AS eta_ts,
-- Delay vs promised time (interval)
((gps_ts + (eta_minutes * INTERVAL '1 minute')) - promised_ts) AS delay_interval
FROM calc_eta;
この一連の処理全体がマテリアライズドビューとして維持されます。新しい GPS メッセージ、交通状況の変化、注文更新が発生すると、RisingWave は影響を受ける注文のみを増分更新し、ダッシュボードから常に即座に参照できる状態を保ちます。
ダッシュボードの内部:リアルタイム出荷モニタリング
ダッシュボードは生イベントストリームではなく、RisingWave によって継続的に更新される結果を基盤としています。軽量なバックエンドが REST エンドポイントを提供し、ライブビューをクエリして 2 種類のデータを返します。KPI ヘッダー用のスナップショット指標と、テーブルおよびマップ用の注文データです。
KPI セクションは運用状況をリアルタイムに反映します。現在配送中の注文数、約束時刻に対してすでに遅延している件数、平均残り ETA、そして最終更新時刻が表示されます。これらの数値は RisingWave 内の SQL によって計算されており、新しいデータが到着すると即座に更新されます。
マップ表示は即時の状況把握に焦点を当てています。車両位置と配送先がリアルタイムで描画され、ルートにはリスクや遅延状況が視覚的に示されます。車両にカーソルを合わせると、ETA(分)、到着予測時刻、約束時刻、遅延時間、車両速度、交通速度など、注文単位の詳細情報が表示されます。画面を切り替えることなく状況を迅速に把握できます。
注文リストはマップを補完し、各アクティブ配送の現在の状態を表す行を、並び替えやフィルタリング可能な形式で提供します。
任意の注文をクリックすると詳細パネルが開き、予測到着時刻と約束配送時刻が並べて表示されるため、遅延の有無が明確かつ説明可能になります。
数分で自分でも構築できます
実際に動かしてみませんか?本デモはすべてオープンソースで公開されており、1 時間以内に環境構築できる手順が用意されています。
- RisingWave の SQL スクリプト(ソーステーブルおよびマテリアライズドビュー)
- モックデータ生成用の JavaScript スクリプト
- ビジュアルダッシュボードのフロントエンドおよびバックエンドコード
- README ドキュメント
RisingWave を始めましょう
-
今すぐ RisingWave を試す:
- 自社インフラにデプロイできる オープンソース版 RisingWave をダウンロード
- フルマネージド環境で素早く開始できる RisingWave Cloud
-
エキスパートに相談: 複雑なユースケースや個別デモをご希望の場合は、お問い合わせ ください。
-
コミュニティに参加: 開発者同士で交流し、質問や知見を共有できる Slack コミュニティ にぜひご参加ください。
個別デモのご希望や、具体的なユースケースへの適用についてのご相談は、ぜひ 営業チームまでお問い合わせください。






