0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks のデータフレームにて Microsoft Fabric における EventStream の Source のカスタムエンドポイント から SAS 認証によりデータを書き込む方法

Posted at

概要

Databricks のデータフレームにて Microsoft Fabric (Fabric) における EventStream の Source のカスタムエンドポイント から SAS 認証によりデータを書き込む方法を共有します。

前提条件

下記を構築します。

  • Databricks
  • Microsoft Fabric

事前準備

1. Fabric のワークスペースにて Source のカスタムエンドポイントを持つEventStream を作成し公開

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

手順

1. Databricks ソースのテーブルと Volumes を作成

カレントカタログを設定します。

%sql
USE CATALOG manabian_test_01;

image.png

スキーマを作成します。

%sql
CREATE SCHEMA IF NOT EXISTS eh_test_01;

image.png

ソースのテーブルを作成します。

%sql
CREATE OR REPLACE TABLE eh_test_01.table_01
AS
SELECT
  1 AS id,
  'John' AS string_col,
  CAST('2025-01-01' AS TIMESTAMP) AS ingest_ts
  ;
SELECT * FROM eh_test_01.table_01;

image.png

Volume を作成し、初期化します。

%sql
CREATE VOLUME IF NOT EXISTS eh_test_01.volume_01;
volume_path = "/Volumes/manabian_test_01/eh_test_01/volume_01"
dbutils.fs.rm(volume_path, True)

image.png

2. Fabric にて SAS を取得

作成したソース -> Kafka -> SASキー認証を選択し 、下記の値を取得します。

  • サーバー名(ブーストトラップサーバーのxxx..servicebus.windows.net:9093におけるxxxの値)
  • トピック名
  • 接続文字列 - 主キー

image.png

2. Databriks にて Azure Event Hubs からデータを取得

Eventstream カスタムエンドポイント に接続するための設定値をセットします。

# Eventstream カスタムエンドポイント に接続するための設定値をセット
esc_namespace_name = "esehsgi702j38of92v5kv7"  ## Eventstream カスタムエンドポイント の名前をセット
topic_name = "es_26f0a637-fa91-45c6-8061-5f832de539bf"  ## Eventstream カスタムエンドポイントのトピックをセット
connection_string = "Endpoint=sb://esehsgi702j38of92v5kv7.servicebus.windows.net/;SharedAccessKeyName=key_70d0a645-5784-4d26-9851-0370c4ded6a6;SharedAccessKey=zMIkewSb2=;EntityPath=es_26f0a637-fa91-45c6-8061-5f832de539bf" ## 接続文字列 

image.png

Eventstream カスタムエンドポイントに書き込みを実施します。

# Databricks から Eventstream カスタムエンドポイント に設定する際の設定をセット
eh_sasl = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
    f'required username="$ConnectionString" password="{connection_string}";'
)
bootstrap_servers = f"{esc_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
}


# Azure Event Hubs へデータを書き込む
from pyspark.sql.functions import col, expr, from_json

df = spark.readStream.table("eh_test_01.table_01")
df = df.selectExpr("to_json(struct(id, string_col, ingest_ts)) AS value")


_ = (
    df.writeStream.format("kafka")
    .options(**kafka_options)
    .option("topic", topic_name)
    .option("checkpointLocation", volume_path+"/table_01")
    .trigger(availableNow=True)
    .start()
)

image.png

Fabric の Eventstream にて書き込まれたことを確認します。

image.png

EventStream の Source のカスタムエンドポイントでは、外部からの読み込みをサポートしていないようです。

java.util.concurrent.ExecutionException: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Call(callName=listOffsets on broker 0, deadlineMs=1747366497859, tries=1, nextAllowedTryMs=1747366497960) timed out at 1747366497860 after 1 attempt(s)

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?