概要
Azure IoT Hub から Azure Event Hubs にメッセージルーティンした際のエンリッチの値を Databricks で参照する方法を紹介します。header
列にて連携されるようです。本記事ではでエンリッチ機能にフォーカスした内容となっており、下記記事内容を前提としています。
メッセージルーティンの実施
エンリッチの設定を実施
device-to-cloud メッセージ(テレメトリー)の送信
Databricks におけるデータの読み込み
Azure Event Hubs から Kafka エンドポイントによりデータを取得
Azure Event Hubs からデータを取得すると、header
列に含まれていることを確認できます。
eh_namespace_name = "kafka-test-01"
eh_connection_string = "Endpoint=sb://kafka-test-01.servicebus.windows.net/;SharedAccessKeyName=qiita-sas-01;SharedAccessKey=fiKmMqJtmn2dGAeV6oj1dWWxxxx="
topic_name = "iot-message-01"
# Databricks から Azure Event Hubs に設定する際の設定をセット
eh_sasl = (
"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
f'required username="$ConnectionString" password="{eh_connection_string}";'
)
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
}
# Azure Event Hubs からデータを取得
from pyspark.sql.functions import col
src_df = (
spark.readStream.format("kafka")
.options(**kafka_options)
.option("subscribe", topic_name)
.option("startingOffsets", "earliest") # 検証目的であるため earliest に設定
.option("includeHeaders", "true")
.load()
)
src_df.printSchema()
src_df.display()
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
|-- headers: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- value: binary (nullable = true)
header
列からデータの取得
header 列から値を取得する関数はこちら
"""IoT Hub AMQP ヘッダー列を PySpark DataFrame に展開するユーティリティ.
IoT Hub から取得した AMQP 形式のヘッダーは Binary 型で格納される。
本モジュールでは以下を行う。
* 各バイト列を適切な型 (string / timestamp) にデコード
* 配列形式のヘッダー (`array<struct<key:string,value:binary>>`) を
一括で構造化 (`struct`) する
Google Style Docstring を採用し、日本語で詳細を記述している。
Note:
Databricks Runtime 14.3 / Spark 3.5.0 で動作確認。
"""
from __future__ import annotations
from functools import reduce
import operator as op
from typing import Dict
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
col,
conv,
element_at,
expr,
map_from_entries,
substring,
timestamp_millis,
)
# --------------------------------------------------------------------------- #
# 単体デコーダー
# --------------------------------------------------------------------------- #
def decode_string(df: DataFrame, col_name: str, char_set: str = "UTF-8") -> DataFrame:
"""BINARY → STRING 変換を行う.
Args:
df: 対象 DataFrame.
col_name: 変換対象の列名 (binary).
char_set: デコードに用いる文字セット. デフォルトは ``"UTF-8"``.
Returns:
``col_name`` が string 型へ置換された DataFrame.
"""
return df.withColumn(col_name, expr(f"decode(`{col_name}`, '{char_set}')"))
def decode_string_with_str8utf8(
df: DataFrame,
col_name: str,
prefix: int = 2,
char_set: str = "UTF-8",
) -> DataFrame:
"""AMQP ``str8-utf8`` 形式 (prefix+string) をデコードする.
Args:
df: 対象 DataFrame.
col_name: 変換対象の列名 (binary).
prefix: 先頭の prefix バイト数. デフォルト ``2``.
char_set: デコードに用いる文字セット.
Returns:
``col_name`` が string 型へ置換された DataFrame.
"""
sliced = expr(
f"decode(substring(`{col_name}`, {prefix} + 1, length(`{col_name}`) - {prefix}), '{char_set}')"
)
return df.withColumn(col_name, sliced)
def decode_timestamp_str8_utf8(
df: DataFrame,
col_name: str,
prefix: int = 2,
char_set: str = "UTF-8",
ts_fmt="yyyy-MM-dd\\'T\\'HH:mm:ss.SSSSSSSX",
) -> DataFrame:
"""AMQP ``timestamp:str8-utf8`` を Spark の ``timestamp`` へ変換する.
流れ:
1. ``str8-utf8`` を文字列へデコード
2. ``TRY_TO_TIMESTAMP`` で timestamp へ変換
Args:
df: 対象 DataFrame.
col_name: 変換対象の列名 (binary).
prefix: 先頭の prefix バイト数.
char_set: デコードに用いる文字セット.
ts_fmt: Spark の日時書式 (``SimpleDateFormat``).
Returns:
``col_name`` が timestamp 型へ置換された DataFrame.
"""
df = decode_string_with_str8utf8(df, col_name, prefix, char_set)
return df.withColumn(col_name, expr(f"TRY_TO_TIMESTAMP(`{col_name}`, '{ts_fmt}')"))
def decode_timestamp_with_ms64(df: DataFrame, col_name: str) -> DataFrame:
"""AMQP ``timestamp:ms64`` を Spark の ``timestamp`` へ変換する.
AMQP ms64 は「符号付き 64bit big-endian 整数 (ミリ秒)」で表される。
流れ:
1. 先頭 1 バイトのフォーマットコードを除去
2. 8 バイトを 16 進文字列へ変換
3. 各バイトを ``long`` へ展開しビットシフトで合算
4. ``timestamp_millis`` で ``timestamp`` へ変換
Args:
df: 対象 DataFrame.
col_name: 変換対象の列名 (binary).
Returns:
``col_name`` が timestamp 型へ置換された DataFrame.
"""
# 1) 余計な 1 バイトを除去し 8 バイトを抽出
df = df.withColumn(col_name, substring(col(col_name), 2, 8))
# 2) BINARY → HEX string
df = df.withColumn(col_name, expr(f"hex(`{col_name}`)"))
# 3) HEX → signed 64bit integer
byte_exprs = [
conv(substring(col(col_name), i * 2 + 1, 2), 16, 10).cast("long") * (1 << (56 - 8 * i))
for i in range(8)
]
df = df.withColumn(col_name, reduce(op.add, byte_exprs))
# 4) epoch milli → timestamp
return df.withColumn(col_name, timestamp_millis(col(col_name)))
# --------------------------------------------------------------------------- #
# ヘッダー展開ユーティリティ
# --------------------------------------------------------------------------- #
def add_cols_from_iot_hub_headers(
df: DataFrame,
header_col_name: str = "headers",
drop_temp_col: bool = True,
) -> DataFrame:
"""ヘッダー配列列を各フィールド列へ展開する.
IoT Hub ではヘッダーが ``array<struct<key:string,value:binary>>`` として
送出される。本関数は以下を実施する。
* ``header_col_name`` を ``map<string,binary>`` へ変換
* :pydata:`AMQP_HEADER_SPEC` に従い列を追加し型変換
* ``string`` → :func:`decode_string`
* ``string:str8-utf8`` → :func:`decode_string_with_str8utf8`
* ``timestamp:str8-utf8``→ :func:`decode_timestamp_str8_utf8`
* ``timestamp:ms64`` → :func:`decode_timestamp_with_ms64`
Args:
df: 対象 DataFrame.
header_col_name: AMQP ヘッダーを格納する列名.
drop_temp_col: 一時列を削除するかどうか.
Returns:
追加済み列を含む DataFrame.
Raises:
ValueError: :pydata:`AMQP_HEADER_SPEC` に未知の型が含まれる場合.
"""
# 1) ARRAY<struct<key,value>> → MAP<key,value>
temp_map_col = f"_amqp_header_map_{header_col_name}"
df = df.withColumn(temp_map_col, map_from_entries(col(header_col_name)))
# 2) 型仕様に従い列を追加
for key, col_type in AMQP_HEADER_SPEC.items():
df = df.withColumn(key, element_at(col(temp_map_col), key))
col_type_lc = col_type.lower()
if col_type_lc == "string":
df = decode_string(df, key)
elif col_type_lc == "string:str8-utf8":
df = decode_string_with_str8utf8(df, key)
elif col_type_lc == "timestamp:str8-utf8":
df = decode_timestamp_str8_utf8(df, key)
elif col_type_lc == "timestamp:ms64":
df = decode_timestamp_with_ms64(df, key)
else:
raise ValueError(f"Unknown AMQP header type: {col_type}")
if drop_temp_col:
df = df.drop(temp_map_col)
return df
AMQP_HEADER_SPEC = {
"qiitaTest": "string:str8-utf8",
}
header_col_name = "headers"
df = add_cols_from_iot_hub_headers(src_df, header_col_name)
df.printSchema()
df.display()