0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Confluent で Avro 形式をシリアライズする際のスキーマパターン検証

Last updated at Posted at 2024-12-22

概要

本記事では、Confluent 上で Avro 形式をシリアライズする際に指定できる代表的なスキーマ構造を、実際のコード例とともに検証してみました。この記事の流れは以下のとおりです。

  1. 事前準備
    Confluent への接続情報と Schema Registry への接続情報を設定し、サンプルコードを動かせるようにします。

  2. Avro メッセージ送信用の関数定義
    簡易的な send_avro_message() 関数を用意し、Avro スキーマとレコードを指定するだけで Kafka にメッセージを送れるようにします。

  3. 検証用 Topic の作成
    複数のスキーマパターンを試すために、あらかじめいくつかのトピックを用意しておきます。

  4. 各スキーマパターンの紹介と検証結果
    8 種類のスキーマパターン(単一値、Record、ネスト、Array、Map、Union、Enum、Fixed)について、それぞれコード例と Confluent 上の出力例を示します。

代表的な 8 つの Avro スキーマパターンを例示します。

  1. 単一値パターン
  2. 基本的な Record パターン
  3. ネスト(複数レコード)パターン
  4. Array パターン
  5. Map パターン
  6. Union パターン
  7. Enum パターン
  8. Fixed パターン

本記事は、以下のシリーズの一部です。

image.png

引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita

1. 事前準備

Confluent と Schema Registry への接続情報をセット

まずは、Confluent Kafka(Bootstrap サーバー)と Confluent Schema Registry への接続情報を設定します。

# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "B5NJBZ22ZKEJOG22"
sasl_password = "7IMvcMC52lAiGPixGB3AyekX0Tk2nNXqIzufYRoxIwdVw8nJAOFAjngKMz2W4Cg0"
kafka_conf = {
    "bootstrap.servers": bootstrap_servers,
    "sasl.username":     sasl_username,
    "sasl.password":     sasl_password,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms":   "PLAIN",
}

# Schema Registry の接続情報
sr_url = "https://psrc-lq2dm.us-east-2.aws.confluent.cloud"
sr_api_key = "3CEWV5JNSV6IEKIV"
sr_api_secret = "Eo+9lTX117MrFtS6n09tpdpK7HcXZ6aXoF2d+mNnJYE/4Xtgiyy5ybennM5dCNdP"
sr_conf = {
    "url": sr_url,
    "basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}"
}

2. Avro メッセージを Produce する関数定義

次に、Avro メッセージを Kafka に送信するための関数を作成します。confluent_kafkaconfluent_kafka.schema_registry のライブラリを使い、Schema Registry との連携を行います。

from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

def send_avro_message(
    sr_conf: dict,
    avro_schema_str: str,
    kafka_conf: dict,
    avro_topic_name: str,
    record: dict,
    record_key_value: str = "1",
) -> None:
    """
    AvroメッセージをKafkaに送信する。

    Parameters
    ----------
    sr_conf : dict
        Schema Registryに接続するための設定情報
    avro_schema_str : str
        Avroスキーマ文字列
    kafka_conf : dict
        Kafka Producerに接続するための設定情報
    avro_topic_name : str
        メッセージを送信するKafkaトピック名
    record : dict
        送信したいレコード (Avro スキーマに準拠した辞書)
    record_key_value : str
        Kafka メッセージのキー。必要に応じて変更可
    """
    # SchemaRegistryClient と AvroSerializer を初期化
    schema_registry_client = SchemaRegistryClient(sr_conf)
    avro_serializer = AvroSerializer(
        schema_registry_client,
        avro_schema_str
    )

    # Kafka Producer を初期化
    producer = Producer(kafka_conf)

    # Avro でシリアライズ
    serialized_value = avro_serializer(
        record,
        SerializationContext(avro_topic_name, MessageField.VALUE)
    )

    # Kafka にメッセージを送信
    producer.produce(
        topic=avro_topic_name,
        key=str(record_key_value),
        value=serialized_value
    )

    # 送信をフラッシュ(ブロッキングして完了を待つ)
    producer.flush()
    print("Avroメッセージ書き込み成功")

3. 検証に利用する Topic を作成

検証用に、あらかじめ複数のトピックを作成しておきます。Confluent の AdminClient を利用し、指定のトピックが無ければ作成し、既に存在していればエラーが出る形です。

from confluent_kafka.admin import AdminClient, NewTopic

