概要
Databricks にて Azure IoT Hub に対してダイレクトメソッドの呼び出しを実施する方法を紹介します。本記事で詳細する方法は、検証目的で実行する方法です。
事前準備
環境構築
- Azure IoT Hub の構築 (本手順は Free tier で実施)
- Databricks (本手順は Serveless ではなく汎用コンピュートで実行)
デバイスの作成と接続文字列の取得
Azure IoT Hub のリソース画面にて、デバイス
タブを選択後、+ デバイスの追加
を選択します。
デバイス ID
に任意の値を入力し、保存
を選択してデバイスを作成します。
作成したデバイスを表示して、プライマリ接続文字列
のコピー済み
を選択して値をコピーします。
組み込みのエンドポイントの接続文字列を取得
Azure IoT Hub のリソース画面にて、組み込みのエンドポイント
タブを選択して、イベントハブ互換エンドポイント
の値をコピーします。
Azure IoT Hub の接続文字列を取得
Azure IoT Hub のリソース画面にて、共有アクセス ポリシー
タブ -> iothubowner
ポリシーを選択して、プライマリ接続文字列
の値をコピーします。
ダイレクトメソッドの呼び出しを実施する方法
ダイレクトメソッドコールバックリスナーを起動
Databricks にて下記のコードを実行します。
%pip install azure-iot-device -q
dbutils.library.restartPython()
device_conn_str = "HostName=iot-hub-test-001.azure-devices.net;DeviceId=qiita_test_01;SharedAccessKey=r2fxXUdn/4w9iNJdhqnIJZ+h0qxxxxx="
import time
from datetime import datetime, timezone
from azure.iot.device import IoTHubDeviceClient, MethodResponse
def handle_reboot(request):
delay = request.payload.get("delay", 0)
print("payload: ",request.payload)
print(f"Rebooting in {delay} sec …")
status = 200
payload = {
"result": "OK",
"delay": delay,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
client.send_method_response(
MethodResponse.create_from_method_request(request, status, payload)
)
client = IoTHubDeviceClient.create_from_connection_string(device_conn_str)
client.on_method_request_received = handle_reboot
client.connect()
print("Listening for direct methods.")
# 一定時間待機後に停止
time.sleep(60)
client.shutdown()
ダイレクトメソッドの呼び出しを実施
Databricks にて異なるノートブックを作成して、下記のコードを実行します。
%pip install azure-iot-hub -q
dbutils.library.restartPython()
iot_conn_str = "HostName=iot-hub-test-001.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=FUum/tiOgEuhye/JqAb+XXXXX="
device_id = "qiita_test_01"
from datetime import datetime, timezone
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.models import CloudToDeviceMethod
rgm = IoTHubRegistryManager(iot_conn_str)
method = CloudToDeviceMethod(
method_name="reboot",
payload={
"delay": 5,
"timestamp": datetime.now(timezone.utc).isoformat(),
},
response_timeout_in_seconds=30,
)
resp = rgm.invoke_device_method(
device_id=device_id,
direct_method_request=method,
)
print(resp.status, resp.payload)