0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks のデータフレームにて Azure Event Hubs へ SAS 認証によりデータを書き込む方法

Last updated at Posted at 2025-05-14

概要

Databricks のデータフレームにて Azure Event Hubs へ SAS 認証によりデータを書き込む方法を紹介します。

前提条件

下記を構築します。

  • Databricks
  • Azure Event Hubs 名前空間(本手順ではkafka-test-01という名称)
  • Azure Event Hubs インスタンス (本手順ではdbx-to-eh-11という名称)

手順

1. Databricks ソースのテーブルと Volumes を作成

カレントカタログを設定します。

%sql
USE CATALOG manabian_test_01;

image.png

スキーマを作成します。

%sql
CREATE SCHEMA IF NOT EXISTS eh_test_01;

image.png

ソースのテーブルを作成します。

%sql
CREATE OR REPLACE TABLE eh_test_01.table_01
AS
SELECT
  1 AS id,
  'John' AS string_col,
  CAST('2025-01-01' AS TIMESTAMP) AS ingest_ts
  ;
SELECT * FROM eh_test_01.table_01;

image.png

Volume を作成し、初期化します。

%sql
CREATE VOLUME IF NOT EXISTS eh_test_01.volume_01;
volume_path = "/Volumes/manabian_test_01/eh_test_01/volume_01"
dbutils.fs.rm(volume_path, True)

image.png

2. Azure Event Hubs にて SAS を取得

Azure Event Hubs 名前空間(あるいは、 Azure Event Hubs インスタンス)にて、共有アクセス ポリシー -> + 追加を選択します。

image.png

dbx-to-eh-10というポリシー名を入力し、送信をチェックした上で作成を選択します。

image.png

作成したポリシーを表示し、プライマリ接続文字列をコピーします。

image.png

3. Databriks にて Azure Event Hubs へデータの書き込み

Azure Event Hubs に接続するための設定値をセットします。

# Azure Event Hubs に接続するための設定値をセット
eh_namespace_name = "kafka-test-01"  ## Azure Event Hubs の名前空間をセット
ehi_name = topic_name = "dbx-to-eh-01"  ## Azure Event Hubs のEvent Hubs のインスタンスをセット
connection_string = "Endpoint=sb://kafka-test-01.servicebus.windows.net/;SharedAccessKeyName=dbt-to-eh-01;SharedAccessKey=hnyXXXX="  ## SAS 接続文字列をセット

Azure Event Hubs からデータを取得できることを確認します。

# Databricks から Azure Event Hubs に設定する際の設定をセット
eh_sasl = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
    f'required username="$ConnectionString" password="{connection_string}";'
)
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
}

# Azure Event Hubs からデータを取得
from pyspark.sql.functions import col, expr, from_json

df = spark.readStream.table("eh_test_01.table_01")
df = df.selectExpr("to_json(struct(id, string_col, ingest_ts)) AS value")


_ = (
    df.writeStream.format("kafka")
    .options(**kafka_options)
    .option("topic", "hub01")
    .option("checkpointLocation", volume_path+"/table_01")
    .trigger(availableNow=True)
    .start()
)

image.png

Azure Event Hubs の Data Explorere にてデータが書き込まれていることを確認します。

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?