0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure IoT Hub から Azure Storage にメッセージルーティンした device-to-cloud メッセージ(テレメトリー)を Databricks で参照する方法

Last updated at Posted at 2025-05-27

概要

Azure IoT Hub から Azure Storage にメッセージルーティンした device-to-cloud メッセージ(DeviceMessagesデータソース)を Databricks で参照する方法を紹介します。

事前準備

環境構築

  • Azure IoT Hub の構築 (本手順は Free tier で実施)
  • Azure Storage の構築(Databricks 環境から接続できるように設定)
  • Databricks (本手順は Serveless ではなく汎用コンピュートで実行)

Azure IoT Hub のマネージド ID に対して Azure Storage のストレージ BLOB データ共同作成者権限を付与

image.png

image.png

Azure IoT Hub のマネージド ID に対して Azure Event Hubs のAzure Event Hubs のデータ送信者権限を付与

image.png

image.png

Azure IoT Hub にてカスタムエンドポイントを作成

Azure portal では作成する画面はないため、メッセージ ルーティングを作成するに同時に作成できます。

image.png

Azure IoT Hubs にてメッセージルーティンを設定

image.png

image.png

image.png

Azure IoT Hub にてカスタムエンドポイントを作成

Azure portal では作成する画面はないため、メッセージ ルーティングを作成するに同時に作成できます。

image.png

Azure IoT Hubs にてメッセージルーティンを設定

image.png

image.png

image.png

デバイスの作成と接続文字列の取得

Azure IoT Hub のリソース画面にて、デバイスタブを選択後、+ デバイスの追加を選択します。

image.png

デバイス IDに任意の値を入力し、保存を選択してデバイスを作成します。

image.png

作成したデバイスを表示して、プライマリ接続文字列値をコピーします。

image.png

モジュールの作成と接続文字列の取得

作成済みのデバイスの画面に、+ モジュール ID の追加を選択します。

image.png

モジュール IDに任意の値を入力し、保存を選択してデバイスを作成します。

image.png

作成したモジュールを表示して、プライマリ接続文字列値をコピーします。

image.png

メッセージルーティンの実施

Databricks にてライブラリのインストール

%pip install azure-iot-device -q
dbutils.library.restartPython()

デバイスから device-to-cloud メッセージ(テレメトリー)の送信

import os, json, random, time, uuid
from datetime import datetime, timezone
from azure.iot.device import IoTHubDeviceClient, Message


def get_sample_date():
    date = {
        "temperature": round(random.uniform(20, 25), 2),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

    return json.dumps(date)

# クライアントのインスタンス化
device_client = IoTHubDeviceClient.create_from_connection_string(
    device_conn_str,
    websockets=True,
)

# データの準備
payload = get_sample_date()
msg = Message(payload)
msg.message_id = uuid.uuid4()
msg.content_encoding = "utf-8"
msg.content_type = "application/json"
msg.custom_properties["sensor-location"] = "jp"

# クライアントに接続
device_client.connect()

# テレメトリーの送信
device_client.send_message(msg)
print("✓ sent:", payload)
time.sleep(5)

# クライアントの終了
device_client.shutdown()

image.png

モジュールから device-to-cloud メッセージ(テレメトリー)の送信

module_conn_str = "HostName=iot-hub-test-001.azure-devices.net;DeviceId=qiita_test_01;ModuleId=qiita_test_module_01;SharedAccessKey=s5svMAgEglFjcr923J+GfDcXXXXX="

import os, json, random, time, uuid
from datetime import datetime, timezone
from azure.iot.device import IoTHubDeviceClient, Message


def get_sample_date():
    date = {
        "temperature": round(random.uniform(20, 25), 2),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

    return json.dumps(date)

# クライアントのインスタンス化
module_client = IoTHubDeviceClient.create_from_connection_string(
    module_conn_str,
    websockets=True,
)

# データの準備
payload = get_sample_date()
msg = Message(payload)
msg.message_id = uuid.uuid4()
msg.content_encoding = "utf-8"
msg.content_type = "application/json"
msg.custom_properties["sensor-location"] = "jp"

# クライアントに接続
module_client.connect()

# テレメトリーの送信
module_client.send_message(msg)
print("✓ sent:", payload)
time.sleep(5)

# クライアントの終了
module_client.shutdown()

image.png

Databricks におけるデータの読み込み

Azure Storage からデータを取得

Azure Storage からデータを取得します。

storage_account = "iothubstorage123"
container       = "qiitatest"

abfs_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/"
abfs_path += "iot-hub-test-0001/*/*/*/*/*/*"
src_df = (
    spark.read
         .format("avro")
         .load(abfs_path)
)
src_df.printSchema()
src_df.display()
src_df.printSchema()
src_df.display()
root
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- Properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- SystemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- Body: binary (nullable = true)

image.png

body列に対する decode の実施

from pyspark.sql.functions import expr

df = src_df.withColumn(
    "Body",
    expr("decode(Body, 'UTF-8')")
)
df.display()
{
	"temperature": 21.82,
	"timestamp": "2025-05-27T04:22:28.003265+00:00"
}

image.png

<補足>Azure Storage に連携されるデータ

デバイスによる device-to-cloud メッセージ(テレメトリー)のデータ

Properties

sensor-location: "jp"

image.png

SystemProperties

connectionDeviceGenerationId: "638839157113307051"
connectionAuthMethod: "{\"scope\":\"device\",\"type\":\"sas\",\"issuer\":\"iothub\"}"
connectionDeviceId: "qiita_test_device_01"
contentType: "application/json"
messageId: "2ad60f28-54ba-4b42-bf4e-19e580ddb6e0"
contentEncoding: "utf-8"
enqueuedTime: "2025-05-27T04:22:28.2750000Z"

image.png

body

{
	"temperature": 21.82,
	"timestamp": "2025-05-27T04:22:28.003265+00:00"
}

image.png

モジュールによる device-to-cloud メッセージ(テレメトリー)のデータ

Properties

sensor-location: "jp"

image.png

SystemProperties

connectionModuleId: "qiita_test_module_01"
connectionDeviceGenerationId: "638839157225497198"
connectionAuthMethod: "{\"scope\":\"module\",\"type\":\"sas\",\"issuer\":\"iothub\"}"
connectionDeviceId: "qiita_test_device_01"
contentType: "application/json"
messageId: "f4ccdf19-01a3-4df7-912f-7aa4cfbfe4e7"
contentEncoding: "utf-8"
enqueuedTime: "2025-05-27T04:22:33.9320000Z"

image.png

body

{
	"temperature": 24.08,
	"timestamp": "2025-05-27T04:22:33.702755+00:00"
}

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?