0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks から Confluent に Avro 形式の Topic データを書き込む際に手動でスキーマを指定して実施する方法

Last updated at Posted at 2024-12-26

概要

Databricks から Confluent に Avro 形式の Topic データを書き込む際に手動でスキーマを指定して実施する方法を紹介します。

本記事は、以下のシリーズの一部です。

image.png

引用元:Databricks と Confluent をつなぐ:データ連携の全体像を徹底解説 #Spark - Qiita

事前準備

1. Confluent にて Topic を作成

image.png

2. Confluent にて Topic にスキーマを設定

{
  "connect.name": "ksql.users",
  "fields": [
    {
      "name": "registertime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "regionid",
      "type": "string"
    },
    {
      "name": "gender",
      "type": "string"
    }
  ],
  "name": "users",
  "namespace": "ksql",
  "type": "record"
}

image.png

3. Confluent にて作成したスキーマ(Subject)の Schema ID の値を取得

image.png

4. Confluent にて API キーを主塔

image.png

5. Confluent にて Schema Registry API キーを取得

image.png

6. Databricks にてデータベースのオブジェクトを作成

%sql
CREATE CATALOG IF NOT EXISTS confluent_test;
CREATE SCHEMA IF NOT EXISTS confluent_test.test;
src_table_name = "confluent_test.test.src_table"
checkpoint_volume_name = "confluent_test.test.checkpoint_01"
checkpoint_volume_dir = f"/Volumes/confluent_test/test/checkpoint_01"
spark.sql(f"CREATE TABLE IF NOT EXISTS {src_table_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {checkpoint_volume_name}")
dbutils.fs.rm(checkpoint_volume_dir, True)

image.png

7. テーブルにデータを書きこみ

data = [
    ("User_1", 1507214855421, "Region_1", "OTHER"),
    ("User_2", 1516353515688, "Region_2", "OTHER"),
    ("User_3", 1511384261793, "Region_3", "FEMALE")
]

columns = ["userid", "registertime", "regionid", "gender"]

df = spark.createDataFrame(data, columns)

df.write.format("delta").mode("overwrite").saveAsTable(src_table_name)

spark.table(src_table_name).display()

image.png

データを書き込む手順

1. Databricks 上で API キーと Schema Registry API キーを変数にセット

# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = "U4BML4OPRCaaaa"
sasl_password = "zzEjUzyUwYYJneNE240L7jgPVrHAtRBANo6wnnzCgKlGIJ9OxJqaJZWRvaaaaa"

image.png

2. Databricks にて Schema ID を変数にセット

schema_id_num = 100001

image.png

3. Databricks に Topic とスキーマを変数にセット

topic_name = "avro_without_sr_01"
schema = """
{
  "connect.name": "ksql.users",
  "fields": [
    {
      "name": "registertime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "regionid",
      "type": "string"
    },
    {
      "name": "gender",
      "type": "string"
    }
  ],
  "name": "users",
  "namespace": "ksql",
  "type": "record"
}
"""

image.png

4. Databricks にてWire Header を変数にセット

# Confluent Wire Header の定義
magic_byte_num = 0
wire_header = bytes([magic_byte_num])
wire_header += schema_id_num.to_bytes(4, byteorder='big', signed=False)
print(wire_header)

image.png

5. Databricks にて Conflunet へ書き込みを実施

# 書き込み時に利用するチェックポイントのディレクトリを指定
checkpoint_dir = checkpoint_volume_dir + "/" + topic_name

from pyspark.sql.functions import col, struct, lit ,concat
from pyspark.sql.avro.functions import to_avro

src_df = spark.readStream.table(src_table_name)

# key 列を定義
src_df = src_df.withColumn("key", col("userid"))

# magic-byte, schema-id, message-indexes, Avro のデータをもつ value 列を定義
src_df = src_df.withColumn("value", struct("registertime", "userid", "regionid", "gender"))
src_df = src_df.withColumn("value", to_avro(col("value"), schema))

# value 列に Confluent Protobuf Wire Header を追加
src_df = src_df.withColumn("add_bytes", lit(wire_header))
src_df = src_df.withColumn("value", concat(col("add_bytes"), col("value")))

# key 列と value 列のみを抽出
src_df = src_df.select("key", "value")

(
    src_df.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    )
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("topic", topic_name)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .start()
)

image.png

6. Confluent 上でデータが書き込まれたことを確認

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?