概要
Databricks のデータフレームにて Microsoft Fabric (Fabric) における EventStream の Destination のカスタムエンドポイント から SAS 認証によりデータを取得する方法を共有します。
前提条件
下記を構築します。
- Databricks
- Microsoft Fabric
事前準備
1. Fabric のワークスペースにて Destination のカスタムエンドポイントを持つEventStream を作成し公開
手順
1. Fabric にて SAS を取得
作成した宛先 -> Kafka
-> SASキー認証
を選択し 、下記の値を取得します。
- サーバー名(ブーストトラップサーバーの
xxx..servicebus.windows.net:9093
におけるxxx
の値) - トピック名
- 接続文字列 - 主キー
2. Databriks にて Azure Event Hubs からデータを取得
Eventstream カスタムエンドポイント に接続するための設定値をセットします。
# Eventstream カスタムエンドポイント に接続するための設定値をセット
esc_namespace_name = "esehsgi702j38of92v5kv7" ## Eventstream カスタムエンドポイント の名前をセット
topic_name = "es_b5502d41-18b0-469c-87b1-a32b38e3a700" ## Eventstream カスタムエンドポイントのトピックをセット
connection_string = "Endpoint=sb://esehsgi702j38of92v5kv7.servicebus.windows.net/;SharedAccessKeyName=key_70d0a645-5784-4d26-9851-0370c4ded6a6;SharedAccessKey=zMIkewSb2=;EntityPath=es_913f810e-0a03-4cba-98f2-e385d1bc9e21" ## 接続文字列
Eventstream カスタムエンドポイント からデータを取得できることを確認します。
# Databricks から Eventstream カスタムエンドポイント に設定する際の設定をセット
eh_sasl = (
"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
f'required username="$ConnectionString" password="{connection_string}";'
)
bootstrap_servers = f"{esc_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
}
# Eventstream カスタムエンドポイント からデータを取得
from pyspark.sql.functions import col, expr, from_json
df = (
spark.readStream.format("kafka")
.options(**kafka_options)
.option("subscribe", topic_name)
.option("startingOffsets", "earliest") # 検証目的であるため earliest に設定
.load()
)
df = df.withColumn(
"decoded_value",
col("value").cast("string"),
)
df.display()