概要
Azure IoT Hub から Azure Event Hubs にメッセージルーティンしたデバイス ライフサイクル イベント(DeviceLifecycleEvents
データソース)を Databricks で参照する方法を紹介します。
事前準備
環境構築
- Azure IoT Hub の構築 (本手順は Free tier で実施)
- Azure Event Hubs の構築
- Databricks (本手順は Serveless ではなく汎用コンピュートで実行)
Evnet hub の作成
Azure Event Hubs の接続文字列を取得
Azure Event Hubs のリソース画面にて、共有アクセス ポリシー
タブ -> 追加
ポリシーを選択します。
ポリシー名に任意の名前を入力後、リッスン
をチェックし、作成
を選択します。
プライマリ接続文字列
の値をコピーします。
Azure IoT Hub のマネージド ID に対して Azure Event Hubs のAzure Event Hubs のデータ送信者
権限を付与
Azure IoT Hub にてカスタムエンドポイントを作成
Azure portal では作成する画面はないため、メッセージ ルーティングを作成するに同時に作成できます。
Azure IoT Hubs にてメッセージルーティンを設定
デバイス ライフサイクル イベントのメッセージルーティンの実施
Device の作成
Module の作成
Module の削除
Device の削除
Azure IoT Hubs にてレコードを確認
Databricks におけるデータの読み込み
Azure Event Hubs から Kafka エンドポイントによりデータを取得
Azure Event Hubs からデータを取得します。
eh_namespace_name = "kafka-test-01"
eh_connection_string = "Endpoint=sb://kafka-test-01.servicebus.windows.net/;SharedAccessKeyName=qiita-sas-01;SharedAccessKey=fiKmMqJtmn2dGAeV6oj1dWWxxxx="
topic_name = "iot-message-01"
# Databricks から Azure Event Hubs に設定する際の設定をセット
eh_sasl = (
"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
f'required username="$ConnectionString" password="{eh_connection_string}";'
)
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
}
# Azure Event Hubs からデータを取得
from pyspark.sql.functions import col
src_df = (
spark.readStream.format("kafka")
.options(**kafka_options)
.option("subscribe", topic_name)
.option("startingOffsets", "earliest") # 検証目的であるため earliest に設定
.option("includeHeaders", "true")
.load()
)
src_df.printSchema()
src_df.display()
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
|-- headers: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- value: binary (nullable = true)
value
列に対する decode の実施
from pyspark.sql.functions import expr
df = src_df.withColumn(
"value",
expr("decode(value, 'UTF-8')")
)
df.display()
{
"deviceId": "qiita-test-device-01",
"etag": "AAAAAAAAAAE=",
"version": 2,
"properties": {
"desired": {
"$metadata": {
"$lastUpdated": "2025-05-26T04:37:50.7967261Z"
},
"$version": 1
},
"reported": {
"$metadata": {
"$lastUpdated": "2025-05-26T04:37:50.7967261Z"
},
"$version": 1
}
}
}
header
列からデータの取得
header 列から値を取得する関数はこちら
"""IoT Hub AMQP ヘッダー列を PySpark DataFrame に展開するユーティリティ.
IoT Hub から取得した AMQP 形式のヘッダーは Binary 型で格納される。
本モジュールでは以下を行う。
* 各バイト列を適切な型 (string / timestamp) にデコード
* 配列形式のヘッダー (`array<struct<key:string,value:binary>>`) を
一括で構造化 (`struct`) する
Google Style Docstring を採用し、日本語で詳細を記述している。
Note:
Databricks Runtime 14.3 / Spark 3.5.0 で動作確認。
"""
from __future__ import annotations
from functools import reduce
import operator as op
from typing import Dict
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
col,
conv,
element_at,
expr,
map_from_entries,
substring,
timestamp_millis,
)
# --------------------------------------------------------------------------- #
# 単体デコーダー
# --------------------------------------------------------------------------- #
def decode_string(df: DataFrame, col_name: str, char_set: str = "UTF-8") -> DataFrame:
"""BINARY → STRING 変換を行う.
Args:
df: 対象 DataFrame.
col_name: 変換対象の列名 (binary).
char_set: デコードに用いる文字セット. デフォルトは ``"UTF-8"``.
Returns:
``col_name`` が string 型へ置換された DataFrame.
"""
return df.withColumn(col_name, expr(f"decode(`{col_name}`, '{char_set}')"))
def decode_string_with_str8utf8(
df: DataFrame,
col_name: str,
prefix: int = 2,
char_set: str = "UTF-8",
) -> DataFrame:
"""AMQP ``str8-utf8`` 形式 (prefix+string) をデコードする.
Args:
df: 対象 DataFrame.
col_name: 変換対象の列名 (binary).
prefix: 先頭の prefix バイト数. デフォルト ``2``.
char_set: デコードに用いる文字セット.
Returns:
``col_name`` が string 型へ置換された DataFrame.
"""
sliced = expr(
f"decode(substring(`{col_name}`, {prefix} + 1, length(`{col_name}`) - {prefix}), '{char_set}')"
)
return df.withColumn(col_name, sliced)
def decode_timestamp_str8_utf8(
df: DataFrame,
col_name: str,
prefix: int = 2,
char_set: str = "UTF-8",
ts_fmt="yyyy-MM-dd\\'T\\'HH:mm:ss.SSSSSSSX",
) -> DataFrame:
"""AMQP ``timestamp:str8-utf8`` を Spark の ``timestamp`` へ変換する.
流れ:
1. ``str8-utf8`` を文字列へデコード
2. ``TRY_TO_TIMESTAMP`` で timestamp へ変換
Args:
df: 対象 DataFrame.
col_name: 変換対象の列名 (binary).
prefix: 先頭の prefix バイト数.
char_set: デコードに用いる文字セット.
ts_fmt: Spark の日時書式 (``SimpleDateFormat``).
Returns:
``col_name`` が timestamp 型へ置換された DataFrame.
"""
df = decode_string_with_str8utf8(df, col_name, prefix, char_set)
return df.withColumn(col_name, expr(f"TRY_TO_TIMESTAMP(`{col_name}`, '{ts_fmt}')"))
def decode_timestamp_with_ms64(df: DataFrame, col_name: str) -> DataFrame:
"""AMQP ``timestamp:ms64`` を Spark の ``timestamp`` へ変換する.
AMQP ms64 は「符号付き 64bit big-endian 整数 (ミリ秒)」で表される。
流れ:
1. 先頭 1 バイトのフォーマットコードを除去
2. 8 バイトを 16 進文字列へ変換
3. 各バイトを ``long`` へ展開しビットシフトで合算
4. ``timestamp_millis`` で ``timestamp`` へ変換
Args:
df: 対象 DataFrame.
col_name: 変換対象の列名 (binary).
Returns:
``col_name`` が timestamp 型へ置換された DataFrame.
"""
# 1) 余計な 1 バイトを除去し 8 バイトを抽出
df = df.withColumn(col_name, substring(col(col_name), 2, 8))
# 2) BINARY → HEX string
df = df.withColumn(col_name, expr(f"hex(`{col_name}`)"))
# 3) HEX → signed 64bit integer
byte_exprs = [
conv(substring(col(col_name), i * 2 + 1, 2), 16, 10).cast("long") * (1 << (56 - 8 * i))
for i in range(8)
]
df = df.withColumn(col_name, reduce(op.add, byte_exprs))
# 4) epoch milli → timestamp
return df.withColumn(col_name, timestamp_millis(col(col_name)))
# --------------------------------------------------------------------------- #
# ヘッダー展開ユーティリティ
# --------------------------------------------------------------------------- #
def add_cols_from_iot_hub_headers(
df: DataFrame,
header_col_name: str = "headers",
drop_temp_col: bool = True,
) -> DataFrame:
"""ヘッダー配列列を各フィールド列へ展開する.
IoT Hub ではヘッダーが ``array<struct<key:string,value:binary>>`` として
送出される。本関数は以下を実施する。
* ``header_col_name`` を ``map<string,binary>`` へ変換
* :pydata:`AMQP_HEADER_SPEC` に従い列を追加し型変換
* ``string`` → :func:`decode_string`
* ``string:str8-utf8`` → :func:`decode_string_with_str8utf8`
* ``timestamp:str8-utf8``→ :func:`decode_timestamp_str8_utf8`
* ``timestamp:ms64`` → :func:`decode_timestamp_with_ms64`
Args:
df: 対象 DataFrame.
header_col_name: AMQP ヘッダーを格納する列名.
drop_temp_col: 一時列を削除するかどうか.
Returns:
追加済み列を含む DataFrame.
Raises:
ValueError: :pydata:`AMQP_HEADER_SPEC` に未知の型が含まれる場合.
"""
# 1) ARRAY<struct<key,value>> → MAP<key,value>
temp_map_col = f"_amqp_header_map_{header_col_name}"
df = df.withColumn(temp_map_col, map_from_entries(col(header_col_name)))
# 2) 型仕様に従い列を追加
for key, col_type in AMQP_HEADER_SPEC.items():
df = df.withColumn(key, element_at(col(temp_map_col), key))
col_type_lc = col_type.lower()
if col_type_lc == "string":
df = decode_string(df, key)
elif col_type_lc == "string:str8-utf8":
df = decode_string_with_str8utf8(df, key)
elif col_type_lc == "timestamp:str8-utf8":
df = decode_timestamp_str8_utf8(df, key)
elif col_type_lc == "timestamp:ms64":
df = decode_timestamp_with_ms64(df, key)
else:
raise ValueError(f"Unknown AMQP header type: {col_type}")
if drop_temp_col:
df = df.drop(temp_map_col)
return df
AMQP_HEADER_SPEC = {
"user-id": "string",
"content-type": "string",
"content-encoding": "string",
"iothub-connection-device-id": "string:str8-utf8",
"operationTimestamp": "timestamp:str8-utf8",
"iothub-message-schema": "string:str8-utf8",
"iothub-enqueuedtime": "timestamp:ms64",
"opType": "string:str8-utf8",
"deviceId": "string:str8-utf8",
"hubName": "string:str8-utf8",
"iothub-message-source": "string:str8-utf8",
}
header_col_name = "headers"
df = add_cols_from_iot_hub_headers(df, header_col_name)
df.printSchema()
df.display()
key | value |
---|---|
user-id | iot-hub-test-001 |
content-type | application/json |
content-encoding | utf-8 |
iothub-connection-device-id | qiita-test-device-01 |
operationTimestamp | 2025-05-26T04:37:50.796+00:00 |
iothub-message-schema | deviceLifecycleNotification |
iothub-enqueuedtime | 2025-05-26T04:37:50.929+00:00 |
opType | createDeviceIdentity |
deviceId | qiita-test-device-01 |
hubName | iot-hub-test-001 |
iothub-message-source | deviceLifecycleEvents |
<補足>Azure Event Hubs に連携されるデータ
データの確認方法
Azure Iot Hubs における Data Explorer にて確認できます。
デバイス作成時のデータ
{
"deviceId": "qiita-test-device-01",
"etag": "AAAAAAAAAAE=",
"version": 2,
"properties": {
"desired": {
"$metadata": {
"$lastUpdated": "2025-05-26T04:37:50.7967261Z"
},
"$version": 1
},
"reported": {
"$metadata": {
"$lastUpdated": "2025-05-26T04:37:50.7967261Z"
},
"$version": 1
}
}
}
デバイス削除時のデータ
{
"deviceId": "qiita-test-device-01",
"etag": "AAAAAAAAAAE=",
"version": 2,
"properties": {
"desired": {
"$metadata": {
"$lastUpdated": "2025-05-26T04:37:50.7967261Z"
},
"$version": 1
},
"reported": {
"$metadata": {
"$lastUpdated": "2025-05-26T04:37:50.7967261Z"
},
"$version": 1
}
}
}
モジュール作成時のデータ
{
"deviceId": "qiita-test-device-01",
"moduleId": "qiita-test-module-01",
"etag": "AAAAAAAAAAE=",
"version": 2,
"properties": {
"desired": {
"$metadata": {
"$lastUpdated": "0001-01-01T00:00:00Z"
},
"$version": 1
},
"reported": {
"$metadata": {
"$lastUpdated": "0001-01-01T00:00:00Z"
},
"$version": 1
}
}
}
モジュール削除時のデータ
{
"deviceId": "qiita-test-device-01",
"moduleId": "qiita-test-module-01",
"etag": "AAAAAAAAAAE=",
"version": 2,
"properties": {
"desired": {
"$metadata": {
"$lastUpdated": "0001-01-01T00:00:00Z"
},
"$version": 1
},
"reported": {
"$metadata": {
"$lastUpdated": "0001-01-01T00:00:00Z"
},
"$version": 1
}
}
}