0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks DLT にて Azure Event Hubs へ SAS 認証によりデータを書き込む方法

Last updated at Posted at 2025-05-14

概要

Databricks DLT にて Azure Event Hubs へ SAS 認証によりデータを書き込む方法を紹介します。

前提条件

下記を構築します。

  • Databricks
  • Databricks DLT のパイプライン
  • Azure Event Hubs 名前空間(本手順ではkafka-test-01という名称)
  • Azure Event Hubs インスタンス (本手順ではdbx-to-eh-13という名称)

手順

1. Databricks ソースのテーブルと Volumes を作成

カレントカタログを設定します。

%sql
USE CATALOG manabian_test_01;

image.png

スキーマを作成します。

%sql
CREATE SCHEMA IF NOT EXISTS eh_test_01;

image.png

ソースのテーブルを作成します。

%sql
CREATE OR REPLACE TABLE eh_test_01.table_01
AS
SELECT
  1 AS id,
  'John' AS string_col,
  CAST('2025-01-01' AS TIMESTAMP) AS ingest_ts
  ;
SELECT * FROM eh_test_01.table_01;

image.png

2. Azure Event Hubs にて SAS を取得

Azure Event Hubs 名前空間(あるいは、 Azure Event Hubs インスタンス)にて、共有アクセス ポリシー -> + 追加を選択します。

image.png

dbx-to-eh-10というポリシー名を入力し、送信をチェックした上で作成を選択します。

image.png

作成したポリシーを表示し、プライマリ接続文字列をコピーします。

image.png

3. Databriks にて Azure Event Hubs へデータの書き込み

Azure Event Hubs に接続するための設定値をセットし、 DLT パイプラインを実行します。

from dlt import *
from pyspark.sql.functions import *

# Azure Event Hubs に接続するための設定値をセット
eh_namespace_name = "kafka-test-01"  ## Azure Event Hubs の名前空間をセット
ehi_name = topic_name = "dbx-to-eh-13"  ## Azure Event Hubs のEvent Hubs のインスタンスをセット
connection_string = "Endpoint=sb://kafka-test-01.servicebus.windows.net/;SharedAccessKeyName=dbt-to-eh-01;SharedAccessKey=hnyXXXX="  ## SAS 接続文字列をセット

# Databricks から Azure Event Hubs に設定する際の設定をセット
eh_sasl = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
    f'required username="$ConnectionString" password="{connection_string}";'
)
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
}

# Azure Event Hubs へデータ書き込み
dbx_to_eh_12_options = kafka_options.copy()
dbx_to_eh_12_options["topic"] = topic_name
dlt.create_sink(name="eh_sink_01", format="kafka", options=dbx_to_eh_12_options)


@dlt.append_flow(name="kafka_sink_flow", target="eh_sink_01")
def kafka_sink_flow():
    df = spark.readStream.table("eh_test_01.table_01")
    df = df.selectExpr("to_json(struct(id, string_col, ingest_ts)) AS value")

    return df

image.png

Azure Event Hubs の Data Explorere にてデータが書き込まれていることを確認します。

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?