概要
Databricks から Confluent に Avro 形式の Topic データを書き込む際に手動でスキーマを指定して実施する方法を紹介します。
本記事は、以下のシリーズの一部です。
引用元:Databricks と Confluent をつなぐ:データ連携の全体像を徹底解説 #Spark - Qiita
事前準備
1. Confluent にて Topic を作成
2. Confluent にて Topic にスキーマを設定
{
"connect.name": "ksql.users",
"fields": [
{
"name": "registertime",
"type": "long"
},
{
"name": "userid",
"type": "string"
},
{
"name": "regionid",
"type": "string"
},
{
"name": "gender",
"type": "string"
}
],
"name": "users",
"namespace": "ksql",
"type": "record"
}
3. Confluent にて作成したスキーマ(Subject)の Schema ID の値を取得
4. Confluent にて API キーを主塔
5. Confluent にて Schema Registry API キーを取得
6. Databricks にてデータベースのオブジェクトを作成
%sql
CREATE CATALOG IF NOT EXISTS confluent_test;
CREATE SCHEMA IF NOT EXISTS confluent_test.test;
src_table_name = "confluent_test.test.src_table"
checkpoint_volume_name = "confluent_test.test.checkpoint_01"
checkpoint_volume_dir = f"/Volumes/confluent_test/test/checkpoint_01"
spark.sql(f"CREATE TABLE IF NOT EXISTS {src_table_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {checkpoint_volume_name}")
dbutils.fs.rm(checkpoint_volume_dir, True)
7. テーブルにデータを書きこみ
data = [
("User_1", 1507214855421, "Region_1", "OTHER"),
("User_2", 1516353515688, "Region_2", "OTHER"),
("User_3", 1511384261793, "Region_3", "FEMALE")
]
columns = ["userid", "registertime", "regionid", "gender"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").saveAsTable(src_table_name)
spark.table(src_table_name).display()
データを書き込む手順
1. Databricks 上で API キーと Schema Registry API キーを変数にセット
# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = "U4BML4OPRCaaaa"
sasl_password = "zzEjUzyUwYYJneNE240L7jgPVrHAtRBANo6wnnzCgKlGIJ9OxJqaJZWRvaaaaa"
2. Databricks にて Schema ID を変数にセット
schema_id_num = 100001
3. Databricks に Topic とスキーマを変数にセット
topic_name = "avro_without_sr_01"
schema = """
{
"connect.name": "ksql.users",
"fields": [
{
"name": "registertime",
"type": "long"
},
{
"name": "userid",
"type": "string"
},
{
"name": "regionid",
"type": "string"
},
{
"name": "gender",
"type": "string"
}
],
"name": "users",
"namespace": "ksql",
"type": "record"
}
"""
4. Databricks にてWire Header を変数にセット
# Confluent Wire Header の定義
magic_byte_num = 0
wire_header = bytes([magic_byte_num])
wire_header += schema_id_num.to_bytes(4, byteorder='big', signed=False)
print(wire_header)
5. Databricks にて Conflunet へ書き込みを実施
# 書き込み時に利用するチェックポイントのディレクトリを指定
checkpoint_dir = checkpoint_volume_dir + "/" + topic_name
from pyspark.sql.functions import col, struct, lit ,concat
from pyspark.sql.avro.functions import to_avro
src_df = spark.readStream.table(src_table_name)
# key 列を定義
src_df = src_df.withColumn("key", col("userid"))
# magic-byte, schema-id, message-indexes, Avro のデータをもつ value 列を定義
src_df = src_df.withColumn("value", struct("registertime", "userid", "regionid", "gender"))
src_df = src_df.withColumn("value", to_avro(col("value"), schema))
# value 列に Confluent Protobuf Wire Header を追加
src_df = src_df.withColumn("add_bytes", lit(wire_header))
src_df = src_df.withColumn("value", concat(col("add_bytes"), col("value")))
# key 列と value 列のみを抽出
src_df = src_df.select("key", "value")
(
src_df.writeStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("kafka.security.protocol", "SASL_SSL")
.option(
"kafka.sasl.jaas.config",
f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
)
.option("kafka.sasl.mechanism", "PLAIN")
.option("topic", topic_name)
.option("checkpointLocation", checkpoint_dir)
.trigger(availableNow=True)
.start()
)