
How to Write Avro-Format Topic Data from Databricks to Confluent Using Schema Registry

Posted at 2024-12-26

Overview

This article describes how to use Schema Registry when writing Avro-format Topic data from Databricks to Confluent.

I had expected that the write could be performed without specifying a schema, but as of December 26, 2024, an error occurred unless the schema was specified. This article therefore follows a procedure that specifies the schema, although the jsonFormatSchema argument may become unnecessary in the future.


org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 436.0 failed 4 times, most recent failure: Lost task 2.3 in stage 436.0 (TID 568) (10.139.64.10 executor 0): org.spark_project.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403


This article is part of the following series.


Source: Databricks と Confluent をつなぐ:データ連携の全体像を徹底解説 #Spark - Qiita

Prerequisites

1. Create a Topic in Confluent

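In this article the Topic is created from the Confluent UI. As a reference, the same Topic can also be created programmatically; the following is a minimal sketch assuming the confluent-kafka package is installed and that a Kafka API key (obtained in step 3 below) is already available. The endpoint and credential values are placeholders.

from confluent_kafka.admin import AdminClient, NewTopic

# Placeholder values: replace with your cluster endpoint and API key (see step 3)
admin = AdminClient({
    "bootstrap.servers": "pkc-921aa.us-east-2.aws.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<kafka-api-key>",
    "sasl.password": "<kafka-api-secret>",
})

# Create the Topic used in this article
futures = admin.create_topics([NewTopic("avro_with_sr_01", num_partitions=6)])
for topic, future in futures.items():
    future.result()  # raises an exception if topic creation failed
    print(f"Created topic: {topic}")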

2. Set a schema for the Topic in Confluent

{
  "connect.name": "ksql.users",
  "fields": [
    {
      "name": "registertime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "regionid",
      "type": "string"
    },
    {
      "name": "gender",
      "type": "string"
    }
  ],
  "name": "users",
  "namespace": "ksql",
  "type": "record"
}
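The schema above is registered to the Topic from the Confluent UI. As a reference, the same subject can also be registered through the Schema Registry REST API; the following is a minimal sketch assuming the requests package and the Schema Registry URL and API key obtained in step 4 (the values here are placeholders).

import json
import requests

# Placeholder values: replace with the Schema Registry URL and API key from step 4
sr_url = "https://psrc-lAAAA.us-east-2.aws.confluent.cloud"
sr_api_key = "<sr-api-key>"
sr_api_secret = "<sr-api-secret>"

# Same Avro schema as above, built as a JSON string
avro_schema = json.dumps({
    "type": "record",
    "name": "users",
    "namespace": "ksql",
    "connect.name": "ksql.users",
    "fields": [
        {"name": "registertime", "type": "long"},
        {"name": "userid", "type": "string"},
        {"name": "regionid", "type": "string"},
        {"name": "gender", "type": "string"},
    ],
})

# Register the schema under the value subject of the Topic
resp = requests.post(
    f"{sr_url}/subjects/avro_with_sr_01-value/versions",
    auth=(sr_api_key, sr_api_secret),
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schemaType": "AVRO", "schema": avro_schema},
)
resp.raise_for_status()
print(resp.json())  # returns the assigned schema id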


3. Obtain an API key in Confluent


4. Obtain a Schema Registry API key in Confluent


5. Create database objects in Databricks

%sql
CREATE CATALOG IF NOT EXISTS confluent_test;
CREATE SCHEMA IF NOT EXISTS confluent_test.test;

src_table_name = "confluent_test.test.src_table"
checkpoint_volume_name = "confluent_test.test.checkpoint_01"
checkpoint_volume_dir = "/Volumes/confluent_test/test/checkpoint_01"
spark.sql(f"CREATE TABLE IF NOT EXISTS {src_table_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {checkpoint_volume_name}")
dbutils.fs.rm(checkpoint_volume_dir, True)


6. Write data to the table

data = [
    ("User_1", 1507214855421, "Region_1", "OTHER"),
    ("User_2", 1516353515688, "Region_2", "OTHER"),
    ("User_3", 1511384261793, "Region_3", "FEMALE")
]

columns = ["userid", "registertime", "regionid", "gender"]

df = spark.createDataFrame(data, columns)

df.write.format("delta").mode("overwrite").saveAsTable(src_table_name)

spark.table(src_table_name).display()


Steps for writing the data

1. Set the API key and Schema Registry API key in variables on Databricks

# Set the credentials for Confluent
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = "U4BML4OPRCaaaa"
sasl_password = "zzEjUzyUwYYJneNE240L7jgPVrHAtRBANo6wnnzCgKlGIJ9OxJqaJZWRvaaaaa"

# Schema Registry connection information
sr_url = "https://psrc-lAAAA.us-east-2.aws.confluent.cloud"
sr_api_key = "CMFFLEP4BCTMAAAA"
sr_api_secret = "9j/Bf/if0l5v7HocEJKd2xOt/z5Gr0I5M7lRpPQNslV0QOzMkLEQzgRgVBsAAAA"
schema_registry_options = {
    "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}",
}
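The credentials are hard-coded above for clarity. In practice, it is safer to store them in a Databricks secret scope and read them with dbutils.secrets.get; the following is a minimal sketch assuming a secret scope named confluent (a hypothetical name) has been created beforehand with the keys shown.

# Assumes a secret scope named "confluent" with these keys has been created beforehand
sasl_username = dbutils.secrets.get(scope="confluent", key="kafka-api-key")
sasl_password = dbutils.secrets.get(scope="confluent", key="kafka-api-secret")
sr_api_key = dbutils.secrets.get(scope="confluent", key="sr-api-key")
sr_api_secret = dbutils.secrets.get(scope="confluent", key="sr-api-secret")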


2. Set the Topic and Subject in variables on Databricks

topic_name = "avro_with_sr_01"
value_subject_name = topic_name + "-value"

3. Set the schema in a variable

json_format_schema = """
{
  "connect.name": "ksql.users",
  "fields": [
    {
      "name": "registertime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "regionid",
      "type": "string"
    },
    {
      "name": "gender",
      "type": "string"
    }
  ],
  "name": "users",
  "namespace": "ksql",
  "type": "record"
}
"""


4. Write to Confluent from Databricks

# Specify the checkpoint directory used for the write
checkpoint_dir = checkpoint_volume_dir + "/" + topic_name


from pyspark.sql.functions import col, struct, lit
from pyspark.sql.avro.functions import to_avro

src_df = spark.readStream.table(src_table_name)

# Define the key column
src_df = src_df.withColumn("key", col("userid"))

src_df = src_df.withColumn(
    "value",
    struct("registertime", "userid", "regionid", "gender"),
)
src_df = src_df.withColumn(
    "value",
    to_avro(
        data=col("value"),
        options=schema_registry_options,
        schemaRegistryAddress=sr_url,
        subject=lit(value_subject_name),
        jsonFormatSchema=json_format_schema,
    ),
)
# Select only the key and value columns
src_df = src_df.select("key", "value")


(
    src_df.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    )
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("topic", topic_name)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .start()
)


5. Confirm in Confluent that the data has been written

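As an additional check from the Databricks side, the Topic can be read back and decoded. The following is a minimal sketch assuming that the Databricks-specific from_avro accepts the same options / subject / schemaRegistryAddress keyword arguments as the to_avro call above; the exact signature may differ depending on the Databricks Runtime version.

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro

# Batch-read the Topic from the beginning
read_df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    )
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the Avro value column with the schema registered for the subject
read_df = read_df.select(
    col("key").cast("string"),
    from_avro(
        data=col("value"),
        options=schema_registry_options,
        subject=lit(value_subject_name),
        schemaRegistryAddress=sr_url,
    ).alias("value"),
)

read_df.display()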

Errors encountered

1. An error occurred when the subject argument was specified for the to_avro function without specifying the jsonFormatSchema argument.

from pyspark.sql.functions import col, struct, lit
from pyspark.sql.avro.functions import to_avro

src_df = spark.readStream.table(src_table_name)

# Define the key column
src_df = src_df.withColumn("key", col("userid"))

src_df = src_df.withColumn(
    "value",
    struct("registertime", "userid", "regionid", "gender"),
)
src_df = src_df.withColumn(
    "value",
    to_avro(
        data=col("value"),
        options=schema_registry_options,
        schemaRegistryAddress=sr_url,
        subject=lit(value_subject_name),
        # jsonFormatSchema=json_format_schema,
    ),
)

# Select only the key and value columns
src_df = src_df.select("key", "value")

src_df.display()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 436.0 failed 4 times, most recent failure: Lost task 2.3 in stage 436.0 (TID 568) (10.139.64.10 executor 0): org.spark_project.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
Caused by: org.spark_project.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403

