概要
Azure IoT Hub から Azure Storage にメッセージルーティンしたデバイス ライフサイクル イベント(DeviceLifecycleEventsデータソース)を Databricks で参照する方法を紹介します。
事前準備
環境構築
- Azure IoT Hub の構築 (本手順は Free tier で実施)
- Azure Storage の構築(Databricks 環境から接続できるように設定)
- Databricks (本手順は Serveless ではなく汎用コンピュートで実行)
Azure IoT Hub のマネージド ID に対して Azure Storage のストレージ BLOB データ共同作成者権限を付与
Azure IoT Hub にてカスタムエンドポイントを作成
Azure portal では作成する画面はないため、メッセージ ルーティングを作成するに同時に作成できます。
Azure IoT Hubs にてメッセージルーティンを設定
デバイス ライフサイクル イベントのメッセージルーティンの実施
Device の作成
Module の作成
Module の削除
Device の削除
Azure Storage にてファイルを確認
Databricks におけるデータの読み込み
Azure Storage からデータを取得
Azure Storage からデータを取得します。
storage_account = "iothubstorage123"
container = "qiitatest"
abfs_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/"
abfs_path += "iot-hub-test-0001/*/*/*/*/*/*"
src_df = (
spark.read
.format("avro")
.load(abfs_path)
)
src_df.printSchema()
src_df.display()
src_df.printSchema()
src_df.display()
root
|-- EnqueuedTimeUtc: string (nullable = true)
|-- Properties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- SystemProperties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- Body: binary (nullable = true)
body列に対する decode の実施
from pyspark.sql.functions import expr
df = src_df.withColumn(
"Body",
expr("decode(Body, 'UTF-8')")
)
df.display()
{
"deviceId": "qiita-test-device-01",
"etag": "AAAAAAAAAAE=",
"version": 2,
"properties": {
"desired": {
"$metadata": {
"$lastUpdated": "2025-05-27T01:40:32.8491207Z"
},
"$version": 1
},
"reported": {
"$metadata": {
"$lastUpdated": "2025-05-27T01:40:32.8491207Z"
},
"$version": 1
}
}
}
<補足>Azure Storage に連携されるデータ
デバイス作成時のデータ
Properties
operationTimestamp: "2025-05-27T01:40:32.8491207Z"
iothub-message-schema: "deviceLifecycleNotification"
opType: "createDeviceIdentity"
deviceId: "qiita-test-device-01"
hubName: "iot-hub-test-0001"
SystemProperties
twin-version: "2"
connectionDeviceId: "qiita-test-device-01"
contentType: "application/json"
contentEncoding: "utf-8"
correlationId: "2d63cf127e9"
userId: "aW90LWh1Yi10ZXN0LTAwMDE="
enqueuedTime: "2025-05-27T01:40:32.9790000Z"
body
{
"deviceId": "qiita-test-device-01",
"etag": "AAAAAAAAAAE=",
"version": 2,
"properties": {
"desired": {
"$metadata": {
"$lastUpdated": "2025-05-27T01:40:32.8491207Z"
},
"$version": 1
},
"reported": {
"$metadata": {
"$lastUpdated": "2025-05-27T01:40:32.8491207Z"
},
"$version": 1
}
}
}
デバイス削除時のデータ
Properties
operationTimestamp: "2025-05-27T01:40:53.2559728Z"
iothub-message-schema: "deviceLifecycleNotification"
opType: "deleteDeviceIdentity"
deviceId: "qiita-test-device-01"
hubName: "iot-hub-test-0001"
SystemProperties
twin-version: "2"
connectionDeviceId: "qiita-test-device-01"
contentType: "application/json"
contentEncoding: "utf-8"
correlationId: "2d649198c20"
userId: "aW90LWh1Yi10ZXN0LTAwMDE="
enqueuedTime: "2025-05-27T01:40:53.2760000Z"
body
{
"deviceId": "qiita-test-device-01",
"etag": "AAAAAAAAAAE=",
"version": 2,
"properties": {
"desired": {
"$metadata": {
"$lastUpdated": "2025-05-27T01:40:32.8491207Z"
},
"$version": 1
},
"reported": {
"$metadata": {
"$lastUpdated": "2025-05-27T01:40:32.8491207Z"
},
"$version": 1
}
}
}
モジュール作成時のデータ
Properties
moduleId: "qiita-test-module-01"
operationTimestamp: "2025-05-27T01:40:42.4587483Z"
iothub-message-schema: "moduleLifecycleNotification"
opType: "createModuleIdentity"
deviceId: "qiita-test-device-01"
hubName: "iot-hub-test-0001"
SystemProperties
twin-version: "2"
connectionModuleId: "qiita-test-module-01"
connectionDeviceId: "qiita-test-device-01"
contentType: "application/json"
contentEncoding: "utf-8"
correlationId: "2d642ae98ce"
userId: "aW90LWh1Yi10ZXN0LTAwMDE="
enqueuedTime: "2025-05-27T01:40:42.5100000Z"
body
{
"deviceId": "qiita-test-device-01",
"moduleId": "qiita-test-module-01",
"etag": "AAAAAAAAAAE=",
"version": 2,
"properties": {
"desired": {
"$metadata": {
"$lastUpdated": "0001-01-01T00:00:00Z"
},
"$version": 1
},
"reported": {
"$metadata": {
"$lastUpdated": "0001-01-01T00:00:00Z"
},
"$version": 1
}
}
}
モジュール削除時のデータ
Properties
moduleId: "qiita-test-module-01"
operationTimestamp: "2025-05-27T01:40:46.7869929Z"
iothub-message-schema: "moduleLifecycleNotification"
opType: "deleteModuleIdentity"
deviceId: "qiita-test-device-01"
hubName: "iot-hub-test-0001"
SystemProperties
twin-version: "2"
connectionModuleId: "qiita-test-module-01"
connectionDeviceId: "qiita-test-device-01"
contentType: "application/json"
contentEncoding: "utf-8"
correlationId: "2d6453de53e"
userId: "aW90LWh1Yi10ZXN0LTAwMDE="
enqueuedTime: "2025-05-27T01:40:46.8070000Z"
body
{
"deviceId": "qiita-test-device-01",
"moduleId": "qiita-test-module-01",
"etag": "AAAAAAAAAAE=",
"version": 2,
"properties": {
"desired": {
"$metadata": {
"$lastUpdated": "0001-01-01T00:00:00Z"
},
"$version": 1
},
"reported": {
"$metadata": {
"$lastUpdated": "0001-01-01T00:00:00Z"
},
"$version": 1
}
}
}




























