概要
本記事では、Confluent 上で Avro 形式をシリアライズする際に指定できる代表的なスキーマ構造を、実際のコード例とともに検証してみました。この記事の流れは以下のとおりです。
-
事前準備
Confluent への接続情報と Schema Registry への接続情報を設定し、サンプルコードを動かせるようにします。 -
Avro メッセージ送信用の関数定義
簡易的なsend_avro_message()
関数を用意し、Avro スキーマとレコードを指定するだけで Kafka にメッセージを送れるようにします。 -
検証用 Topic の作成
複数のスキーマパターンを試すために、あらかじめいくつかのトピックを用意しておきます。 -
各スキーマパターンの紹介と検証結果
8 種類のスキーマパターン(単一値、Record、ネスト、Array、Map、Union、Enum、Fixed)について、それぞれコード例と Confluent 上の出力例を示します。
代表的な 8 つの Avro スキーマパターンを例示します。
- 単一値パターン
- 基本的な Record パターン
- ネスト(複数レコード)パターン
- Array パターン
- Map パターン
- Union パターン
- Enum パターン
- Fixed パターン
本記事は、以下のシリーズの一部です。
引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita
1. 事前準備
Confluent と Schema Registry への接続情報をセット
まずは、Confluent Kafka(Bootstrap サーバー)と Confluent Schema Registry への接続情報を設定します。
# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "B5NJBZ22ZKEJOG22"
sasl_password = "7IMvcMC52lAiGPixGB3AyekX0Tk2nNXqIzufYRoxIwdVw8nJAOFAjngKMz2W4Cg0"
kafka_conf = {
"bootstrap.servers": bootstrap_servers,
"sasl.username": sasl_username,
"sasl.password": sasl_password,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
}
# Schema Registry の接続情報
sr_url = "https://psrc-lq2dm.us-east-2.aws.confluent.cloud"
sr_api_key = "3CEWV5JNSV6IEKIV"
sr_api_secret = "Eo+9lTX117MrFtS6n09tpdpK7HcXZ6aXoF2d+mNnJYE/4Xtgiyy5ybennM5dCNdP"
sr_conf = {
"url": sr_url,
"basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}"
}
2. Avro メッセージを Produce する関数定義
次に、Avro メッセージを Kafka に送信するための関数を作成します。confluent_kafka
や confluent_kafka.schema_registry
のライブラリを使い、Schema Registry との連携を行います。
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
def send_avro_message(
sr_conf: dict,
avro_schema_str: str,
kafka_conf: dict,
avro_topic_name: str,
record: dict,
record_key_value: str = "1",
) -> None:
"""
AvroメッセージをKafkaに送信する。
Parameters
----------
sr_conf : dict
Schema Registryに接続するための設定情報
avro_schema_str : str
Avroスキーマ文字列
kafka_conf : dict
Kafka Producerに接続するための設定情報
avro_topic_name : str
メッセージを送信するKafkaトピック名
record : dict
送信したいレコード (Avro スキーマに準拠した辞書)
record_key_value : str
Kafka メッセージのキー。必要に応じて変更可
"""
# SchemaRegistryClient と AvroSerializer を初期化
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_serializer = AvroSerializer(
schema_registry_client,
avro_schema_str
)
# Kafka Producer を初期化
producer = Producer(kafka_conf)
# Avro でシリアライズ
serialized_value = avro_serializer(
record,
SerializationContext(avro_topic_name, MessageField.VALUE)
)
# Kafka にメッセージを送信
producer.produce(
topic=avro_topic_name,
key=str(record_key_value),
value=serialized_value
)
# 送信をフラッシュ(ブロッキングして完了を待つ)
producer.flush()
print("Avroメッセージ書き込み成功")
3. 検証に利用する Topic を作成
検証用に、あらかじめ複数のトピックを作成しておきます。Confluent の AdminClient を利用し、指定のトピックが無ければ作成し、既に存在していればエラーが出る形です。
from confluent_kafka.admin import AdminClient, NewTopic
def create_topics_example(kafka_conf, topics_to_create):
# AdminClientの設定
admin_client = AdminClient(kafka_conf)
# トピック作成リクエスト(NewTopic オブジェクト)のリストを作る
new_topics = []
for topic_name in topics_to_create:
# 必要に応じて num_partitions や replication_factor を設定
new_topic = NewTopic(topic=topic_name)
new_topics.append(new_topic)
# AdminClient の create_topics を呼び出すと、戻り値は {topic名: Future} の辞書
futures = admin_client.create_topics(new_topics)
# Future の結果をチェックして、作成成功 / 失敗を判定
for topic, f in futures.items():
try:
f.result() # 例外が起きなければ成功
print(f"Topic '{topic}' created successfully.")
except Exception as e:
print(f"Failed to create topic '{topic}': {e}")
topics_to_create = [
"avro_schema_01",
"avro_schema_02",
"avro_schema_03",
"avro_schema_04",
"avro_schema_05",
"avro_schema_06",
"avro_schema_07",
"avro_schema_08",
]
create_topics_example(
kafka_conf,
topics_to_create,
)
4. 代表的なスキーマパターンと動作確認
4.1. 単一値パターン
single_topic_name = "avro_schema_01"
single_schema_str = """
{
"name": "id",
"type": "long",
"doc": "User ID"
}
"""
single_value = 12345
send_avro_message(
sr_conf,
single_schema_str,
kafka_conf,
single_topic_name,
single_value
)
-
ポイント:
type
に"long"
(や"int"
,"string"
など) を直接書くことで、レコード型ではなくスカラーとしての Avro を定義できます。
onfluent 上では、Schema Registry に単一フィールドとして登録されますが、実際のメッセージの中身は単なる数値です。
4.2. 基本的な Record パターン
record_topic_name = "avro_schema_02"
record_schema_str = """
{
"type": "record",
"name": "SimpleTypes",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "offset", "type": "long" },
{ "name": "ratio", "type": "float" },
{ "name": "score", "type": "double" },
{ "name": "data", "type": "bytes" },
{ "name": "title", "type": "string" },
{ "name": "flag", "type": "boolean" },
{ "name": "null", "type": "null" }
]
}
"""
record_value = {
"id": 42,
"offset": 10000000000,
"ratio": 0.5,
"score": 1234.5678,
"data": b"binary_data",
"title": "Hello Avro",
"flag": True,
"null": None,
}
send_avro_message(
sr_conf,
record_schema_str,
kafka_conf,
record_topic_name,
record_value,
)
-
ポイント: レコード型 (
"type": "record"
) は 複数のフィールド を持たせる場合に便利です。 - フィールド名は重複できない、型定義が厳密など、JSON スキーマとは異なる Avro 特有のルールがあります。
-
bytes
は文字列ではなく バイナリデータ として扱われます。
Confluent 上では下記のようになっておりました。
4.3. ネスト(複数レコード)パターン
nested_records_topic = "avro_schema_03"
nested_records_schema_str = """
{
"type": "record",
"name": "OuterRecord",
"namespace": "com.example.nested",
"fields": [
{
"name": "inner1",
"type": {
"type": "record",
"name": "InnerRecord1",
"fields": [
{ "name": "value_a", "type": "string" },
{ "name": "value_b", "type": "int" }
]
}
},
{
"name": "inner2",
"type": {
"type": "record",
"name": "InnerRecord2",
"fields": [
{ "name": "count", "type": "long" }
]
}
}
]
}
"""
nested_records_value = {
"inner1": {
"value_a": "hello",
"value_b": 123
},
"inner2": {
"count": 999999
}
}
send_avro_message(
sr_conf,
nested_records_schema_str,
kafka_conf,
nested_records_topic,
nested_records_value,
)
-
ポイント: レコードの中に、さらに別のレコード(
InnerRecord1
やInnerRecord2
)をネスト(入れ子)で定義する例です。
Confluent 上では下記のようになっておりました。
4.4. Array パターン
array_topic_name = "avro_schema_04"
array_schema_str = """
{
"type": "record",
"name": "Team",
"fields": [
{
"name": "members",
"type": {
"type": "array",
"items": "string"
}
}
]
}
"""
array_value = {
"members": ["Alice", "Bob", "Charlie"]
}
send_avro_message(
sr_conf,
array_schema_str,
kafka_conf,
array_topic_name,
array_value
)
-
ポイント: Avro で配列を扱う場合は
"type": "array"
を指定し、"items"
で格納される要素の型を明記します。
Confluent 上では、配列形式の要素がそのまま確認できます。
4.5. Map パターン
map_topic_name = "avro_schema_05"
map_schema_str = """
{
"type": "record",
"name": "Settings",
"fields": [
{
"name": "config",
"type": {
"type": "map",
"values": "string"
}
}
]
}
"""
map_value = {
"config": {
"timeout": "30",
"retry_count": "3",
"enableFeatureX": "true"
}
}
send_avro_message(
sr_conf,
map_schema_str,
kafka_conf,
map_topic_name,
map_value
)
-
ポイント:
"type": "map"
は キー文字列 → 値 のペアを扱うための構造です。
Confluent 上では下記のようになっておりました。
4.6. Union パターン(複数の型のいずれかを許容する)
union_topic_name = "avro_schema_06"
union_schema_str = """
{
"type": "record",
"name": "UserProfile",
"fields": [
{
"name": "nickname",
"type": ["int", "string"]
}
]
}
"""
# nickname が string のケース
union_value_1 = {
"nickname": "Tarochan"
}
# nickname が null のケース
union_value_2 = {
"nickname": 123
}
send_avro_message(
sr_conf,
union_schema_str,
kafka_conf,
union_topic_name,
union_value_1,
)
send_avro_message(
sr_conf,
union_schema_str,
kafka_conf,
union_topic_name,
union_value_2,
)
-
ポイント:
"type": ["int", "string"]
のように 複数の型を指定できます。
Confluent 上では下記のようになっておりました。
4.7. Enum パターン(指定した文字列セットのみを許容)
enum_topic_name = "avro_schema_07"
enum_schema_str = """
{
"type": "record",
"name": "Task",
"fields": [
{
"name": "status",
"type": {
"type": "enum",
"name": "TaskStatus",
"symbols": ["TODO", "IN_PROGRESS", "DONE"]
}
}
]
}
"""
enum_value_1 = {
"status": "IN_PROGRESS"
}
send_avro_message(
sr_conf,
enum_schema_str,
kafka_conf,
enum_topic_name,
enum_value_1
)
- ポイント: Enum は 許容する文字列のセットを厳密に定義でき、定義外の文字列を送るとエラーになります。
想定外の値を書き込みを実施した際には、下記のエラーが発生しました。
enum_value_2 = {
"status": "XXXXX"
}
send_avro_message(
sr_conf,
enum_schema_str,
kafka_conf,
enum_topic_name,
enum_value_2
)
ValueError: 'XXXXX' is not in list
Confluent 上では下記のようになっておりました。
4.8. Fixed パターン(固定長バイナリ)
fixed_topic_name = "avro_schema_08"
fixed_schema_str = """
{
"type": "record",
"name": "EncryptedData",
"fields": [
{
"name": "iv",
"type": {
"type": "fixed",
"name": "IV",
"size": 16
}
},
{
"name": "encryptedPayload",
"type": "bytes"
}
]
}
"""
# 16バイトの IV (例: 0x00 を16個繋げたバイナリ)
iv_bytes = b"\x00" * 16
# 任意のバイト列 (ここでは 'Hello, Avro!' の bytes)
payload_bytes = b"Hello, Avro!"
fixed_value = {
"iv": iv_bytes,
"encryptedPayload": payload_bytes
}
send_avro_message(
sr_conf,
fixed_schema_str,
kafka_conf,
fixed_topic_name,
fixed_value
)
-
ポイント:
"type": "fixed"
で 固定長のバイナリを扱えます。
Confluent 上では下記のようになっておりました。
まとめ
Avro はスキーマに沿って厳密に型を定義できるため、破壊的変更が難しい反面、スキーマ変更の影響範囲を小さくできるという大きな利点があります。Confluent Schema Registry と組み合わせることで、以下のようなメリットが得られます。
- スキーマをバージョン管理できる(後方互換性や前方互換性などをチェック可能)。
- クライアント(Producer / Consumer)側でスキーマを自動的に取得し、マッピングしやすくなる。
- JSON よりもデータ量(シリアライズ後のサイズ)がコンパクトになりやすい。