0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks のデータフレームにて Microsoft Fabric における EventStream の Destination のカスタムエンドポイント から SAS 認証によりデータを取得する方法

Posted at

概要

Databricks のデータフレームにて Microsoft Fabric (Fabric) における EventStream の Destination のカスタムエンドポイント から SAS 認証によりデータを取得する方法を共有します。

前提条件

下記を構築します。

  • Databricks
  • Microsoft Fabric

事前準備

1. Fabric のワークスペースにて Destination のカスタムエンドポイントを持つEventStream を作成し公開

image.png

image.png

image.png

image.png

image.png

image.png

image.png

手順

1. Fabric にて SAS を取得

作成した宛先 -> Kafka -> SASキー認証を選択し 、下記の値を取得します。

  • サーバー名(ブーストトラップサーバーのxxx..servicebus.windows.net:9093におけるxxxの値)
  • トピック名
  • 接続文字列 - 主キー

image.png

2. Databriks にて Azure Event Hubs からデータを取得

Eventstream カスタムエンドポイント に接続するための設定値をセットします。

# Eventstream カスタムエンドポイント に接続するための設定値をセット
esc_namespace_name = "esehsgi702j38of92v5kv7"  ## Eventstream カスタムエンドポイント の名前をセット
topic_name = "es_b5502d41-18b0-469c-87b1-a32b38e3a700"  ## Eventstream カスタムエンドポイントのトピックをセット
connection_string = "Endpoint=sb://esehsgi702j38of92v5kv7.servicebus.windows.net/;SharedAccessKeyName=key_70d0a645-5784-4d26-9851-0370c4ded6a6;SharedAccessKey=zMIkewSb2=;EntityPath=es_913f810e-0a03-4cba-98f2-e385d1bc9e21" ## 接続文字列 

image.png

Eventstream カスタムエンドポイント からデータを取得できることを確認します。

# Databricks から Eventstream カスタムエンドポイント に設定する際の設定をセット
eh_sasl = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
    f'required username="$ConnectionString" password="{connection_string}";'
)
bootstrap_servers = f"{esc_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
}

# Eventstream カスタムエンドポイント からデータを取得
from pyspark.sql.functions import col, expr, from_json

df = (
    spark.readStream.format("kafka")
    .options(**kafka_options)
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")  # 検証目的であるため earliest に設定
    .load()
)
df = df.withColumn(
    "decoded_value",
    col("value").cast("string"),
)

df.display()

image.png

EventStream の Destination のカスタムエンドポイントでは、外部からの書き込みをサポートしていないようです。

image.png

org.apache.spark.SparkException: Job aborted due to stage failure: kafkashaded.org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [es_913f810e-0a03-4cba-98f2-e385d1bc9e21]

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?