概要
Databricks から Confluent に Avro 形式の Topic データを書き込む際にスキーマレジストリを利用して実施する方法を紹介します。
スキーマを指定せずに書き込みを実施できると想定していたのですが、 2024年 12 月 26 日時点ではスキーマを指定しないとエラーとなってしまいました。本記事では、スキーマを指定する手順となっておりますが、jsonFormatSchema
引数の指定が不要になる可能性もあります。
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 436.0 failed 4 times, most recent failure: Lost task 2.3 in stage 436.0 (TID 568) (10.139.64.10 executor 0): org.spark_project.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
本記事は、以下のシリーズの一部です。
引用元:Databricks と Confluent をつなぐ:データ連携の全体像を徹底解説 #Spark - Qiita
事前準備
1. Confluent にて Topic を作成
2. Confluent にて Topic にスキーマを設定
{
"connect.name": "ksql.users",
"fields": [
{
"name": "registertime",
"type": "long"
},
{
"name": "userid",
"type": "string"
},
{
"name": "regionid",
"type": "string"
},
{
"name": "gender",
"type": "string"
}
],
"name": "users",
"namespace": "ksql",
"type": "record"
}
3. Confluent にて API キーを主塔
4. Confluent にて Schema Registry API キーを取得
5. Databricks にてデータベースのオブジェクトを作成
%sql
CREATE CATALOG IF NOT EXISTS confluent_test;
CREATE SCHEMA IF NOT EXISTS confluent_test.test;
src_table_name = "confluent_test.test.src_table"
checkpoint_volume_name = "confluent_test.test.checkpoint_01"
checkpoint_volume_dir = f"/Volumes/confluent_test/test/checkpoint_01"
spark.sql(f"CREATE TABLE IF NOT EXISTS {src_table_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {checkpoint_volume_name}")
dbutils.fs.rm(checkpoint_volume_dir, True)
6. テーブルにデータを書きこみ
data = [
("User_1", 1507214855421, "Region_1", "OTHER"),
("User_2", 1516353515688, "Region_2", "OTHER"),
("User_3", 1511384261793, "Region_3", "FEMALE")
]
columns = ["userid", "registertime", "regionid", "gender"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").saveAsTable(src_table_name)
spark.table(src_table_name).display()
データを書き込む手順
1. Databricks 上で API キーと Schema Registry API キーを変数にセット
# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = "U4BML4OPRCaaaa"
sasl_password = "zzEjUzyUwYYJneNE240L7jgPVrHAtRBANo6wnnzCgKlGIJ9OxJqaJZWRvaaaaa"
# Schema Registry の接続情報
sr_url = "https://psrc-lAAAA.us-east-2.aws.confluent.cloud"
sr_api_key = "CMFFLEP4BCTMAAAA"
sr_api_secret = "9j/Bf/if0l5v7HocEJKd2xOt/z5Gr0I5M7lRpPQNslV0QOzMkLEQzgRgVBsAAAA"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
"confluent.schema.registry.basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}",
}
2. Databricks に Topic と Subject を変数にセット
topic_name = "avro_with_sr_01"
value_subject_name = topic_name + "-value"
3. スキーマを変数にセット
json_format_schema = """
{
"connect.name": "ksql.users",
"fields": [
{
"name": "registertime",
"type": "long"
},
{
"name": "userid",
"type": "string"
},
{
"name": "regionid",
"type": "string"
},
{
"name": "gender",
"type": "string"
}
],
"name": "users",
"namespace": "ksql",
"type": "record"
}
"""
4. Databricks にて Conflunet へ書き込みを実施
# 書き込み時に利用するチェックポイントのディレクトリを指定
checkpoint_dir = checkpoint_volume_dir + "/" + topic_name
from pyspark.sql.functions import col, struct, lit
from pyspark.sql.avro.functions import to_avro
src_df = spark.readStream.table(src_table_name)
# key 列を定義
src_df = src_df.withColumn("key", col("userid"))
src_df = src_df.withColumn(
"value",
struct("registertime", "userid", "regionid", "gender"),
)
src_df = src_df.withColumn(
"value",
to_avro(
data=col("value"),
options=schema_registry_options,
schemaRegistryAddress=sr_url,
subject=lit(value_subject_name),
jsonFormatSchema=json_format_schema,
),
)
# key 列と value 列のみを抽出
src_df = src_df.select("key", "value")
(
src_df.writeStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("kafka.security.protocol", "SASL_SSL")
.option(
"kafka.sasl.jaas.config",
f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
)
.option("kafka.sasl.mechanism", "PLAIN")
.option("topic", topic_name)
.option("checkpointLocation", checkpoint_dir)
.trigger(availableNow=True)
.start()
)
5. Confluent 上でデータが書き込まれたことを確認
発生したエラー
1. to_avro
関数に対してjsonFormatSchema
引数を指定せずにsubject
引数を指定するとエラーが発生しました。
from pyspark.sql.functions import col, struct, lit
from pyspark.sql.avro.functions import to_avro
src_df = spark.readStream.table(src_table_name)
# key 列を定義
src_df = src_df.withColumn("key", col("userid"))
src_df = src_df.withColumn(
"value",
struct("registertime", "userid", "regionid", "gender"),
)
src_df = src_df.withColumn(
"value",
to_avro(
data=col("value"),
options=schema_registry_options,
schemaRegistryAddress=sr_url,
subject=lit(value_subject_name),
# jsonFormatSchema=json_format_schema,
),
)
# key 列と value 列のみを抽出
src_df = src_df.select("key", "value")
src_df.display()
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 436.0 failed 4 times, most recent failure: Lost task 2.3 in stage 436.0 (TID 568) (10.139.64.10 executor 0): org.spark_project.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
Caused by: org.spark_project.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403