def create_topics_example(kafka_conf, topics_to_create):
    # AdminClientの設定
    admin_client = AdminClient(kafka_conf)
    
    # トピック作成リクエスト(NewTopic オブジェクト)のリストを作る
    new_topics = []
    for topic_name in topics_to_create:
        # 必要に応じて num_partitions や replication_factor を設定
        new_topic = NewTopic(topic=topic_name)
        new_topics.append(new_topic)
    
    # AdminClient の create_topics を呼び出すと、戻り値は {topic名: Future} の辞書
    futures = admin_client.create_topics(new_topics)
    
    # Future の結果をチェックして、作成成功 / 失敗を判定
    for topic, f in futures.items():
        try:
            f.result()  # 例外が起きなければ成功
            print(f"Topic '{topic}' created successfully.")
        except Exception as e:
            print(f"Failed to create topic '{topic}': {e}")
topics_to_create = [
    "avro_schema_01",
    "avro_schema_02",
    "avro_schema_03",
    "avro_schema_04",
    "avro_schema_05",
    "avro_schema_06",
    "avro_schema_07",
    "avro_schema_08",
]

create_topics_example(
    kafka_conf,
    topics_to_create,
)

4. 代表的なスキーマパターンと動作確認

4.1. 単一値パターン

single_topic_name = "avro_schema_01"
single_schema_str = """
{
    "name": "id",
    "type": "long",
    "doc": "User ID"
}
"""
single_value = 12345

send_avro_message(
    sr_conf,
    single_schema_str,
    kafka_conf,
    single_topic_name,
    single_value
)
  • ポイント: type"long" (や "int", "string" など) を直接書くことで、レコード型ではなくスカラーとしての Avro を定義できます。

onfluent 上では、Schema Registry に単一フィールドとして登録されますが、実際のメッセージの中身は単なる数値です。

image.png

4.2. 基本的な Record パターン

record_topic_name = "avro_schema_02"
record_schema_str = """
{
  "type": "record",
  "name": "SimpleTypes",
  "fields": [
    { "name": "id",  "type": "int" },
    { "name": "offset", "type": "long" },
    { "name": "ratio",  "type": "float" },
    { "name": "score",  "type": "double" },
    { "name": "data",   "type": "bytes" },
    { "name": "title",  "type": "string" },
    { "name": "flag",   "type": "boolean" },
    { "name": "null",   "type": "null" }
  ]
}
"""
record_value = {
    "id": 42,
    "offset": 10000000000,
    "ratio": 0.5,
    "score": 1234.5678,
    "data": b"binary_data",
    "title": "Hello Avro",
    "flag": True,
    "null": None,
}

send_avro_message(
    sr_conf,
    record_schema_str,
    kafka_conf,
    record_topic_name,
    record_value,
)
  • ポイント: レコード型 ("type": "record") は 複数のフィールド を持たせる場合に便利です。
  • フィールド名は重複できない、型定義が厳密など、JSON スキーマとは異なる Avro 特有のルールがあります。
  • bytes は文字列ではなく バイナリデータ として扱われます。

image.png

Confluent 上では下記のようになっておりました。

image.png

4.3. ネスト(複数レコード)パターン

nested_records_topic = "avro_schema_03"
nested_records_schema_str = """
{
  "type": "record",
  "name": "OuterRecord",
  "namespace": "com.example.nested",
  "fields": [
    {
      "name": "inner1",
      "type": {
        "type": "record",
        "name": "InnerRecord1",
        "fields": [
          { "name": "value_a", "type": "string" },
          { "name": "value_b", "type": "int"    }
        ]
      }
    },
    {
      "name": "inner2",
      "type": {
        "type": "record",
        "name": "InnerRecord2",
        "fields": [
          { "name": "count", "type": "long" }
        ]
      }
    }
  ]
}
"""
nested_records_value = {
    "inner1": {
        "value_a": "hello",
        "value_b": 123
    },
    "inner2": {
        "count": 999999
    }
}

send_avro_message(
    sr_conf,
    nested_records_schema_str,
    kafka_conf,
    nested_records_topic,
    nested_records_value,
)
  • ポイント: レコードの中に、さらに別のレコード(InnerRecord1InnerRecord2)をネスト(入れ子)で定義する例です。

Confluent 上では下記のようになっておりました。

image.png

4.4. Array パターン

array_topic_name = "avro_schema_04"
array_schema_str = """
{
  "type": "record",
  "name": "Team",
  "fields": [
    {
      "name": "members",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}
"""
array_value = {
    "members": ["Alice", "Bob", "Charlie"]
}

