概要
Databricks のデータフレームにて Azure Event Hubs へ SAS 認証によりデータを書き込む方法を紹介します。
前提条件
下記を構築します。
- Databricks
- Azure Event Hubs 名前空間(本手順では
kafka-test-01
という名称) - Azure Event Hubs インスタンス (本手順では
dbx-to-eh-11
という名称)
手順
1. Databricks ソースのテーブルと Volumes を作成
カレントカタログを設定します。
%sql
USE CATALOG manabian_test_01;
スキーマを作成します。
%sql
CREATE SCHEMA IF NOT EXISTS eh_test_01;
ソースのテーブルを作成します。
%sql
CREATE OR REPLACE TABLE eh_test_01.table_01
AS
SELECT
1 AS id,
'John' AS string_col,
CAST('2025-01-01' AS TIMESTAMP) AS ingest_ts
;
SELECT * FROM eh_test_01.table_01;
Volume を作成し、初期化します。
%sql
CREATE VOLUME IF NOT EXISTS eh_test_01.volume_01;
volume_path = "/Volumes/manabian_test_01/eh_test_01/volume_01"
dbutils.fs.rm(volume_path, True)
2. Azure Event Hubs にて SAS を取得
Azure Event Hubs 名前空間(あるいは、 Azure Event Hubs インスタンス)にて、共有アクセス ポリシー
-> + 追加
を選択します。
dbx-to-eh-10
というポリシー名を入力し、送信
をチェックした上で作成
を選択します。
作成したポリシーを表示し、プライマリ接続文字列
をコピーします。
3. Databriks にて Azure Event Hubs へデータの書き込み
Azure Event Hubs に接続するための設定値をセットします。
# Azure Event Hubs に接続するための設定値をセット
eh_namespace_name = "kafka-test-01" ## Azure Event Hubs の名前空間をセット
ehi_name = topic_name = "dbx-to-eh-01" ## Azure Event Hubs のEvent Hubs のインスタンスをセット
connection_string = "Endpoint=sb://kafka-test-01.servicebus.windows.net/;SharedAccessKeyName=dbt-to-eh-01;SharedAccessKey=hnyXXXX=" ## SAS 接続文字列をセット
Azure Event Hubs からデータを取得できることを確認します。
# Databricks から Azure Event Hubs に設定する際の設定をセット
eh_sasl = (
"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
f'required username="$ConnectionString" password="{connection_string}";'
)
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
}
# Azure Event Hubs からデータを取得
from pyspark.sql.functions import col, expr, from_json
df = spark.readStream.table("eh_test_01.table_01")
df = df.selectExpr("to_json(struct(id, string_col, ingest_ts)) AS value")
_ = (
df.writeStream.format("kafka")
.options(**kafka_options)
.option("topic", "hub01")
.option("checkpointLocation", volume_path+"/table_01")
.trigger(availableNow=True)
.start()
)
Azure Event Hubs の Data Explorere にてデータが書き込まれていることを確認します。