0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure IoT Hub から Azure Event Hubs にメッセージルーティンした際のエンリッチの値を Databricks で参照する方法

Last updated at Posted at 2025-05-27

概要

Azure IoT Hub から Azure Event Hubs にメッセージルーティンした際のエンリッチの値を Databricks で参照する方法を紹介します。header列にて連携されるようです。本記事ではでエンリッチ機能にフォーカスした内容となっており、下記記事内容を前提としています。

メッセージルーティンの実施

エンリッチの設定を実施

image.png

device-to-cloud メッセージ(テレメトリー)の送信

image.png

Databricks におけるデータの読み込み

Azure Event Hubs から Kafka エンドポイントによりデータを取得

Azure Event Hubs からデータを取得すると、header列に含まれていることを確認できます。

eh_namespace_name = "kafka-test-01"
eh_connection_string = "Endpoint=sb://kafka-test-01.servicebus.windows.net/;SharedAccessKeyName=qiita-sas-01;SharedAccessKey=fiKmMqJtmn2dGAeV6oj1dWWxxxx="
topic_name = "iot-message-01"

# Databricks から Azure Event Hubs に設定する際の設定をセット
eh_sasl = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
    f'required username="$ConnectionString" password="{eh_connection_string}";'
)
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
}
# Azure Event Hubs からデータを取得
from pyspark.sql.functions import col

src_df = (
    spark.readStream.format("kafka")
    .options(**kafka_options)
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")  # 検証目的であるため earliest に設定
    .option("includeHeaders", "true")
    .load()
)
src_df.printSchema()
src_df.display()
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)

image.png

header 列からデータの取得

header 列から値を取得する関数はこちら
"""IoT Hub AMQP ヘッダー列を PySpark DataFrame に展開するユーティリティ.

IoT Hub から取得した AMQP 形式のヘッダーは Binary 型で格納される。
本モジュールでは以下を行う。

* 各バイト列を適切な型 (string / timestamp) にデコード
* 配列形式のヘッダー (`array<struct<key:string,value:binary>>`) を
  一括で構造化 (`struct`) する

Google Style Docstring を採用し、日本語で詳細を記述している。

Note:
    Databricks Runtime 14.3 / Spark 3.5.0 で動作確認。
"""
from __future__ import annotations

from functools import reduce
import operator as op
from typing import Dict

from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col,
    conv,
    element_at,
    expr,
    map_from_entries,
    substring,
    timestamp_millis,
)

# --------------------------------------------------------------------------- #
# 単体デコーダー
# --------------------------------------------------------------------------- #

def decode_string(df: DataFrame, col_name: str, char_set: str = "UTF-8") -> DataFrame:
    """BINARY → STRING 変換を行う.

    Args:
        df: 対象 DataFrame.
        col_name: 変換対象の列名 (binary).
        char_set: デコードに用いる文字セット. デフォルトは ``"UTF-8"``.

    Returns:
        ``col_name`` が string 型へ置換された DataFrame.
    """
    return df.withColumn(col_name, expr(f"decode(`{col_name}`, '{char_set}')"))


def decode_string_with_str8utf8(
    df: DataFrame,
    col_name: str,
    prefix: int = 2,
    char_set: str = "UTF-8",
) -> DataFrame:
    """AMQP ``str8-utf8`` 形式 (prefix+string) をデコードする.

    Args:
        df: 対象 DataFrame.
        col_name: 変換対象の列名 (binary).
        prefix: 先頭の prefix バイト数. デフォルト ``2``.
        char_set: デコードに用いる文字セット.

    Returns:
        ``col_name`` が string 型へ置換された DataFrame.
    """
    sliced = expr(
        f"decode(substring(`{col_name}`, {prefix} + 1, length(`{col_name}`) - {prefix}), '{char_set}')"
    )
    return df.withColumn(col_name, sliced)


def decode_timestamp_str8_utf8(
    df: DataFrame,
    col_name: str,
    prefix: int = 2,
    char_set: str = "UTF-8",
    ts_fmt="yyyy-MM-dd\\'T\\'HH:mm:ss.SSSSSSSX",
) -> DataFrame:
    """AMQP ``timestamp:str8-utf8`` を Spark の ``timestamp`` へ変換する.

    流れ:

    1. ``str8-utf8`` を文字列へデコード
    2. ``TRY_TO_TIMESTAMP`` で timestamp へ変換

    Args:
        df: 対象 DataFrame.
        col_name: 変換対象の列名 (binary).
        prefix: 先頭の prefix バイト数.
        char_set: デコードに用いる文字セット.
        ts_fmt: Spark の日時書式 (``SimpleDateFormat``).

    Returns:
        ``col_name`` が timestamp 型へ置換された DataFrame.
    """
    df = decode_string_with_str8utf8(df, col_name, prefix, char_set)
    return df.withColumn(col_name, expr(f"TRY_TO_TIMESTAMP(`{col_name}`, '{ts_fmt}')"))


