概要
Databricks にて Confluent から Protobuf 形式の Topic のデータ取得をスキーマレジストリを利用して実施する方法を紹介します。
本記事は、以下のシリーズの一部です。
引用元:Databricks と Confluent をつなぐ:データ連携の全体像を徹底解説 #Spark - Qiita
事前準備
1. Confluent にて Topic を作成
2. Confluent にて API キーを主塔
3. Confluent にて Schema Registry API キーを取得
データを取得する手順
1. Databricks 上で API キーと Schema Registry API キーを変数にセット
# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = "U4BML4OPRCaaaa"
sasl_password = "zzEjUzyUwYYJneNE240L7jgPVrHAtRBANo6wnnzCgKlGIJ9OxJqaJZWRvaaaaa"
# Schema Registry の接続情報
sr_url = "https://psrc-lAAAA.us-east-2.aws.confluent.cloud"
sr_api_key = "CMFFLEP4BCTMAAAA"
sr_api_secret = "9j/Bf/if0l5v7HocEJKd2xOt/z5Gr0I5M7lRpPQNslV0QOzMkLEQzgRgVBsAAAA"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
"confluent.schema.registry.basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}",
}
2. Topic 名とサブジェクト名を変数にセット
topic_name = "protobu_user"
value_subject_name = "protobu_user-value"
schema_registry_options["schema.registry.subject"] = value_subject_name
3. Condluent からデータを取得
from pyspark.sql.functions import col, expr
from pyspark.sql.protobuf.functions import from_protobuf
# Confluent から読み込み
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("kafka.security.protocol", "SASL_SSL")
.option(
"kafka.sasl.jaas.config",
f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
)
.option("kafka.sasl.mechanism", "PLAIN")
.option("subscribe", topic_name)
.option("startingOffsets", "earliest")
.load()
)
# Key を Deserialize
key_des_df = df.withColumn(
"key",
col("Key").cast("string"),
)
# Magic Byte と Schema ID を取り除いた Value を Deserialize
decoded_df = key_des_df.withColumn(
"value",
from_protobuf(
col("value"),
options=schema_registry_options,
),
)
# Deserialize した Value のカラムを展開
tgt_cols = [
"key",
"value.*",
"topic",
"partition",
"offset",
"timestamp",
"timestampType",
]
src_df = decoded_df.select(tgt_cols)
4. データを表示
src_df.display()