0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure IoT Hub から Azure Event Hubs にメッセージルーティンしたデバイス ライフサイクル イベントを Databricks で参照する方法

Last updated at Posted at 2025-05-26

概要

Azure IoT Hub から Azure Event Hubs にメッセージルーティンしたデバイス ライフサイクル イベント(DeviceLifecycleEventsデータソース)を Databricks で参照する方法を紹介します。

事前準備

環境構築

  • Azure IoT Hub の構築 (本手順は Free tier で実施)
  • Azure Event Hubs の構築
  • Databricks (本手順は Serveless ではなく汎用コンピュートで実行)

Evnet hub の作成

image.png

Azure Event Hubs の接続文字列を取得

Azure Event Hubs のリソース画面にて、共有アクセス ポリシータブ -> 追加ポリシーを選択します。

image.png

ポリシー名に任意の名前を入力後、リッスンをチェックし、作成を選択します。

image.png

プライマリ接続文字列の値をコピーします。

image.png

Azure IoT Hub のマネージド ID に対して Azure Event Hubs のAzure Event Hubs のデータ送信者権限を付与

image.png

image.png

Azure IoT Hub にてカスタムエンドポイントを作成

Azure portal では作成する画面はないため、メッセージ ルーティングを作成するに同時に作成できます。

image.png

Azure IoT Hubs にてメッセージルーティンを設定

image.png

image.png

image.png

デバイス ライフサイクル イベントのメッセージルーティンの実施

Device の作成

image.png

image.png

Module の作成

image.png

image.png

Module の削除

image.png

Device の削除

image.png

Azure IoT Hubs にてレコードを確認

image.png

Databricks におけるデータの読み込み

Azure Event Hubs から Kafka エンドポイントによりデータを取得

Azure Event Hubs からデータを取得します。

eh_namespace_name = "kafka-test-01"
eh_connection_string = "Endpoint=sb://kafka-test-01.servicebus.windows.net/;SharedAccessKeyName=qiita-sas-01;SharedAccessKey=fiKmMqJtmn2dGAeV6oj1dWWxxxx="
topic_name = "iot-message-01"

# Databricks から Azure Event Hubs に設定する際の設定をセット
eh_sasl = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
    f'required username="$ConnectionString" password="{eh_connection_string}";'
)
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
}
# Azure Event Hubs からデータを取得
from pyspark.sql.functions import col

src_df = (
    spark.readStream.format("kafka")
    .options(**kafka_options)
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")  # 検証目的であるため earliest に設定
    .option("includeHeaders", "true")
    .load()
)
src_df.printSchema()
src_df.display()
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)

value列に対する decode の実施

from pyspark.sql.functions import expr

df = src_df.withColumn(
    "value",
    expr("decode(value, 'UTF-8')")
)
df.display()
{
	"deviceId": "qiita-test-device-01",
	"etag": "AAAAAAAAAAE=",
	"version": 2,
	"properties": {
		"desired": {
			"$metadata": {
				"$lastUpdated": "2025-05-26T04:37:50.7967261Z"
			},
			"$version": 1
		},
		"reported": {
			"$metadata": {
				"$lastUpdated": "2025-05-26T04:37:50.7967261Z"
			},
			"$version": 1
		}
	}
}

image.png

header 列からデータの取得

header 列から値を取得する関数はこちら
"""IoT Hub AMQP ヘッダー列を PySpark DataFrame に展開するユーティリティ.

IoT Hub から取得した AMQP 形式のヘッダーは Binary 型で格納される。
本モジュールでは以下を行う。

* 各バイト列を適切な型 (string / timestamp) にデコード
* 配列形式のヘッダー (`array<struct<key:string,value:binary>>`) を
  一括で構造化 (`struct`) する

Google Style Docstring を採用し、日本語で詳細を記述している。

Note:
    Databricks Runtime 14.3 / Spark 3.5.0 で動作確認。
"""
from __future__ import annotations

from functools import reduce
import operator as op
from typing import Dict

from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col,
    conv,
    element_at,
    expr,
    map_from_entries,
    substring,
    timestamp_millis,
)

# --------------------------------------------------------------------------- #
# 単体デコーダー
# --------------------------------------------------------------------------- #

def decode_string(df: DataFrame, col_name: str, char_set: str = "UTF-8") -> DataFrame:
    """BINARY → STRING 変換を行う.

    Args:
        df: 対象 DataFrame.
        col_name: 変換対象の列名 (binary).
        char_set: デコードに用いる文字セット. デフォルトは ``"UTF-8"``.

    Returns:
        ``col_name`` が string 型へ置換された DataFrame.
    """
    return df.withColumn(col_name, expr(f"decode(`{col_name}`, '{char_set}')"))


