概要
今回は、IoTのをユースケースを、ksqlDBを利用して試してみました。
IoTのユースケースを考えてみるきっかけについて
近年、IoT、AIのユースケースも珍しくなくなってきました。ところが先日、あるお客様からIoTシステムは便利だけどIoTデバイスを過度に信用しすぎると危険だという話を聞きました。よく伺ってみるとそのお客様の現場にあるメタン検知用ガスセンサ搭載のIoTデバイスが不調(完全に壊れていない状態)で、そのデバイスから収集したデータをそのまま信じてAI/MLで処理すると想定外のトラブルが発生する可能性があるということでした。
これを聞いてksqlDBを使ってIoTデータからデバイスがオフラインになっているか、またはいつもと違う状態になっていることを自動で検知できればこうした課題に少しは役立つかなと思い簡単なケースを想定して試してみました。
IoTのユースケースについて
IoTといえば例えば車両や製造設備またはスマート家電などにも広がっています。IoTデバイスによるデータの多くは特定の時間に生成、処理され、そのデータ量と速度が重要な要素となる場合が少なくありません。こうしたケースでは、通常時と比較してデータ送信(共有)の挙動が変化した場合はデバイスのメンテナンスが必要な時期かもしれないという判断をする手助けするIoT資産追跡システムの重要な機能として実装されることもあるようです。
ここでは、シンプルにksqlDB を使用してどのデバイスのデータが取得できなくなったかを確認する方法をご紹介します。
ksqlDBを使う
ksqlDBは、クラウド内の一般的なデータソースやエンドシステムから直接データをインポートおよびエクスポートするができます。ここでは、ksqlDBのINSERT INTO機能で模擬データを使用してコードを実行します。
この例では、IoTのユースケースとしてデバイスからのデータ取得の際に異変があることを知ることで、そのデバイスの故障の予兆検知であったり、サイバー攻撃の対象となっている可能性などを判断することに役立つものをイメージしました。
手順1:STREAMを作成する
以下のksqlDB構文を実行します。
CREATE STREAM iot_telemetry(
device_id INT,
ts BIGINT
) WITH (
KAFKA_TOPIC = ‘iot_telemetry‘,
VALUE_FORMAT = 'JSON‘,
PARTITIONS = 6,
TIMESTAMP = 'ts’
);
手順2:ビューを作成する
CREATE TABLE iot_telemetry_lags WITH (KAFKA_TOPIC = 'iot_telemetry_lags') AS
SELECT
device_id,
WINDOWEND - LATEST_BY_OFFSET(ts) as lag_ms,
TIMESTAMPTOSTRING(WINDOWSTART, ‘yyyy-MM-dd HH:mm:ss’) as window_start,
TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss') as window_end
FROM iot_telemetry
WINDOW TUMBLING (SIZE 120 SECONDS)
GROUP BY device_id;
手順3:模擬データを投入する
ここでは、ksqlDBを使って模擬データを投入します。
INSERT INTO iot_telemetry (device_id, ts) VALUES (1, 1655144403000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144403000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144423000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144443000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144463000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144483000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144503000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144523000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144543000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144563000);
INSERT INTO iot_telemetry (device_id, ts) VALUES (0, 1655144583000);
手順4:確認する
ここでは、模擬データが正しく投入されたか以下のクエリを実行して確認します。
模擬データが正しく投入されていれば、実行結果は以下のとおりとなります。
この例では、一定時間IoTデバイスからテレメトリ―データが送信されていない状況をウォッチして、もしその状況(今回の例の場合はlag_msの値が60000を超えた時)になったら該当するデータを出力するというものでしたが、非常にシンプルなSQLの構文でIoTデバイスの状況をチェックできることがわかりました。
今回も最後までお読み頂きましてありがとうございまいた。