0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ASOF Join 実践編:センサー値を正しいステータスに自然に紐付ける方法

Posted at

ASOF Join 実践編:センサー値を正しいステータスに自然に紐付ける方法

TL;DR

本記事では、RisingWave の ASOF Join を使って、不定期に到着するセンサー測定値を、直近のデバイスステータス更新と紐付ける方法を紹介します。

Inner ASOF Join と Left ASOF Join を実際の SQL 例とともに解説し、タイムスタンプがぴったり一致しない IoT や時系列データで特に役立つアプローチを説明します。

複雑な時系列結合ロジックをシンプルにできるのがポイントです。

Introduction

ストリーム処理では、異なるデータソースからのデータを結合することはよくあります。通常は、片方のテーブルの User ID と、もう片方のテーブルの User ID のように、完全一致するキーで結合します。

しかし、時系列データでは状況が異なります。多くの場合、タイムスタンプが完全一致しません

例えば:

  • センサー測定値:08:15
  • システムのステータス更新:08:00

通常の JOIN は完全一致を探すため、ここでは結合できません。しかし実際には、この測定値に対しては 直前のステータス(08:00)を紐付けたい わけです。

そこで登場するのが ASOF Join です。

ASOF Join は、近似条件に基づいて行を結合します。左側の行に対して、条件を満たす右側の「最も最近の」行を探します。

金融データ(株価など)や IoT データ(センサー測定値など)に最適です。

本チュートリアルでは、RisingWave を使ってセンサー測定値とデバイスステータス更新を紐付ける方法を説明します。

ASOF Join の概念図:センサー測定値とデバイスステータス更新をどのように紐付けるかを示す図

ASOF Join を使ってセンサー測定値とデバイスステータス更新を紐付ける方法

1. テーブルとデータの準備

まず RisingWave を起動しておきましょう。まだの場合は、インストールガイドを参考にしてください。

今回扱うデータは 2 種類です:

  1. device_status:デバイスの状態(normal / warning / error)。1 時間ごとなど定期的に更新される。
  2. sensor_readings:温度の測定値。不規則なタイミングで発生する(例:毎時に限らずランダムな分)。

まず、ステータスデータを格納するテーブルとサンプルデータを作成します:

-- デバイスステータス更新用のテーブル(1 時間ごとに更新される想定)
CREATE TABLE device_status (
        device_id INT,
        status VARCHAR,
        update_time TIMESTAMP
);
-- デバイスステータスの投入(バッチデータ)
INSERT INTO device_status (device_id, status, update_time) VALUES
        (101, 'normal', '2023-01-01 08:00:00'),
        (101, 'warning', '2023-01-01 09:00:00'),
        (101, 'error',  '2023-01-01 10:00:00'),
        (102, 'normal', '2023-01-01 08:00:00'),
        (102, 'normal', '2023-01-01 09:00:00'),
        (103, 'normal', '2023-01-01 08:00:00'); -- 注: Device 104 のステータスは存在しない

続いてセンサー測定値テーブルを作成し、データを挿入します。

こちらのタイムスタンプ(例:08:15:00)がステータスのタイムスタンプ(08:00:00)と一致していないことに注目してください。

CREATE TABLE sensor_readings (
    reading_id INT,
    device_id INT,
    temperature FLOAT,
    reading_time TIMESTAMP
);

*-- センサー測定値の投入*
INSERT INTO sensor_readings (reading_id, device_id, temperature, reading_time) VALUES
    (1, 101, 36.2, '2023-01-01 08:15:00'),
    (2, 101, 38.7, '2023-01-01 09:20:00'),
    (3, 101, 41.3, '2023-01-01 10:05:00'),
    (4, 102, 22.5, '2023-01-01 08:45:00'),
    (5, 102, 23.1, '2023-01-01 09:30:00'),
    (6, 103, 18.9, '2023-01-01 11:00:00'),
    (7, 104, 25.6, '2023-01-01 09:15:00');
    *-- Device 104 はステータスが存在しない*

2. Inner ASOF Join を実行する

ここで知りたいのは:

「この温度測定値の直前にデバイスはどんなステータスだったのか?」

