TL;DR
本記事では、RisingWave の ASOF Join を使って、不定期に到着するセンサー測定値を、直近のデバイスステータス更新と紐付ける方法を紹介します。
Inner ASOF Join と Left ASOF Join を実際の SQL 例とともに解説し、タイムスタンプがぴったり一致しない IoT や時系列データで特に役立つアプローチを説明します。
複雑な時系列結合ロジックをシンプルにできるのがポイントです。
Introduction
ストリーム処理では、異なるデータソースからのデータを結合することはよくあります。通常は、片方のテーブルの User ID と、もう片方のテーブルの User ID のように、完全一致するキーで結合します。
しかし、時系列データでは状況が異なります。多くの場合、タイムスタンプが完全一致しません。
例えば:
- センサー測定値:08:15
- システムのステータス更新:08:00
通常の JOIN は完全一致を探すため、ここでは結合できません。しかし実際には、この測定値に対しては 直前のステータス(08:00)を紐付けたい わけです。
そこで登場するのが ASOF Join です。
ASOF Join は、近似条件に基づいて行を結合します。左側の行に対して、条件を満たす右側の「最も最近の」行を探します。
金融データ(株価など)や IoT データ(センサー測定値など)に最適です。
本チュートリアルでは、RisingWave を使ってセンサー測定値とデバイスステータス更新を紐付ける方法を説明します。
ASOF Join を使ってセンサー測定値とデバイスステータス更新を紐付ける方法
1. テーブルとデータの準備
まず RisingWave を起動しておきましょう。まだの場合は、インストールガイドを参考にしてください。
今回扱うデータは 2 種類です:
-
device_status:デバイスの状態(normal / warning / error)。1 時間ごとなど定期的に更新される。 -
sensor_readings:温度の測定値。不規則なタイミングで発生する(例:毎時に限らずランダムな分)。
まず、ステータスデータを格納するテーブルとサンプルデータを作成します:
-- デバイスステータス更新用のテーブル(1 時間ごとに更新される想定)
CREATE TABLE device_status (
device_id INT,
status VARCHAR,
update_time TIMESTAMP
);
-- デバイスステータスの投入(バッチデータ)
INSERT INTO device_status (device_id, status, update_time) VALUES
(101, 'normal', '2023-01-01 08:00:00'),
(101, 'warning', '2023-01-01 09:00:00'),
(101, 'error', '2023-01-01 10:00:00'),
(102, 'normal', '2023-01-01 08:00:00'),
(102, 'normal', '2023-01-01 09:00:00'),
(103, 'normal', '2023-01-01 08:00:00'); -- 注: Device 104 のステータスは存在しない
続いてセンサー測定値テーブルを作成し、データを挿入します。
こちらのタイムスタンプ(例:08:15:00)がステータスのタイムスタンプ(08:00:00)と一致していないことに注目してください。
CREATE TABLE sensor_readings (
reading_id INT,
device_id INT,
temperature FLOAT,
reading_time TIMESTAMP
);
*-- センサー測定値の投入*
INSERT INTO sensor_readings (reading_id, device_id, temperature, reading_time) VALUES
(1, 101, 36.2, '2023-01-01 08:15:00'),
(2, 101, 38.7, '2023-01-01 09:20:00'),
(3, 101, 41.3, '2023-01-01 10:05:00'),
(4, 102, 22.5, '2023-01-01 08:45:00'),
(5, 102, 23.1, '2023-01-01 09:30:00'),
(6, 103, 18.9, '2023-01-01 11:00:00'),
(7, 104, 25.6, '2023-01-01 09:15:00');
*-- Device 104 はステータスが存在しない*
2. Inner ASOF Join を実行する
ここで知りたいのは:
「この温度測定値の直前にデバイスはどんなステータスだったのか?」
ASOF Join の ON には 2 つの条件が必要です:
-
等値条件(例:
device_id = device_id) -
不等号条件(例:
reading_time >= update_time)
まずは Inner ASOF Join を試してみます。
マッチが見つかった行だけを返し、見つからない行は除外されます。
CREATE MATERIALIZED VIEW inner_asof_join_results AS
SELECT
s.reading_id,
s.device_id,
s.temperature,
s.reading_time,
d.status,
d.update_time AS last_status_update
FROM sensor_readings s
ASOF JOIN device_status d
ON s.device_id = d.device_id
AND s.reading_time >= d.update_time;
結果を確認します:
SELECT * FROM inner_asof_join_results ORDER BY reading_id;
出力:
reading_id | device_id | temperature | reading_time | status | last_status_update
*------------+-----------+-------------+---------------------+---------+---------------------*
1 | 101 | 36.2 | 2023-01-01 08:15:00 | normal | 2023-01-01 08:00:00
2 | 101 | 38.7 | 2023-01-01 09:20:00 | warning | 2023-01-01 09:00:00
3 | 101 | 41.3 | 2023-01-01 10:05:00 | error | 2023-01-01 10:00:00
4 | 102 | 22.5 | 2023-01-01 08:45:00 | normal | 2023-01-01 08:00:00
5 | 102 | 23.1 | 2023-01-01 09:30:00 | normal | 2023-01-01 09:00:00
6 | 103 | 18.9 | 2023-01-01 11:00:00 | normal | 2023-01-01 08:00:00
ポイント:
-
08:15 の測定値 (#1) が正しく 08:00 のステータス と紐付いている。
-
Reading #7(Device 104) は結果に含まれない。
→ ステータスが存在しないため Inner Join では除外される。
3. Left Outer ASOF Join を実行する
すべてのセンサー測定値を保持したい場合もあります。
Device 104 のようにステータスがなくても、温度データは見たいケースです。
その場合は Left Outer ASOF Join(ASOF LEFT JOIN) を使います。
左側(sensor_readings)の行はすべて含まれ、対応するステータスがなければ NULL になります。
CREATE MATERIALIZED VIEW left_outer_asof_join_results AS
SELECT
s.reading_id,
s.device_id,
s.temperature,
s.reading_time,
d.status,
d.update_time AS last_status_update
FROM sensor_readings s
ASOF LEFT JOIN device_status d
ON s.device_id = d.device_id
AND s.reading_time >= d.update_time;
結果を確認します:
SELECT * FROM left_outer_asof_join_results ORDER BY reading_id;
出力:
reading_id | device_id | temperature | reading_time | status | last_status_update
*------------+-----------+-------------+---------------------+---------+---------------------*
1 | 101 | 36.2 | 2023-01-01 08:15:00 | normal | 2023-01-01 08:00:00
2 | 101 | 38.7 | 2023-01-01 09:20:00 | warning | 2023-01-01 09:00:00
3 | 101 | 41.3 | 2023-01-01 10:05:00 | error | 2023-01-01 10:00:00
4 | 102 | 22.5 | 2023-01-01 08:45:00 | normal | 2023-01-01 08:00:00
5 | 102 | 23.1 | 2023-01-01 09:30:00 | normal | 2023-01-01 09:00:00
6 | 103 | 18.9 | 2023-01-01 11:00:00 | normal | 2023-01-01 08:00:00
7 | 104 | 25.6 | 2023-01-01 09:15:00 | NULL | NULL
ここで Reading #7 も保持され、ステータスがないので NULL になっていることがわかります。
Conclusion
ASOF Join は、RisingWave で時系列データを扱う際に非常に強力な機能です。以下のような場面で役立ちます:
- 時間がズレて発生するイベントを自然に結合できる
- 特定時点に対して「直前の値」を取得できる
- Left Outer Join によって履歴がなくてもデータを保持できる
これにより、本来であれば複雑なサブクエリやカスタムコードが必要となる処理を、シンプルな SQL だけで実現できます。

