0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

manabianAdvent Calendar 2024

Day 23

Databricks にて Confluent から Protobuf 形式の Topic のデータ取得をスキーマレジストリを利用して実施する方法

Last updated at Posted at 2024-12-25

概要

Databricks にて Confluent から Protobuf 形式の Topic のデータ取得をスキーマレジストリを利用して実施する方法を紹介します。

本記事は、以下のシリーズの一部です。

image.png

引用元:Databricks と Confluent をつなぐ:データ連携の全体像を徹底解説 #Spark - Qiita

事前準備

1. Confluent にて Topic を作成

image.png

2. Confluent にて API キーを主塔

image.png

3. Confluent にて Schema Registry API キーを取得

image.png

データを取得する手順

1. Databricks 上で API キーと Schema Registry API キーを変数にセット

# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = "U4BML4OPRCaaaa"
sasl_password = "zzEjUzyUwYYJneNE240L7jgPVrHAtRBANo6wnnzCgKlGIJ9OxJqaJZWRvaaaaa"

# Schema Registry の接続情報
sr_url = "https://psrc-lAAAA.us-east-2.aws.confluent.cloud"
sr_api_key = "CMFFLEP4BCTMAAAA"
sr_api_secret = "9j/Bf/if0l5v7HocEJKd2xOt/z5Gr0I5M7lRpPQNslV0QOzMkLEQzgRgVBsAAAA"
schema_registry_options = {
    "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}",
}

image.png

2. Topic 名とサブジェクト名を変数にセット

topic_name = "protobu_user"
value_subject_name = "protobu_user-value"
schema_registry_options["schema.registry.subject"] = value_subject_name

image.png

3. Condluent からデータを取得

from pyspark.sql.functions import col, expr
from pyspark.sql.protobuf.functions import from_protobuf

# Confluent から読み込み
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    )
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

# Key を Deserialize
key_des_df = df.withColumn(
    "key",
    col("Key").cast("string"),
)

# Magic Byte と Schema ID を取り除いた Value を Deserialize
decoded_df = key_des_df.withColumn(
    "value",
    from_protobuf(
        col("value"),
        options=schema_registry_options,
    ),
)

# Deserialize した Value のカラムを展開
tgt_cols = [
    "key",
    "value.*",
    "topic",
    "partition",
    "offset",
    "timestamp",
    "timestampType",
]
src_df = decoded_df.select(tgt_cols)

image.png

4. データを表示

src_df.display()

image.png

事後処理

Databricks ノートブックの実行を停止

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?