Overview
This article shows how to use Databricks to read device twin change events (the TwinChangeEvents data source) that Azure IoT Hub delivers to Azure Storage through message routing.
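Prerequisites
Environment setup
- Azure IoT Hub (this walkthrough uses the Free tier)
- Azure Storage (configured so that it can be reached from the Databricks environment)
- Databricks (this walkthrough runs on all-purpose compute, not Serverless)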
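Grant the Azure IoT Hub managed identity the Storage Blob Data Contributor role on Azure Storage
Create a custom endpoint in Azure IoT Hub
The Azure portal has no separate screen for creating this endpoint, but you can create it at the same time as the message route.
Configure message routing in Azure IoT Hub (data source: TwinChangeEvents)
Create a device and a module
Triggering the message routing
Setup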
%pip install azure-iot-hub -q
dbutils.library.restartPython()
iot_conn_str = "HostName=iot-hub-test-0001.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=V/HbzmQCQJHBScUBCuXXXX="
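Before handing the connection string to IoTHubRegistryManager, it can be worth sanity-checking which hub and access policy it points at. The helper below, `parse_iothub_connection_string`, is a hypothetical sketch (not part of azure-iot-hub). It splits each segment on the first "=" only, because the base64 SharedAccessKey usually ends with "=" padding. The key shown is a dummy value; in a real notebook, storing the string in a Databricks secret scope and reading it with `dbutils.secrets.get` is generally safer than hard-coding it.

```python
def parse_iothub_connection_string(conn_str: str) -> dict:
    """Split an IoT Hub connection string into its key/value parts.

    Each segment is split on the FIRST "=" only, because the base64
    SharedAccessKey value itself usually ends with "=" padding.
    """
    parts = {}
    for segment in conn_str.split(";"):
        if not segment:
            continue  # tolerate a trailing ";"
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"malformed segment: {segment!r}")
        parts[key] = value
    return parts


# Dummy key for illustration only
conn = parse_iothub_connection_string(
    "HostName=iot-hub-test-0001.azure-devices.net;"
    "SharedAccessKeyName=iothubowner;SharedAccessKey=dummy=="
)
print(conn["HostName"])             # iot-hub-test-0001.azure-devices.net
print(conn["SharedAccessKeyName"])  # iothubowner
```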
Running update_twin and replace_twin on the device twin
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.models import Twin, TwinProperties

device_id = "qiita_test_device_01"
# Initialize the client
registry_manager = IoTHubRegistryManager(iot_conn_str)
try:
    # Get the current twin
    twin = registry_manager.get_twin(device_id)
    print("Twin before:", twin.as_dict()["properties"]["desired"].get("temp", None))
    # Define the change
    desired = {"temp": "100"}
    twin_patch = Twin(properties=TwinProperties(desired=desired))
    # Run a partial update (passing the ETag)
    updated_twin = registry_manager.update_twin(device_id, twin_patch, twin.etag)
    print("Twin after:", updated_twin.as_dict()["properties"]["desired"].get("temp", None))
except Exception as ex:
    print("Error:", ex)
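update_twin sends only a patch, and the hub merges it into the existing desired properties. As I understand the twin update semantics, nested objects are merged and a property set to null is removed. The function below, `apply_desired_patch`, is a hypothetical local model of that behavior on plain dicts, handy for reasoning about what a patch will do; it does not call IoT Hub.

```python
import copy


def apply_desired_patch(desired: dict, patch: dict) -> dict:
    """Return a new dict: `desired` with `patch` merged in.

    Local model (assumption) of a twin partial update: nested objects are
    merged recursively, scalar values overwrite, and None (JSON null)
    deletes the key. Keys absent from the patch are left untouched.
    """
    result = copy.deepcopy(desired)
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)  # null removes the property
        elif isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = apply_desired_patch(result[key], value)
        else:
            result[key] = copy.deepcopy(value)
    return result


before = {"temp": "20", "mode": {"fan": "on", "led": "off"}}
after = apply_desired_patch(before, {"temp": "100", "mode": {"led": None}})
print(after)  # {'temp': '100', 'mode': {'fan': 'on'}}
```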
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.models import Twin, TwinProperties

registry_manager = IoTHubRegistryManager(iot_conn_str)
# Get the current twin
twin = registry_manager.get_twin(device_id)
print("Twin before:", twin.as_dict()["properties"])
# Build the new twin
new_twin = Twin(
    properties=TwinProperties(
        desired={
            "key_1": "a",
            "key_2": "b",
            "key_3": "c",
        },
        reported=None  # None: the reported properties are not changed
    )
)
# Replace the entire twin
replaced_twin = registry_manager.replace_twin(device_id, new_twin)
print("")
print("Twin after:", replaced_twin.as_dict()["properties"])
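The stored replaceTwin records in the supplement show the practical difference between the two calls: after replace_twin, desired contains only key_1 to key_3, and the temp value set by the earlier update_twin is gone. The hypothetical helpers below model that difference on plain dicts (top-level keys only, a simplification of the real service behavior).

```python
def update_desired(current: dict, patch: dict) -> dict:
    """Partial update (simplified, top-level only): merge patch into current."""
    merged = dict(current)
    merged.update(patch)
    return merged


def replace_desired(current: dict, new: dict) -> dict:
    """Full replace: the new desired set wins; keys not in `new` disappear."""
    return dict(new)


current = {"temp": "100"}
new = {"key_1": "a", "key_2": "b", "key_3": "c"}

print(update_desired(current, new))   # temp is kept alongside key_1..key_3
print(replace_desired(current, new))  # only key_1..key_3 remain
```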
Running update_twin and replace_twin on the module twin
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.models import Twin, TwinProperties

device_id = "qiita_test_device_01"
module_id = "qiita_test_module_01"
# Initialize the client
registry_manager = IoTHubRegistryManager(iot_conn_str)
try:
    # (1) Get the twin and check the current value and etag
    twin = registry_manager.get_module_twin(device_id, module_id)
    print("Before:", twin.properties.desired.get("temp"))
    # (2) Put only the changes into the twin patch
    patch = Twin(properties=TwinProperties(desired={"temp": 100}))
    # (3) Partial update (use etag="*" to ignore conflicts)
    updated = registry_manager.update_module_twin(
        device_id, module_id, patch, etag=twin.etag  # or etag="*"
    )
    print("After :", updated.properties.desired.get("temp"))
except Exception as ex:
    print("Error:", ex)
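The module examples pass the etag returned by get_module_twin, so a write fails if someone else changed the twin in between, while etag="*" disables that check. The class below is a hypothetical in-memory stand-in, not the SDK, that only illustrates this optimistic-concurrency pattern; against the real service a mismatched etag is rejected (typically with HTTP 412 Precondition Failed).

```python
class EtagMismatch(Exception):
    """Raised when the caller's etag no longer matches the stored twin."""


class InMemoryTwin:
    """Hypothetical stand-in for a twin store, to illustrate etag checks.

    Every successful write produces a new etag, so a writer holding an
    old etag is rejected. etag="*" skips the check (last write wins).
    """

    def __init__(self, desired: dict):
        self.desired = dict(desired)
        self.version = 1

    @property
    def etag(self) -> str:
        return f"v{self.version}"

    def update(self, patch: dict, etag: str) -> None:
        if etag != "*" and etag != self.etag:
            raise EtagMismatch(f"expected {self.etag}, got {etag}")
        self.desired.update(patch)
        self.version += 1


twin = InMemoryTwin({"temp": 20})
stale = twin.etag                  # two writers both read the twin at "v1"
twin.update({"temp": 100}, stale)  # first write succeeds -> "v2"
try:
    twin.update({"temp": 50}, stale)  # second write still uses "v1"
except EtagMismatch as ex:
    print("rejected:", ex)            # rejected: expected v2, got v1
twin.update({"temp": 50}, "*")     # wildcard etag overwrites regardless
print(twin.desired, twin.etag)     # {'temp': 50} v3
```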
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.models import Twin, TwinProperties

registry_manager = IoTHubRegistryManager(iot_conn_str)
# (1) Get the current module twin (this also returns the etag)
twin = registry_manager.get_module_twin(device_id, module_id)  # <- module-specific API
print("Before replace:", twin.properties.desired)
# (2) Build the complete Twin object
# Note: from the service side, only tags and desired properties can be written
new_twin = Twin(
    properties=TwinProperties(
        desired={
            "key_1": "a",
            "key_2": "b",
            "key_3": "c",
        }
    )
)
# (3) Replace the whole twin using the etag (use etag="*" to ignore conflicts)
replaced = registry_manager.replace_module_twin(
    device_id, module_id, new_twin, etag=twin.etag  # or etag="*"
)
print("After replace:", replaced.properties.desired)
Reading the data in Databricks
Retrieving data from Azure Storage
Load the Avro files that IoT Hub wrote to the storage container.
storage_account = "iothubstorage123"
container = "qiitatest"
abfs_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/"
abfs_path += "iot-hub-test-0001/*/*/*/*/*/*"
src_df = (
spark.read
.format("avro")
.load(abfs_path)
)
src_df.printSchema()
src_df.display()
root
|-- EnqueuedTimeUtc: string (nullable = true)
|-- Properties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- SystemProperties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- Body: binary (nullable = true)
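The glob above relies on the folder layout IoT Hub uses for storage endpoints: with the default file name format `{iothub}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}`, the six wildcards after the hub name stand for partition, year, month, day, hour and minute. As files accumulate, reading everything on every run gets slower, so fixing the date levels can help. The helper `build_avro_glob` below is a hypothetical sketch that assumes that default format and UTC date folders; an endpoint with a custom file name format would need a different pattern.

```python
from datetime import date
from typing import Optional


def build_avro_glob(storage_account: str, container: str, hub_name: str,
                    day: Optional[date] = None) -> str:
    """Build an abfss:// glob for IoT Hub Avro output.

    Assumes the default file name format
    {iothub}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm} with UTC date folders.
    With day=None every level is a wildcard; otherwise only that day is read.
    """
    base = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{hub_name}"
    if day is None:
        return f"{base}/*/*/*/*/*/*"
    # Partition, hour and minute stay wildcards; year/month/day are fixed.
    return f"{base}/*/{day:%Y}/{day:%m}/{day:%d}/*/*"


print(build_avro_glob("iothubstorage123", "qiitatest", "iot-hub-test-0001",
                      date(2025, 5, 27)))
# abfss://qiitatest@iothubstorage123.dfs.core.windows.net/iot-hub-test-0001/*/2025/05/27/*/*
```

The returned string can be passed to `spark.read.format("avro").load(...)` in place of `abfs_path`.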
Decoding the Body column
from pyspark.sql.functions import expr
df = src_df.withColumn(
"Body",
expr("decode(Body, 'UTF-8')")
)
df.display()
{
"version": 3,
"properties": {
"desired": {
"temp": "100",
"$metadata": {
"$lastUpdated": "2025-05-27T04:08:48.9093199Z",
"$lastUpdatedVersion": 2,
"temp": {
"$lastUpdated": "2025-05-27T04:08:48.9093199Z",
"$lastUpdatedVersion": 2
}
},
"$version": 2
}
}
}
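After decoding, Body is still a JSON string that mixes the user-set desired properties with service-managed entries such as `$metadata` and `$version`. The sketch below uses a hypothetical helper, `user_desired_properties`, to show the extraction in plain Python on a body shaped like the one above. In Spark, a similar result could be obtained column-wise with `from_json` and an explicit schema, or with `get_json_object(Body, '$.properties.desired.temp')` for a single value.

```python
import json


def user_desired_properties(body: str) -> dict:
    """Return the desired properties from a decoded twin-change Body,
    dropping service-managed keys that start with "$"."""
    desired = json.loads(body).get("properties", {}).get("desired", {})
    return {k: v for k, v in desired.items() if not k.startswith("$")}


sample_body = """
{"version": 3,
 "properties": {"desired": {"temp": "100",
   "$metadata": {"$lastUpdated": "2025-05-27T04:08:48.9093199Z",
                 "$lastUpdatedVersion": 2},
   "$version": 2}}}
"""
print(user_desired_properties(sample_body))  # {'temp': '100'}
```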
<Supplement> Data delivered to Azure Storage
Data from update_twin on the device twin
Properties
operationTimestamp: "2025-05-27T04:08:48.9093199Z"
iothub-message-schema: "twinChangeNotification"
opType: "updateTwin"
deviceId: "qiita_test_device_01"
hubName: "iot-hub-test-0001"
SystemProperties
twin-version: "3"
connectionDeviceId: "qiita_test_device_01"
contentType: "application/json"
contentEncoding: "utf-8"
correlationId: "2eaf33f41d3"
userId: "aW90LWh1Yi10ZXN0LTAwMDE="
enqueuedTime: "2025-05-27T04:08:49.0610000Z"
body
{
"version": 3,
"properties": {
"desired": {
"temp": "100",
"$metadata": {
"$lastUpdated": "2025-05-27T04:08:48.9093199Z",
"$lastUpdatedVersion": 2,
"temp": {
"$lastUpdated": "2025-05-27T04:08:48.9093199Z",
"$lastUpdatedVersion": 2
}
},
"$version": 2
}
}
}
Data from replace_twin on the device twin
Properties
operationTimestamp: "2025-05-27T04:08:51.5656567Z"
iothub-message-schema: "twinChangeNotification"
opType: "replaceTwin"
deviceId: "qiita_test_device_01"
hubName: "iot-hub-test-0001"
SystemProperties
twin-version: "4"
connectionDeviceId: "qiita_test_device_01"
contentType: "application/json"
contentEncoding: "utf-8"
correlationId: "2eaf4d2d8d1"
userId: "aW90LWh1Yi10ZXN0LTAwMDE="
enqueuedTime: "2025-05-27T04:08:51.5770000Z"
body
{
"deviceId": "qiita_test_device_01",
"etag": "AAAAAAAAAAM=",
"version": 4,
"properties": {
"desired": {
"key_3": "c",
"key_1": "a",
"key_2": "b",
"$metadata": {
"$lastUpdated": "2025-05-27T04:08:51.5656567Z",
"$lastUpdatedVersion": 3,
"key_3": {
"$lastUpdated": "2025-05-27T04:08:51.5656567Z",
"$lastUpdatedVersion": 3
},
"key_1": {
"$lastUpdated": "2025-05-27T04:08:51.5656567Z",
"$lastUpdatedVersion": 3
},
"key_2": {
"$lastUpdated": "2025-05-27T04:08:51.5656567Z",
"$lastUpdatedVersion": 3
}
},
"$version": 3
},
"reported": {
"$metadata": {
"$lastUpdated": "2025-05-27T04:08:31.3307051Z"
},
"$version": 1
}
}
}
Data from update_twin on the module twin
Properties
moduleId: "qiita_test_module_01"
operationTimestamp: "2025-05-27T04:08:56.0189103Z"
iothub-message-schema: "twinChangeNotification"
opType: "updateTwin"
deviceId: "qiita_test_device_01"
hubName: "iot-hub-test-0001"
SystemProperties
twin-version: "3"
connectionModuleId: "qiita_test_module_01"
connectionDeviceId: "qiita_test_device_01"
contentType: "application/json"
contentEncoding: "utf-8"
correlationId: "2eaf77b7232"
userId: "aW90LWh1Yi10ZXN0LTAwMDE="
enqueuedTime: "2025-05-27T04:08:56.0300000Z"
body
{
"version": 3,
"properties": {
"desired": {
"temp": 100,
"$metadata": {
"$lastUpdated": "2025-05-27T04:08:56.0189103Z",
"$lastUpdatedVersion": 2,
"temp": {
"$lastUpdated": "2025-05-27T04:08:56.0189103Z",
"$lastUpdatedVersion": 2
}
},
"$version": 2
}
}
}
Data from replace_twin on the module twin
Properties
moduleId: "qiita_test_module_01"
operationTimestamp: "2025-05-27T04:08:57.6127052Z"
iothub-message-schema: "twinChangeNotification"
opType: "replaceTwin"
deviceId: "qiita_test_device_01"
hubName: "iot-hub-test-0001"
SystemProperties
twin-version: "4"
connectionModuleId: "qiita_test_module_01"
connectionDeviceId: "qiita_test_device_01"
contentType: "application/json"
contentEncoding: "utf-8"
correlationId: "2eaf86e9cf5"
userId: "aW90LWh1Yi10ZXN0LTAwMDE="
enqueuedTime: "2025-05-27T04:08:57.6400000Z"
body
{
"deviceId": "qiita_test_device_01",
"moduleId": "qiita_test_module_01",
"etag": "AAAAAAAAAAM=",
"version": 4,
"properties": {
"desired": {
"key_3": "c",
"key_1": "a",
"key_2": "b",
"$metadata": {
"$lastUpdated": "2025-05-27T04:08:57.6127052Z",
"$lastUpdatedVersion": 3,
"key_3": {
"$lastUpdated": "2025-05-27T04:08:57.6127052Z",
"$lastUpdatedVersion": 3
},
"key_1": {
"$lastUpdated": "2025-05-27T04:08:57.6127052Z",
"$lastUpdatedVersion": 3
},
"key_2": {
"$lastUpdated": "2025-05-27T04:08:57.6127052Z",
"$lastUpdatedVersion": 3
}
},
"$version": 3
},
"reported": {
"$metadata": {
"$lastUpdated": "0001-01-01T00:00:00Z"
},
"$version": 1
}
}
}
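Comparing the four records: module-twin events carry moduleId in Properties (and connectionModuleId in SystemProperties) while device-twin events do not; opType distinguishes updateTwin from replaceTwin; twin-version matches the version field in the body; and userId is base64 that decodes to the hub name, iot-hub-test-0001. The hypothetical helper `describe_twin_change` below turns the Properties and SystemProperties maps of one record into a small summary. It is based only on what these sample records show, not on a documented contract.

```python
import base64


def describe_twin_change(properties: dict, system_properties: dict) -> dict:
    """Summarise one routed twin-change record.

    Assumption (from the sample records): module-twin events carry a
    "moduleId" entry in Properties, device-twin events do not.
    """
    # userId is base64; in the sample records it decodes to the hub name.
    user = base64.b64decode(system_properties["userId"]).decode("utf-8")
    return {
        "target": "module" if "moduleId" in properties else "device",
        "op": properties["opType"],
        "deviceId": properties["deviceId"],
        "moduleId": properties.get("moduleId"),
        "twinVersion": int(system_properties["twin-version"]),
        "userId": user,
    }


summary = describe_twin_change(
    {"moduleId": "qiita_test_module_01", "opType": "replaceTwin",
     "deviceId": "qiita_test_device_01", "hubName": "iot-hub-test-0001"},
    {"twin-version": "4", "userId": "aW90LWh1Yi10ZXN0LTAwMDE="},
)
print(summary["target"], summary["op"], summary["userId"])
# module replaceTwin iot-hub-test-0001
```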