def decode_string_with_str8utf8(
    df: DataFrame,
    col_name: str,
    prefix: int = 2,
    char_set: str = "UTF-8",
) -> DataFrame:
    """AMQP ``str8-utf8`` 形式 (prefix+string) をデコードする.

    Args:
        df: 対象 DataFrame.
        col_name: 変換対象の列名 (binary).
        prefix: 先頭の prefix バイト数. デフォルト ``2``.
        char_set: デコードに用いる文字セット.

    Returns:
        ``col_name`` が string 型へ置換された DataFrame.
    """
    sliced = expr(
        f"decode(substring(`{col_name}`, {prefix} + 1, length(`{col_name}`) - {prefix}), '{char_set}')"
    )
    return df.withColumn(col_name, sliced)


def decode_timestamp_str8_utf8(
    df: DataFrame,
    col_name: str,
    prefix: int = 2,
    char_set: str = "UTF-8",
    ts_fmt="yyyy-MM-dd\\'T\\'HH:mm:ss.SSSSSSSX",
) -> DataFrame:
    """AMQP ``timestamp:str8-utf8`` を Spark の ``timestamp`` へ変換する.

    流れ:

    1. ``str8-utf8`` を文字列へデコード
    2. ``TRY_TO_TIMESTAMP`` で timestamp へ変換

    Args:
        df: 対象 DataFrame.
        col_name: 変換対象の列名 (binary).
        prefix: 先頭の prefix バイト数.
        char_set: デコードに用いる文字セット.
        ts_fmt: Spark の日時書式 (``SimpleDateFormat``).

    Returns:
        ``col_name`` が timestamp 型へ置換された DataFrame.
    """
    df = decode_string_with_str8utf8(df, col_name, prefix, char_set)
    return df.withColumn(col_name, expr(f"TRY_TO_TIMESTAMP(`{col_name}`, '{ts_fmt}')"))


def decode_timestamp_with_ms64(df: DataFrame, col_name: str) -> DataFrame:
    """AMQP ``timestamp:ms64`` を Spark の ``timestamp`` へ変換する.

    AMQP ms64 は「符号付き 64bit big-endian 整数 (ミリ秒)」で表される。

    流れ:

    1. 先頭 1 バイトのフォーマットコードを除去
    2. 8 バイトを 16 進文字列へ変換
    3. 各バイトを ``long`` へ展開しビットシフトで合算
    4. ``timestamp_millis`` で ``timestamp`` へ変換

    Args:
        df: 対象 DataFrame.
        col_name: 変換対象の列名 (binary).

    Returns:
        ``col_name`` が timestamp 型へ置換された DataFrame.
    """
    # 1) 余計な 1 バイトを除去し 8 バイトを抽出
    df = df.withColumn(col_name, substring(col(col_name), 2, 8))
    # 2) BINARY → HEX string
    df = df.withColumn(col_name, expr(f"hex(`{col_name}`)"))

    # 3) HEX → signed 64bit integer
    byte_exprs = [
        conv(substring(col(col_name), i * 2 + 1, 2), 16, 10).cast("long") * (1 << (56 - 8 * i))
        for i in range(8)
    ]
    df = df.withColumn(col_name, reduce(op.add, byte_exprs))

    # 4) epoch milli → timestamp
    return df.withColumn(col_name, timestamp_millis(col(col_name)))


# --------------------------------------------------------------------------- #
# ヘッダー展開ユーティリティ
# --------------------------------------------------------------------------- #