send_avro_message(
    sr_conf,
    array_schema_str,
    kafka_conf,
    array_topic_name,
    array_value
)
  • ポイント: Avro で配列を扱う場合は "type": "array" を指定し、"items" で格納される要素の型を明記します。

Confluent 上では、配列形式の要素がそのまま確認できます。

image.png


4.5. Map パターン

map_topic_name = "avro_schema_05"
map_schema_str = """
{
  "type": "record",
  "name": "Settings",
  "fields": [
    {
      "name": "config",
      "type": {
        "type": "map",
        "values": "string"
      }
    }
  ]
}
"""
map_value = {
    "config": {
        "timeout": "30",
        "retry_count": "3",
        "enableFeatureX": "true"
    }
}

send_avro_message(
    sr_conf,
    map_schema_str,
    kafka_conf,
    map_topic_name,
    map_value
)
  • ポイント: "type": "map"キー文字列 → 値 のペアを扱うための構造です。

Confluent 上では下記のようになっておりました。

image.png


4.6. Union パターン(複数の型のいずれかを許容する)

union_topic_name = "avro_schema_06"
union_schema_str = """
{
  "type": "record",
  "name": "UserProfile",
  "fields": [
    {
      "name": "nickname",
      "type": ["int", "string"]
    }
  ]
}
"""

# nickname が string のケース
union_value_1 = {
    "nickname": "Tarochan"
}
# nickname が null のケース
union_value_2 = {
    "nickname": 123
}

send_avro_message(
    sr_conf,
    union_schema_str,
    kafka_conf,
    union_topic_name,
    union_value_1,
)
send_avro_message(
    sr_conf,
    union_schema_str,
    kafka_conf,
    union_topic_name,
    union_value_2,
)
  • ポイント: "type": ["int", "string"] のように 複数の型を指定できます。

Confluent 上では下記のようになっておりました。

image.png
image.png


4.7. Enum パターン(指定した文字列セットのみを許容)

enum_topic_name = "avro_schema_07"
enum_schema_str = """
{
  "type": "record",
  "name": "Task",
  "fields": [
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "TaskStatus",
        "symbols": ["TODO", "IN_PROGRESS", "DONE"]
      }
    }
  ]
}
"""
enum_value_1 = {
    "status": "IN_PROGRESS"
}

send_avro_message(
    sr_conf,
    enum_schema_str,
    kafka_conf,
    enum_topic_name,
    enum_value_1
)
  • ポイント: Enum は 許容する文字列のセットを厳密に定義でき、定義外の文字列を送るとエラーになります。

想定外の値を書き込みを実施した際には、下記のエラーが発生しました。

enum_value_2 = {
    "status": "XXXXX"
}
send_avro_message(
    sr_conf,
    enum_schema_str,
    kafka_conf,
    enum_topic_name,
    enum_value_2
)
ValueError: 'XXXXX' is not in list

image.png

Confluent 上では下記のようになっておりました。

image.png

4.8. Fixed パターン(固定長バイナリ)

fixed_topic_name = "avro_schema_08"
fixed_schema_str = """
{
  "type": "record",
  "name": "EncryptedData",
  "fields": [
    {
      "name": "iv",
      "type": {
        "type": "fixed",
        "name": "IV",
        "size": 16
      }
    },
    {
      "name": "encryptedPayload",
      "type": "bytes"
    }
  ]
}
"""

# 16バイトの IV (例: 0x00 を16個繋げたバイナリ)
iv_bytes = b"\x00" * 16
# 任意のバイト列 (ここでは 'Hello, Avro!' の bytes)
payload_bytes = b"Hello, Avro!"

fixed_value = {
    "iv": iv_bytes,
    "encryptedPayload": payload_bytes
}

send_avro_message(
    sr_conf,
    fixed_schema_str,
    kafka_conf,
    fixed_topic_name,
    fixed_value
)
  • ポイント: "type": "fixed"固定長のバイナリを扱えます。

Confluent 上では下記のようになっておりました。

image.png

まとめ

Avro はスキーマに沿って厳密に型を定義できるため、破壊的変更が難しい反面、スキーマ変更の影響範囲を小さくできるという大きな利点があります。Confluent Schema Registry と組み合わせることで、以下のようなメリットが得られます。

  • スキーマをバージョン管理できる(後方互換性や前方互換性などをチェック可能)。
  • クライアント(Producer / Consumer)側でスキーマを自動的に取得し、マッピングしやすくなる。
  • JSON よりもデータ量(シリアライズ後のサイズ)がコンパクトになりやすい。
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?