0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks のデータフレームにて Azure Event Hubs に対して json 形式でデータの書き込みと読み込みを実施する方法

Posted at

概要

Databricks のデータフレームにて Azure Event Hubs に対して json 形式でデータの書き込みと読み込みを実施する方法を共有します。

事前準備

Azure Event Hubs の SAS を取得

下記の記事を参考に SAS 認証により Databricks から Azure Event Hubs の SAS を取得してください。

Databricks にてデータベースオブジェクトを作成

カレントカタログを設定します。

%sql
USE CATALOG manabian_test_01;

スキーマを作成します。

%sql
CREATE SCHEMA IF NOT EXISTS eh_test_01;

ソースのテーブルを作成します。

%sql
CREATE OR REPLACE TABLE eh_test_01.table_01
AS
SELECT
  1 AS id,
  'John' AS string_col,
  CAST('2025-01-01' AS TIMESTAMP) AS ingest_ts
  ;
SELECT * FROM eh_test_01.table_01;

処理時のチェックポイントに利用する Volume を作成します。

%sql
CREATE VOLUME IF NOT EXISTS eh_test_01.volume_01;
volume_path = "/Volumes/manabian_test_01/eh_test_01/volume_01"
dbutils.fs.rm(volume_path, True)

Azure Event Hubs にてjson-test-01という名称の Azure Event Hubs のインスタンスを作成

image.png

手順

1. Databricks にて Azure Event Hubs に対する

# Azure Event Hubs に接続するための設定値をセット
eh_namespace_name = "kafka-test-01"  ## Azure Event Hubs の名前空間をセット
ehi_name = topic_name = "dbx-to-eh-01"  ## Azure Event Hubs のEvent Hubs のインスタンスをセット
connection_string = "Endpoint=sb://kafka-test-01.servicebus.windows.net/;SharedAccessKeyName=dbt-to-eh-01;SharedAccessKey=hnyXXXX="  ## SAS 接続文字列をセット
# Databricks から Azure Event Hubs に設定する際の設定をセット
eh_sasl = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
    f'required username="$ConnectionString" password="{connection_string}";'
)
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
}

image.png

2. Databricks から Azure Event Hubs に対する書き込みの実施

# Azure Event Hubs へデータを書き込む
from pyspark.sql.functions import col, expr, from_json

df = spark.readStream.table("eh_test_01.table_01")
df = df.selectExpr("to_json(struct(id, string_col, ingest_ts)) AS value")


_ = (
    df.writeStream.format("kafka")
    .options(**kafka_options)
    .option("topic", topic_name)
    .option("checkpointLocation", volume_path+"/table_01")
    .trigger(availableNow=True)
    .start()
)

image.png

Azure Event Hubs の画面にてデータが書きこまれたことを確認します。

image.png

3. Databricks から Azure Event Hubs から読み込みの実施

# Azure Event Hubs からデータを取得
from pyspark.sql.functions import col, expr, from_json

df = (
    spark.readStream.format("kafka")
    .options(**kafka_options)
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")  # 検証目的であるため earliest に設定
    .load()
)
df = df.withColumn(
    "decoded_value",
    col("value").cast("string"),
)
df.display()

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?