概要
Databricks DLT にて Azure Event Hubs に対して json 形式でデータの書き込みと読み込みを実施する方法を共有します。
事前準備
Azure Event Hubs の SAS を取得
下記の記事を参考に SAS 認証により Databricks から Azure Event Hubs の SAS を取得してください。
- Databricks DLT にて Azure Event Hubs から SAS 認証によりデータを取得する方法 #dlt - Qiita
- Databricks のデータフレームにて Azure Event Hubs へ SAS 認証によりデータを書き込む方法 #AzureEventHubs - Qiita
Databricks にてデータベースオブジェクトを作成
カレントカタログを設定します。
%sql
USE CATALOG manabian_test_01;
スキーマを作成します。
%sql
CREATE SCHEMA IF NOT EXISTS eh_test_01;
ソースのテーブルを作成します。
%sql
CREATE OR REPLACE TABLE eh_test_01.table_01
AS
SELECT
1 AS id,
'John' AS string_col,
CAST('2025-01-01' AS TIMESTAMP) AS ingest_ts
;
SELECT * FROM eh_test_01.table_01;
Azure Event Hubs にてavro-test-02
という名称の Azure Event Hubs のインスタンスを作成
手順
1. Azure Event Hubs に対する書き込みの実施
Databricks DLT のパイプラインの下記のコードを記述して、Databricks DLT の実行します。
from dlt import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import from_avro
# Azure Event Hubs に接続するための設定値をセット
eh_namespace_name = "kafka-test-01" ## Azure Event Hubs の名前空間をセット
ehi_name = topic_name = "json-test-02" ## Azure Event Hubs のEvent Hubs のインスタンスをセット
connection_string = "Endpoint=sb://kafka-test-01.servicebus.windows.net/;SharedAccessKeyName=dbt-to-eh-01;SharedAccessKey=hnyXXXX=" ## SAS 接続文字列をセット
# Databricks から Azure Event Hubs に設定する際の設定をセット
eh_sasl = (
"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
f'required username="$ConnectionString" password="{connection_string}";'
)
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
}
# Azure Event Hubs へデータ書き込み
dbx_to_eh_12_options = kafka_options.copy()
dbx_to_eh_12_options["topic"] = topic_name
dlt.create_sink(name="eh_sink_01", format="kafka", options=dbx_to_eh_12_options)
@dlt.append_flow(name="kafka_sink_flow", target="eh_sink_01")
def kafka_sink_flow():
df = spark.readStream.table("eh_test_01.table_01")
df = df.selectExpr("to_avro(struct(id, string_col, ingest_ts)) AS value")
return df
Azure Event Hubs にてデータが書き込まれたことを確認します。
2. Databricks DLT の実行により Azure Event Hubs から読み込みの実施
DLT パイプラインに下記のコードを追記して、Databricks DLT の実行します。
# Azure Event Hubs からデータを取得
json_table_01 = "json_table_01"
schema = """
{
"name": "kafka_test_01.avro_0",
"type": "record",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "string_col",
"type": "string"
},
{
"name": "ingest_ts",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
}
]
}
"""
@dlt.table(
name = json_table_01,
)
def customers_bronze_ingest_flow():
df = (
spark.readStream.format("kafka")
.options(**kafka_options)
.option("subscribe",topic_name)
.option("startingOffsets", "earliest") # 検証目的であるため earliest に設定
.load()
)
df = df.withColumn(
"decoded_value",
from_avro(
col("value"),
schema,
),
)
return df
テーブルにデータが書きこまれたことを確認します。