
How to Write Protobuf-Format Topic Data from Databricks to Confluent Using Schema Registry

Posted at 2024-12-26

Overview

This article describes how to use Schema Registry when writing Protobuf-format topic data from Databricks to Confluent.

This article is part of the following series.


Source: Databricks と Confluent をつなぐ:データ連携の全体像を徹底解説 #Spark - Qiita

Prerequisites

1. Create a topic in Confluent

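The topic can be created from the Confluent Cloud UI. As a reference only, here is a minimal sketch that does the same thing with the confluent-kafka Python client's AdminClient, assuming the package is installed; the bootstrap server and API key values are placeholders for your environment.

from confluent_kafka.admin import AdminClient, NewTopic

# Placeholder connection settings; replace with your own cluster values
admin = AdminClient({
    "bootstrap.servers": "pkc-921aa.us-east-2.aws.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<API_KEY>",
    "sasl.password": "<API_SECRET>",
})

# Create the topic used throughout this article
futures = admin.create_topics([NewTopic("protobuf_with_sr_01", num_partitions=6)])
futures["protobuf_with_sr_01"].result()  # raises an exception if creation failed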

2. Set a schema for the topic in Confluent

syntax = "proto3";
package ksql;

message users {
  int64 registertime = 1;
  string userid = 2;
  string regionid = 3;
  string gender = 4;
}

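Setting the schema from the Confluent Cloud UI is enough for this article. As a reference, here is a minimal sketch that registers the same Protobuf schema under the topic's value subject with the confluent-kafka Schema Registry client, assuming the package is installed; the URL and credentials are placeholders.

from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

proto_schema = """
syntax = "proto3";
package ksql;

message users {
  int64 registertime = 1;
  string userid = 2;
  string regionid = 3;
  string gender = 4;
}
"""

# Placeholder Schema Registry endpoint and credentials
sr_client = SchemaRegistryClient({
    "url": "https://psrc-l622j.us-east-2.aws.confluent.cloud",
    "basic.auth.user.info": "<SR_API_KEY>:<SR_API_SECRET>",
})

# Register under the topic's value subject (TopicNameStrategy)
schema_id = sr_client.register_schema(
    "protobuf_with_sr_01-value",
    Schema(proto_schema, schema_type="PROTOBUF"),
)
print(schema_id)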

3. Obtain an API key in Confluent


4. Obtain a Schema Registry API key in Confluent


5. Create database objects in Databricks

%sql
CREATE CATALOG IF NOT EXISTS confluent_test;
CREATE SCHEMA IF NOT EXISTS confluent_test.test;

# Run the following in a separate Python cell
src_table_name = "confluent_test.test.src_table"
checkpoint_volume_name = "confluent_test.test.checkpoint_01"
checkpoint_volume_dir = "/Volumes/confluent_test/test/checkpoint_01"

spark.sql(f"CREATE TABLE IF NOT EXISTS {src_table_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {checkpoint_volume_name}")

# Remove any leftover checkpoint data from previous runs
dbutils.fs.rm(checkpoint_volume_dir, True)


6. Write data to the table

# Sample records matching the users schema
data = [
    ("User_1", 1507214855421, "Region_1", "OTHER"),
    ("User_2", 1516353515688, "Region_2", "OTHER"),
    ("User_3", 1511384261793, "Region_3", "FEMALE")
]

columns = ["userid", "registertime", "regionid", "gender"]

df = spark.createDataFrame(data, columns)

# Overwrite the source table with the sample records
df.write.format("delta").mode("overwrite").saveAsTable(src_table_name)

spark.table(src_table_name).display()


Steps for writing the data

1. Set the API key and Schema Registry API key in variables on Databricks

# Credentials for authenticating to Confluent
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = "U4BML4OPRCaaaa"
sasl_password = "zzEjUzyUwYYJneNE240L7jgPVrHAtRBANo6wnnzCgKlGIJ9OxJqaJZWRvaaaaa"

# Schema Registry connection information
sr_url = "https://psrc-l622j.us-east-2.aws.confluent.cloud"
sr_api_key = "RPHQKLYYQG3HIFJT"
sr_api_secret = "qRglNE56KGYuTTm2Go1ByfyLxBF9ZALztrmItyxUAGjwIUTiEQwaqAFOaaFAAAA"

# "schema.registry.subject" is added in the next step, once the topic name is defined
schema_registry_options = {
    "schema.registry.address": sr_url,
    "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}",
}
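The credentials above are hardcoded only to keep the walkthrough simple. In practice it is safer to keep them in Databricks secrets; a minimal sketch, assuming a hypothetical secret scope named "confluent" whose keys were created beforehand with the Databricks CLI:

# Hypothetical scope and key names; adjust to your own setup
sasl_username = dbutils.secrets.get(scope="confluent", key="sasl_username")
sasl_password = dbutils.secrets.get(scope="confluent", key="sasl_password")
sr_api_key = dbutils.secrets.get(scope="confluent", key="sr_api_key")
sr_api_secret = dbutils.secrets.get(scope="confluent", key="sr_api_secret")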


2. Set the topic and subject in variables on Databricks

topic_name = "protobuf_with_sr_01"

# Confluent's default TopicNameStrategy names the value subject "<topic>-value"
value_subject_name = topic_name + "-value"
schema_registry_options["schema.registry.subject"] = value_subject_name
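Before writing, you can check whether the subject is visible from the Schema Registry side. A small sketch with the confluent-kafka client, assuming the package is available on the cluster and reusing the variables defined above:

from confluent_kafka.schema_registry import SchemaRegistryClient

sr_client = SchemaRegistryClient({
    "url": sr_url,
    "basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}",
})

# Fetch the latest schema registered under the value subject
latest = sr_client.get_latest_version(value_subject_name)
print(latest.schema_id, latest.schema.schema_type)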


3. Write to Confluent from Databricks

# Checkpoint directory used by the streaming write
checkpoint_dir = checkpoint_volume_dir + "/" + topic_name

from pyspark.sql.functions import col, struct
from pyspark.sql.protobuf.functions import to_protobuf

src_df = spark.readStream.table(src_table_name)

# Define the key column
src_df = src_df.withColumn("key", col("userid"))

# Pack the value columns into a struct
src_df = src_df.withColumn(
    "value",
    struct("registertime", "userid", "regionid", "gender"),
)

# Serialize the struct as Protobuf; the subject and the Schema Registry
# address are supplied via schema_registry_options
src_df = src_df.withColumn(
    "value",
    to_protobuf(data=col("value"), options=schema_registry_options),
)

# Keep only the key and value columns
src_df = src_df.select("key", "value")

(
    src_df.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    )
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("topic", topic_name)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .start()
)
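As a quick check from the Databricks side as well, the written records can be read back and deserialized with from_protobuf against the same Schema Registry options. A minimal sketch using a batch read, assuming the same Databricks Runtime Schema Registry integration used above:

from pyspark.sql.functions import col
from pyspark.sql.protobuf.functions import from_protobuf

read_df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    )
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

# Deserialize the Protobuf payload via Schema Registry and expand the struct
decoded_df = read_df.select(
    from_protobuf(data=col("value"), options=schema_registry_options).alias("value")
).select("value.*")

decoded_df.display()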


4. Confirm in Confluent that the data was written

