概要
Databricks から Confluent に Protobuf 形式の Topic データを書き込む際に手動でスキーマを指定して実施する方法を紹介します。
本記事は、以下のシリーズの一部です。
引用元:Databricks と Confluent をつなぐ:データ連携の全体像を徹底解説 #Spark - Qiita
事前準備
1. Confluent にて Topic を作成
2. Confluent にて Topic にスキーマを設定
syntax = "proto3";
package ksql;
message users {
int64 registertime = 1;
string userid = 2;
string regionid = 3;
string gender = 4;
}
3. Confluent にて作成したスキーマ(Subject)の Schema ID の値を取得
4. Confluent にて API キーを主塔
5. Confluent にて Schema Registry API キーを取得
6. Databricks にてデータベースのオブジェクトを作成
%sql
CREATE CATALOG IF NOT EXISTS confluent_test;
CREATE SCHEMA IF NOT EXISTS confluent_test.test;
src_table_name = "confluent_test.test.src_table"
checkpoint_volume_name = "confluent_test.test.checkpoint_01"
checkpoint_volume_dir = f"/Volumes/confluent_test/test/checkpoint_01"
spark.sql(f"CREATE TABLE IF NOT EXISTS {src_table_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {checkpoint_volume_name}")
dbutils.fs.rm(checkpoint_volume_dir, True)
7. テーブルにデータを書きこみ
data = [
("User_1", 1507214855421, "Region_1", "OTHER"),
("User_2", 1516353515688, "Region_2", "OTHER"),
("User_3", 1511384261793, "Region_3", "FEMALE")
]
columns = ["userid", "registertime", "regionid", "gender"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").saveAsTable(src_table_name)
spark.table(src_table_name).display()
8. ノートブックを同じディレクトリにuser.proto
ファイルを作成
syntax = "proto3";
package ksql;
message users {
int64 registertime = 1;
string userid = 2;
string regionid = 3;
string gender = 4;
}
9. Desc ファイルであるproto.desc
を作成
%sh sudo apt-get update
%sh sudo apt-get install -y protobuf-compiler
%sh protoc --include_imports \
--descriptor_set_out=user.desc \
user.proto
データを書き込む手順
1. Databricks 上で API キーと Schema Registry API キーを変数にセット
# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = "U4BML4OPRCaaaa"
sasl_password = "zzEjUzyUwYYJneNE240L7jgPVrHAtRBANo6wnnzCgKlGIJ9OxJqaJZWRvaaaaa"
2. Databricks にて Schema ID を変数にセット
schema_id_num = 100003
3. Databricks に Topic と Desc ファイルに関する情報を変数にセット
topic_name = "protobuf_without_sr_01"
desc_file_paht = "./user.desc"
message_name = "ksql.users"
4. Databricks にてWire Header を変数にセット
# 下記ドキュメントを参考に message-indexes を指定
# https://docs.confluent.io/cloud/current/sr/fundamentals/serdes-develop/index.html#wire-format
message_indexes = b"\x00"
# Confluent Wire Header の定義
magic_byte_num = 0
wire_header = bytes([magic_byte_num])
wire_header += schema_id_num.to_bytes(4, byteorder='big', signed=False)
wire_header += message_indexes
print(wire_header)
5. Databricks にて Conflunet へ書き込みを実施
# 書き込み時に利用するチェックポイントのディレクトリを指定
checkpoint_dir = checkpoint_volume_dir + "/" + topic_name
from pyspark.sql.functions import col, struct, lit
from pyspark.sql.protobuf.functions import to_protobuf
src_df = spark.readStream.table(src_table_name)
# key 列を定義
src_df = src_df.withColumn("key", col("userid"))
# magic-byte, schema-id, message-indexes, Avro のデータをもつ value 列を定義
src_df = src_df.withColumn(
"value", struct("registertime", "userid", "regionid", "gender")
)
src_df = src_df.withColumn(
"value",
to_protobuf(
col("value"),
messageName=message_name,
descFilePath=desc_file_paht,
),
)
# value 列に Confluent Protobuf Wire Header を追加
src_df = src_df.withColumn("add_bytes", lit(wire_header))
src_df = src_df.withColumn("value", concat(col("add_bytes"), col("value")))
# key 列と value 列のみを抽出
src_df = src_df.select("key", "value")
(
src_df.writeStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("kafka.security.protocol", "SASL_SSL")
.option(
"kafka.sasl.jaas.config",
f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
)
.option("kafka.sasl.mechanism", "PLAIN")
.option("topic", topic_name)
.option("checkpointLocation", checkpoint_dir)
.trigger(availableNow=True)
.start()
)