概要
Azure IoT Hub から Azure Storage にメッセージルーティンした device-to-cloud メッセージ(DeviceMessages
データソース)を Databricks で参照する方法を紹介します。
事前準備
環境構築
- Azure IoT Hub の構築 (本手順は Free tier で実施)
- Azure Storage の構築(Databricks 環境から接続できるように設定)
- Databricks (本手順は Serveless ではなく汎用コンピュートで実行)
Azure IoT Hub のマネージド ID に対して Azure Storage のストレージ BLOB データ共同作成者
権限を付与
Azure IoT Hub のマネージド ID に対して Azure Event Hubs のAzure Event Hubs のデータ送信者
権限を付与
Azure IoT Hub にてカスタムエンドポイントを作成
Azure portal では作成する画面はないため、メッセージ ルーティングを作成するに同時に作成できます。
Azure IoT Hubs にてメッセージルーティンを設定
Azure IoT Hub にてカスタムエンドポイントを作成
Azure portal では作成する画面はないため、メッセージ ルーティングを作成するに同時に作成できます。
Azure IoT Hubs にてメッセージルーティンを設定
デバイスの作成と接続文字列の取得
Azure IoT Hub のリソース画面にて、デバイス
タブを選択後、+ デバイス
の追加を選択します。
デバイス ID
に任意の値を入力し、保存
を選択してデバイスを作成します。
作成したデバイスを表示して、プライマリ接続文字列
値をコピーします。
モジュールの作成と接続文字列の取得
作成済みのデバイスの画面に、+ モジュール ID の追加
を選択します。
モジュール ID
に任意の値を入力し、保存
を選択してデバイスを作成します。
作成したモジュールを表示して、プライマリ接続文字列
値をコピーします。
メッセージルーティンの実施
Databricks にてライブラリのインストール
%pip install azure-iot-device -q
dbutils.library.restartPython()
デバイスから device-to-cloud メッセージ(テレメトリー)の送信
import os, json, random, time, uuid
from datetime import datetime, timezone
from azure.iot.device import IoTHubDeviceClient, Message
def get_sample_date():
date = {
"temperature": round(random.uniform(20, 25), 2),
"timestamp": datetime.now(timezone.utc).isoformat(),
}
return json.dumps(date)
# クライアントのインスタンス化
device_client = IoTHubDeviceClient.create_from_connection_string(
device_conn_str,
websockets=True,
)
# データの準備
payload = get_sample_date()
msg = Message(payload)
msg.message_id = uuid.uuid4()
msg.content_encoding = "utf-8"
msg.content_type = "application/json"
msg.custom_properties["sensor-location"] = "jp"
# クライアントに接続
device_client.connect()
# テレメトリーの送信
device_client.send_message(msg)
print("✓ sent:", payload)
time.sleep(5)
# クライアントの終了
device_client.shutdown()
モジュールから device-to-cloud メッセージ(テレメトリー)の送信
module_conn_str = "HostName=iot-hub-test-001.azure-devices.net;DeviceId=qiita_test_01;ModuleId=qiita_test_module_01;SharedAccessKey=s5svMAgEglFjcr923J+GfDcXXXXX="
import os, json, random, time, uuid
from datetime import datetime, timezone
from azure.iot.device import IoTHubDeviceClient, Message
def get_sample_date():
date = {
"temperature": round(random.uniform(20, 25), 2),
"timestamp": datetime.now(timezone.utc).isoformat(),
}
return json.dumps(date)
# クライアントのインスタンス化
module_client = IoTHubDeviceClient.create_from_connection_string(
module_conn_str,
websockets=True,
)
# データの準備
payload = get_sample_date()
msg = Message(payload)
msg.message_id = uuid.uuid4()
msg.content_encoding = "utf-8"
msg.content_type = "application/json"
msg.custom_properties["sensor-location"] = "jp"
# クライアントに接続
module_client.connect()
# テレメトリーの送信
module_client.send_message(msg)
print("✓ sent:", payload)
time.sleep(5)
# クライアントの終了
module_client.shutdown()
Databricks におけるデータの読み込み
Azure Storage からデータを取得
Azure Storage からデータを取得します。
storage_account = "iothubstorage123"
container = "qiitatest"
abfs_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/"
abfs_path += "iot-hub-test-0001/*/*/*/*/*/*"
src_df = (
spark.read
.format("avro")
.load(abfs_path)
)
src_df.printSchema()
src_df.display()
src_df.printSchema()
src_df.display()
root
|-- EnqueuedTimeUtc: string (nullable = true)
|-- Properties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- SystemProperties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- Body: binary (nullable = true)
body
列に対する decode の実施
from pyspark.sql.functions import expr
df = src_df.withColumn(
"Body",
expr("decode(Body, 'UTF-8')")
)
df.display()
{
"temperature": 21.82,
"timestamp": "2025-05-27T04:22:28.003265+00:00"
}
<補足>Azure Storage に連携されるデータ
デバイスによる device-to-cloud メッセージ(テレメトリー)のデータ
Properties
sensor-location: "jp"
SystemProperties
connectionDeviceGenerationId: "638839157113307051"
connectionAuthMethod: "{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\"}"
connectionDeviceId: "qiita_test_device_01"
contentType: "application/json"
messageId: "2ad60f28-54ba-4b42-bf4e-19e580ddb6e0"
contentEncoding: "utf-8"
enqueuedTime: "2025-05-27T04:22:28.2750000Z"
body
{
"temperature": 21.82,
"timestamp": "2025-05-27T04:22:28.003265+00:00"
}
モジュールによる device-to-cloud メッセージ(テレメトリー)のデータ
Properties
sensor-location: "jp"
SystemProperties
connectionModuleId: "qiita_test_module_01"
connectionDeviceGenerationId: "638839157225497198"
connectionAuthMethod: "{\"scope\":\"module\",\"type\":\"sas\",\"issuer\":\"iothub\"}"
connectionDeviceId: "qiita_test_device_01"
contentType: "application/json"
messageId: "f4ccdf19-01a3-4df7-912f-7aa4cfbfe4e7"
contentEncoding: "utf-8"
enqueuedTime: "2025-05-27T04:22:33.9320000Z"
body
{
"temperature": 24.08,
"timestamp": "2025-05-27T04:22:33.702755+00:00"
}