概要
Databricks のデータフレームにて Microsoft Fabric (Fabric) における EventStream の Source のカスタムエンドポイント から SAS 認証によりデータを書き込む方法を共有します。
前提条件
下記を構築します。
- Databricks
- Microsoft Fabric
事前準備
1. Fabric のワークスペースにて Source のカスタムエンドポイントを持つEventStream を作成し公開
手順
1. Databricks ソースのテーブルと Volumes を作成
カレントカタログを設定します。
%sql
USE CATALOG manabian_test_01;
スキーマを作成します。
%sql
CREATE SCHEMA IF NOT EXISTS eh_test_01;
ソースのテーブルを作成します。
%sql
CREATE OR REPLACE TABLE eh_test_01.table_01
AS
SELECT
1 AS id,
'John' AS string_col,
CAST('2025-01-01' AS TIMESTAMP) AS ingest_ts
;
SELECT * FROM eh_test_01.table_01;
Volume を作成し、初期化します。
%sql
CREATE VOLUME IF NOT EXISTS eh_test_01.volume_01;
volume_path = "/Volumes/manabian_test_01/eh_test_01/volume_01"
dbutils.fs.rm(volume_path, True)
2. Fabric にて SAS を取得
作成したソース -> Kafka
-> SASキー認証
を選択し 、下記の値を取得します。
- サーバー名(ブーストトラップサーバーの
xxx..servicebus.windows.net:9093
におけるxxx
の値) - トピック名
- 接続文字列 - 主キー
2. Databriks にて Azure Event Hubs からデータを取得
Eventstream カスタムエンドポイント に接続するための設定値をセットします。
# Eventstream カスタムエンドポイント に接続するための設定値をセット
esc_namespace_name = "esehsgi702j38of92v5kv7" ## Eventstream カスタムエンドポイント の名前をセット
topic_name = "es_26f0a637-fa91-45c6-8061-5f832de539bf" ## Eventstream カスタムエンドポイントのトピックをセット
connection_string = "Endpoint=sb://esehsgi702j38of92v5kv7.servicebus.windows.net/;SharedAccessKeyName=key_70d0a645-5784-4d26-9851-0370c4ded6a6;SharedAccessKey=zMIkewSb2=;EntityPath=es_26f0a637-fa91-45c6-8061-5f832de539bf" ## 接続文字列
Eventstream カスタムエンドポイントに書き込みを実施します。
# Databricks から Eventstream カスタムエンドポイント に設定する際の設定をセット
eh_sasl = (
"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
f'required username="$ConnectionString" password="{connection_string}";'
)
bootstrap_servers = f"{esc_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
}
# Azure Event Hubs へデータを書き込む
from pyspark.sql.functions import col, expr, from_json
df = spark.readStream.table("eh_test_01.table_01")
df = df.selectExpr("to_json(struct(id, string_col, ingest_ts)) AS value")
_ = (
df.writeStream.format("kafka")
.options(**kafka_options)
.option("topic", topic_name)
.option("checkpointLocation", volume_path+"/table_01")
.trigger(availableNow=True)
.start()
)
Fabric の Eventstream にて書き込まれたことを確認します。
EventStream の Source のカスタムエンドポイントでは、外部からの読み込みをサポートしていないようです。
java.util.concurrent.ExecutionException: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Call(callName=listOffsets on broker 0, deadlineMs=1747366497859, tries=1, nextAllowedTryMs=1747366497960) timed out at 1747366497860 after 1 attempt(s)