def decode_timestamp_with_ms64(df: DataFrame, col_name: str) -> DataFrame:
    """AMQP ``timestamp:ms64`` を Spark の ``timestamp`` へ変換する.

    AMQP ms64 は「符号付き 64bit big-endian 整数 (ミリ秒)」で表される。

    流れ:

    1. 先頭 1 バイトのフォーマットコードを除去
    2. 8 バイトを 16 進文字列へ変換
    3. 各バイトを ``long`` へ展開しビットシフトで合算
    4. ``timestamp_millis`` で ``timestamp`` へ変換

    Args:
        df: 対象 DataFrame.
        col_name: 変換対象の列名 (binary).

    Returns:
        ``col_name`` が timestamp 型へ置換された DataFrame.
    """
    # 1) 余計な 1 バイトを除去し 8 バイトを抽出
    df = df.withColumn(col_name, substring(col(col_name), 2, 8))
    # 2) BINARY → HEX string
    df = df.withColumn(col_name, expr(f"hex(`{col_name}`)"))

    # 3) HEX → signed 64bit integer
    byte_exprs = [
        conv(substring(col(col_name), i * 2 + 1, 2), 16, 10).cast("long") * (1 << (56 - 8 * i))
        for i in range(8)
    ]
    df = df.withColumn(col_name, reduce(op.add, byte_exprs))

    # 4) epoch milli → timestamp
    return df.withColumn(col_name, timestamp_millis(col(col_name)))


# --------------------------------------------------------------------------- #
# ヘッダー展開ユーティリティ
# --------------------------------------------------------------------------- #

def add_cols_from_iot_hub_headers(
    df: DataFrame,
    header_col_name: str = "headers",
    drop_temp_col: bool = True,
) -> DataFrame:
    """ヘッダー配列列を各フィールド列へ展開する.

    IoT Hub ではヘッダーが ``array<struct<key:string,value:binary>>`` として
    送出される。本関数は以下を実施する。

    * ``header_col_name`` を ``map<string,binary>`` へ変換
    * :pydata:`AMQP_HEADER_SPEC` に従い列を追加し型変換
        * ``string``             → :func:`decode_string`
        * ``string:str8-utf8``   → :func:`decode_string_with_str8utf8`
        * ``timestamp:str8-utf8``→ :func:`decode_timestamp_str8_utf8`
        * ``timestamp:ms64``     → :func:`decode_timestamp_with_ms64`

    Args:
        df: 対象 DataFrame.
        header_col_name: AMQP ヘッダーを格納する列名.
        drop_temp_col: 一時列を削除するかどうか.

    Returns:
        追加済み列を含む DataFrame.

    Raises:
        ValueError: :pydata:`AMQP_HEADER_SPEC` に未知の型が含まれる場合.
    """
    # 1) ARRAY<struct<key,value>> → MAP<key,value>
    temp_map_col = f"_amqp_header_map_{header_col_name}"
    df = df.withColumn(temp_map_col, map_from_entries(col(header_col_name)))

    # 2) 型仕様に従い列を追加
    for key, col_type in AMQP_HEADER_SPEC.items():
        df = df.withColumn(key, element_at(col(temp_map_col), key))

        col_type_lc = col_type.lower()
        if col_type_lc == "string":
            df = decode_string(df, key)
        elif col_type_lc == "string:str8-utf8":
            df = decode_string_with_str8utf8(df, key)
        elif col_type_lc == "timestamp:str8-utf8":
            df = decode_timestamp_str8_utf8(df, key)
        elif col_type_lc == "timestamp:ms64":
            df = decode_timestamp_with_ms64(df, key)
        else:
            raise ValueError(f"Unknown AMQP header type: {col_type}")

    if drop_temp_col:
        df = df.drop(temp_map_col)
    return df

image.png

AMQP_HEADER_SPEC = {
    "qiitaTest": "string:str8-utf8",
}

header_col_name = "headers"

df = add_cols_from_iot_hub_headers(src_df, header_col_name)

df.printSchema()
df.display()

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?