0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks DLT にて Azure Event Hubs に対して json 形式でデータの書き込みと読み込みを実施する方法

Posted at

概要

Databricks DLT にて Azure Event Hubs に対して json 形式でデータの書き込みと読み込みを実施する方法を共有します。

事前準備

Azure Event Hubs の SAS を取得

下記の記事を参考に SAS 認証により Databricks から Azure Event Hubs の SAS を取得してください。

Databricks にてデータベースオブジェクトを作成

カレントカタログを設定します。

%sql
USE CATALOG manabian_test_01;

スキーマを作成します。

%sql
CREATE SCHEMA IF NOT EXISTS eh_test_01;

ソースのテーブルを作成します。

%sql
CREATE OR REPLACE TABLE eh_test_01.table_01
AS
SELECT
  1 AS id,
  'John' AS string_col,
  CAST('2025-01-01' AS TIMESTAMP) AS ingest_ts
  ;
SELECT * FROM eh_test_01.table_01;

Azure Event Hubs にてavro-test-02という名称の Azure Event Hubs のインスタンスを作成

image.png

手順

1. Azure Event Hubs に対する書き込みの実施

Databricks DLT のパイプラインの下記のコードを記述して、Databricks DLT の実行します。

from dlt import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import from_avro

# Azure Event Hubs に接続するための設定値をセット
eh_namespace_name = "kafka-test-01"  ## Azure Event Hubs の名前空間をセット
ehi_name = topic_name = "json-test-02"  ## Azure Event Hubs のEvent Hubs のインスタンスをセット
connection_string = "Endpoint=sb://kafka-test-01.servicebus.windows.net/;SharedAccessKeyName=dbt-to-eh-01;SharedAccessKey=hnyXXXX="  ## SAS 接続文字列をセット

# Databricks から Azure Event Hubs に設定する際の設定をセット
eh_sasl = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
    f'required username="$ConnectionString" password="{connection_string}";'
)
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
kafka_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
}

# Azure Event Hubs へデータ書き込み
dbx_to_eh_12_options = kafka_options.copy()
dbx_to_eh_12_options["topic"] = topic_name
dlt.create_sink(name="eh_sink_01", format="kafka", options=dbx_to_eh_12_options)


@dlt.append_flow(name="kafka_sink_flow", target="eh_sink_01")
def kafka_sink_flow():
    df = spark.readStream.table("eh_test_01.table_01")
    df = df.selectExpr("to_avro(struct(id, string_col, ingest_ts)) AS value")

    return df

image.png

Azure Event Hubs にてデータが書き込まれたことを確認します。

image.png

2. Databricks DLT の実行により Azure Event Hubs から読み込みの実施

DLT パイプラインに下記のコードを追記して、Databricks DLT の実行します。

# Azure Event Hubs からデータを取得
json_table_01 = "json_table_01"
schema = """
{
  "name": "kafka_test_01.avro_0",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "string_col",
      "type": "string"
    },
    {
      "name": "ingest_ts",
      "type": {
        "type": "long",
        "logicalType": "timestamp-micros"
      }
    }
  ]
}
"""
@dlt.table(
    name = json_table_01,
)
def customers_bronze_ingest_flow():
    df = (
        spark.readStream.format("kafka")
        .options(**kafka_options)
        .option("subscribe",topic_name)
        .option("startingOffsets", "earliest")  # 検証目的であるため earliest に設定
        .load()
    )
    df = df.withColumn(
        "decoded_value",
        from_avro(
            col("value"),
            schema,
        ),
    )
    return df

image.png

テーブルにデータが書きこまれたことを確認します。

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?