def add_cols_from_iot_hub_headers(
    df: DataFrame,
    header_col_name: str = "headers",
    drop_temp_col: bool = True,
) -> DataFrame:
    """ヘッダー配列列を各フィールド列へ展開する.

    IoT Hub ではヘッダーが ``array<struct<key:string,value:binary>>`` として
    送出される。本関数は以下を実施する。

    * ``header_col_name`` を ``map<string,binary>`` へ変換
    * :pydata:`AMQP_HEADER_SPEC` に従い列を追加し型変換
        * ``string``             → :func:`decode_string`
        * ``string:str8-utf8``   → :func:`decode_string_with_str8utf8`
        * ``timestamp:str8-utf8``→ :func:`decode_timestamp_str8_utf8`
        * ``timestamp:ms64``     → :func:`decode_timestamp_with_ms64`

    Args:
        df: 対象 DataFrame.
        header_col_name: AMQP ヘッダーを格納する列名.
        drop_temp_col: 一時列を削除するかどうか.

    Returns:
        追加済み列を含む DataFrame.

    Raises:
        ValueError: :pydata:`AMQP_HEADER_SPEC` に未知の型が含まれる場合.
    """
    # 1) ARRAY<struct<key,value>> → MAP<key,value>
    temp_map_col = f"_amqp_header_map_{header_col_name}"
    df = df.withColumn(temp_map_col, map_from_entries(col(header_col_name)))

    # 2) 型仕様に従い列を追加
    for key, col_type in AMQP_HEADER_SPEC.items():
        df = df.withColumn(key, element_at(col(temp_map_col), key))

        col_type_lc = col_type.lower()
        if col_type_lc == "string":
            df = decode_string(df, key)
        elif col_type_lc == "string:str8-utf8":
            df = decode_string_with_str8utf8(df, key)
        elif col_type_lc == "timestamp:str8-utf8":
            df = decode_timestamp_str8_utf8(df, key)
        elif col_type_lc == "timestamp:ms64":
            df = decode_timestamp_with_ms64(df, key)
        else:
            raise ValueError(f"Unknown AMQP header type: {col_type}")

    if drop_temp_col:
        df = df.drop(temp_map_col)
    return df

image.png

AMQP_HEADER_SPEC = {
    "user-id": "string",
    "content-type": "string",
    "content-encoding": "string",
    "iothub-connection-device-id": "string:str8-utf8",
    "operationTimestamp": "timestamp:str8-utf8",
    "iothub-message-schema": "string:str8-utf8",
    "iothub-enqueuedtime": "timestamp:ms64",
    "opType": "string:str8-utf8",
    "deviceId": "string:str8-utf8",
    "hubName": "string:str8-utf8",
    "iothub-message-source": "string:str8-utf8",
}

header_col_name = "headers"

df = add_cols_from_iot_hub_headers(df, header_col_name)

df.printSchema()
df.display()
key value
user-id iot-hub-test-001
content-type application/json
content-encoding utf-8
iothub-connection-device-id qiita-test-device-01
operationTimestamp 2025-05-26T04:37:50.796+00:00
iothub-message-schema deviceLifecycleNotification
iothub-enqueuedtime 2025-05-26T04:37:50.929+00:00
opType createDeviceIdentity
deviceId qiita-test-device-01
hubName iot-hub-test-001
iothub-message-source deviceLifecycleEvents

image.png

<補足>Azure Event Hubs に連携されるデータ

データの確認方法

Azure Iot Hubs における Data Explorer にて確認できます。

image.png

デバイス作成時のデータ

{
	"deviceId": "qiita-test-device-01",
	"etag": "AAAAAAAAAAE=",
	"version": 2,
	"properties": {
		"desired": {
			"$metadata": {
				"$lastUpdated": "2025-05-26T04:37:50.7967261Z"
			},
			"$version": 1
		},
		"reported": {
			"$metadata": {
				"$lastUpdated": "2025-05-26T04:37:50.7967261Z"
			},
			"$version": 1
		}
	}
}

image.png

デバイス削除時のデータ

{
	"deviceId": "qiita-test-device-01",
	"etag": "AAAAAAAAAAE=",
	"version": 2,
	"properties": {
		"desired": {
			"$metadata": {
				"$lastUpdated": "2025-05-26T04:37:50.7967261Z"
			},
			"$version": 1
		},
		"reported": {
			"$metadata": {
				"$lastUpdated": "2025-05-26T04:37:50.7967261Z"
			},
			"$version": 1
		}
	}
}

image.png

モジュール作成時のデータ

{
	"deviceId": "qiita-test-device-01",
	"moduleId": "qiita-test-module-01",
	"etag": "AAAAAAAAAAE=",
	"version": 2,
	"properties": {
		"desired": {
			"$metadata": {
				"$lastUpdated": "0001-01-01T00:00:00Z"
			},
			"$version": 1
		},
		"reported": {
			"$metadata": {
				"$lastUpdated": "0001-01-01T00:00:00Z"
			},
			"$version": 1
		}
	}
}

image.png

モジュール削除時のデータ

{
	"deviceId": "qiita-test-device-01",
	"moduleId": "qiita-test-module-01",
	"etag": "AAAAAAAAAAE=",
	"version": 2,
	"properties": {
		"desired": {
			"$metadata": {
				"$lastUpdated": "0001-01-01T00:00:00Z"
			},
			"$version": 1
		},
		"reported": {
			"$metadata": {
				"$lastUpdated": "0001-01-01T00:00:00Z"
			},
			"$version": 1
		}
	}
}

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?