ASOF Join の ON には 2 つの条件が必要です:

  1. 等値条件(例:device_id = device_id
  2. 不等号条件(例:reading_time >= update_time

まずは Inner ASOF Join を試してみます。

マッチが見つかった行だけを返し、見つからない行は除外されます。

CREATE MATERIALIZED VIEW inner_asof_join_results AS
SELECT
    s.reading_id,
    s.device_id,
    s.temperature,
    s.reading_time,
    d.status,
    d.update_time AS last_status_update
FROM sensor_readings s
ASOF JOIN device_status d
    ON s.device_id = d.device_id
    AND s.reading_time >= d.update_time;

結果を確認します:

SELECT * FROM inner_asof_join_results ORDER BY reading_id;

出力:

reading_id | device_id | temperature |    reading_time     | status  | last_status_update
*------------+-----------+-------------+---------------------+---------+---------------------*
          1 |       101 |        36.2 | 2023-01-01 08:15:00 | normal  | 2023-01-01 08:00:00
          2 |       101 |        38.7 | 2023-01-01 09:20:00 | warning | 2023-01-01 09:00:00
          3 |       101 |        41.3 | 2023-01-01 10:05:00 | error   | 2023-01-01 10:00:00
          4 |       102 |        22.5 | 2023-01-01 08:45:00 | normal  | 2023-01-01 08:00:00
          5 |       102 |        23.1 | 2023-01-01 09:30:00 | normal  | 2023-01-01 09:00:00
          6 |       103 |        18.9 | 2023-01-01 11:00:00 | normal  | 2023-01-01 08:00:00

ポイント:

  1. 08:15 の測定値 (#1) が正しく 08:00 のステータス と紐付いている。

  2. Reading #7(Device 104) は結果に含まれない。

    → ステータスが存在しないため Inner Join では除外される。

3. Left Outer ASOF Join を実行する

すべてのセンサー測定値を保持したい場合もあります。

Device 104 のようにステータスがなくても、温度データは見たいケースです。

その場合は Left Outer ASOF Join(ASOF LEFT JOIN) を使います。

左側(sensor_readings)の行はすべて含まれ、対応するステータスがなければ NULL になります。

CREATE MATERIALIZED VIEW left_outer_asof_join_results AS
SELECT
    s.reading_id,
    s.device_id,
    s.temperature,
    s.reading_time,
    d.status,
    d.update_time AS last_status_update
FROM sensor_readings s
ASOF LEFT JOIN device_status d
    ON s.device_id = d.device_id
    AND s.reading_time >= d.update_time;

結果を確認します:

SELECT * FROM left_outer_asof_join_results ORDER BY reading_id;

出力:

reading_id | device_id | temperature |    reading_time     | status  | last_status_update
*------------+-----------+-------------+---------------------+---------+---------------------*
          1 |       101 |        36.2 | 2023-01-01 08:15:00 | normal  | 2023-01-01 08:00:00
          2 |       101 |        38.7 | 2023-01-01 09:20:00 | warning | 2023-01-01 09:00:00
          3 |       101 |        41.3 | 2023-01-01 10:05:00 | error   | 2023-01-01 10:00:00
          4 |       102 |        22.5 | 2023-01-01 08:45:00 | normal  | 2023-01-01 08:00:00
          5 |       102 |        23.1 | 2023-01-01 09:30:00 | normal  | 2023-01-01 09:00:00
          6 |       103 |        18.9 | 2023-01-01 11:00:00 | normal  | 2023-01-01 08:00:00
          7 |       104 |        25.6 | 2023-01-01 09:15:00 | NULL    | NULL

ここで Reading #7 も保持され、ステータスがないので NULL になっていることがわかります。

Conclusion

ASOF Join は、RisingWave で時系列データを扱う際に非常に強力な機能です。以下のような場面で役立ちます:

  • 時間がズレて発生するイベントを自然に結合できる
  • 特定時点に対して「直前の値」を取得できる
  • Left Outer Join によって履歴がなくてもデータを保持できる

これにより、本来であれば複雑なサブクエリやカスタムコードが必要となる処理を、シンプルな SQL だけで実現できます。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?