概要
Databricks にて Azure IoT Hub に対して Twin's desired properties を送受信する方法を紹介します。本記事で詳細する方法は、検証目的で実行する方法です。
事前準備
環境構築
- Azure IoT Hub の構築 (本手順は Free tier で実施)
- Databricks (本手順は Serveless ではなく汎用コンピュートで実行)
デバイスの作成と接続文字列の取得
Azure IoT Hub のリソース画面にて、デバイス
タブを選択後、+ デバイスの追加
を選択します。
デバイス ID
に任意の値を入力し、保存
を選択してデバイスを作成します。
作成したデバイスを表示して、プライマリ接続文字列
のコピー済み
を選択して値をコピーします。
組み込みのエンドポイントの接続文字列を取得
Azure IoT Hub のリソース画面にて、組み込みのエンドポイント
タブを選択して、イベントハブ互換エンドポイント
の値をコピーします。
Azure IoT Hub の接続文字列を取得
Azure IoT Hub のリソース画面にて、共有アクセス ポリシー
タブ -> iothubowner
ポリシーを選択して、プライマリ接続文字列
の値をコピーします。
Twin's desired properties の受信
Databricks にて下記のコードを実行して、Twin's desired プロパティの受信します。処理が 60 秒継続するようにしております。
%pip install azure-iot-device -q
dbutils.library.restartPython()
device_conn_str = "HostName=iot-hub-test-001.azure-devices.net;DeviceId=qiita_test_01;SharedAccessKey=r2fxXUdn/4w9iNJdhqnIJZ+h0qxxxxx="
import time
from datetime import datetime, timezone
from azure.iot.device import IoTHubDeviceClient
def sanitize(patch: dict) -> dict:
"""'$'で始まるキーや None を除外して返す"""
return {
k: v
for k, v in patch.items()
if not k.startswith("$") and v is not None
}
def twin_patch_handler(patch):
print("受信:", patch)
clean = sanitize(patch)
clean["lastAppliedUtc"] = datetime.now(timezone.utc).isoformat()
try:
client.patch_twin_reported_properties(clean)
print("反映:", clean)
except Exception as exc:
print("送信失敗:", exc)
client = IoTHubDeviceClient.create_from_connection_string(
device_conn_str,
websockets=True,
)
client.on_twin_desired_properties_patch_received = twin_patch_handler
client.connect()
client.patch_twin_reported_properties(
{"startedUtc": datetime.now(timezone.utc).isoformat(),}
)
start_time = time.time()
duration = 60
while time.time() - start_time < duration:
time.sleep(10)
client.shutdown()
Twin's desired properties の送信
Databricks にて別のノートブックを作成して、下記のコードを実行して、Twin's desired プロパティを送信します。
%pip install azure-iot-hub -q
dbutils.library.restartPython()
iot_conn_str = "HostName=iot-hub-test-001.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=FUum/tiOgEuhye/JqAb+XXXXX="
device_id = "qiita_test_01"
import time
from datetime import datetime, timezone
from azure.iot.device import IoTHubDeviceClient
def sanitize(patch: dict) -> dict:
"""'$'で始まるキーや None を除外して返す"""
return {
k: v
for k, v in patch.items()
if not k.startswith("$") and v is not None
}
def twin_patch_handler(patch):
print("受信:", patch)
clean = sanitize(patch)
clean["lastAppliedUtc"] = datetime.now(timezone.utc).isoformat()
try:
client.patch_twin_reported_properties(clean)
print("反映:", clean)
except Exception as exc:
print("送信失敗:", exc)
client = IoTHubDeviceClient.create_from_connection_string(
device_conn_str,
websockets=True,
)
client.on_twin_desired_properties_patch_received = twin_patch_handler
client.connect()
client.patch_twin_reported_properties(
{"startedUtc": datetime.now(timezone.utc).isoformat(),}
)
start_time = time.time()
duration = 60
while time.time() - start_time < duration:
time.sleep(10)
client.shutdown()
Twin's desired プロパティを受信しているセルを確認すると、受信したことを確認できます。