概要
本記事では、Confluent 上で JSON 形式をシリアライズする際に指定できる代表的なスキーマ構造を、実際のコード例とともに検証してみました。この記事の流れは以下のとおりです。
-
事前準備
Confluent への接続情報と Schema Registry への接続情報を設定し、サンプルコードを動かせるようにします。 -
JSON メッセージ送信用の関数定義
簡易的なsend_json_message()
関数を用意し、JSON Schema とレコードを指定するだけで Kafka にメッセージを送れるようにします。 -
検証用 Topic の作成
複数のスキーマパターンを試すために、あらかじめいくつかのトピックを用意しておきます。 -
各スキーマパターンの紹介と検証結果
8 種類のスキーマパターン(単一値、Object、ネスト、Array、Map、oneOf、Enum、フォーマット指定)について、それぞれコード例と Confluent 上の出力例を示します。
JSON Schema は、JSON データ構造を定義するためのスキーマ言語です。Confluent Schema Registry では Avro だけでなく JSON Schema も格納&バージョン管理が可能なので、多言語クライアントから利用する場合にも便利です。
代表的な 8 つの JSON スキーマパターンを例示します。
- 単一値パターン
- 基本的な Object パターン
- ネスト(入れ子オブジェクト)パターン
- Array パターン
- Map(任意キーを持つオブジェクト)パターン
- oneOf パターン
- Enum パターン
- フォーマット指定パターン
本記事は、以下のシリーズの一部です。
引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita
1. 事前準備
Confluent と Schema Registry への接続情報をセット
まずは、Confluent Kafka(Bootstrap サーバー)と Confluent Schema Registry への接続情報を設定します。
以下の例では「sasl_username」「sasl_password」「sr_api_key」「sr_api_secret」などの値はサンプルですので、実際の環境に合わせて置き換えてください。
# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "B5NJBZ22ZKEJOG22"
sasl_password = "7IMvcMC52lAiGPixGB3AyekX0Tk2nNXqIzufYRoxIwdVw8nJAOFAjngKMz2W4Cg0"
kafka_conf = {
"bootstrap.servers": bootstrap_servers,
"sasl.username": sasl_username,
"sasl.password": sasl_password,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
}
# Schema Registry の接続情報
sr_url = "https://psrc-lq2dm.us-east-2.aws.confluent.cloud"
sr_api_key = "3CEWV5JNSV6IEKIV"
sr_api_secret = "Eo+9lTX117MrFtS6n09tpdpK7HcXZ6aXoF2d+mNnJYE/4Xtgiyy5ybennM5dCNdP"
sr_conf = {
"url": sr_url,
"basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}"
}
2. JSON メッセージを Produce する関数定義
次に、JSON メッセージを Kafka に送信するための関数を作成します。confluent_kafka
や confluent_kafka.schema_registry
のライブラリを使い、Schema Registry との連携を行います。
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
def send_json_message(
sr_conf: dict,
json_schema_str: str,
kafka_conf: dict,
json_topic_name: str,
record: dict,
record_key_value: str = "1",
) -> None:
"""
JSONメッセージをKafkaに送信する。
Parameters
----------
sr_conf : dict
Schema Registryに接続するための設定情報
json_schema_str : str
JSON Schema文字列
kafka_conf : dict
Kafka Producerに接続するための設定情報
json_topic_name : str
メッセージを送信するKafkaトピック名
record : dict
送信したいレコード (JSON Schemaに準拠した辞書)
record_key_value : str
Kafka メッセージのキー。必要に応じて変更可
"""
# SchemaRegistryClient と JSONSerializer を初期化
schema_registry_client = SchemaRegistryClient(sr_conf)
# JSON Schema の検証用関数 (record をバリデーションする際に利用される)
def dict_to_json(obj, ctx):
return obj
json_serializer = JSONSerializer(
json_schema_str,
schema_registry_client,
to_dict=dict_to_json
)
# Kafka Producer を初期化
producer = Producer(kafka_conf)
# JSON でシリアライズ
serialized_value = json_serializer(
record,
SerializationContext(json_topic_name, MessageField.VALUE)
)
# Kafka にメッセージを送信
producer.produce(
topic=json_topic_name,
key=str(record_key_value),
value=serialized_value
)
# 送信をフラッシュ(ブロッキングして完了を待つ)
producer.flush()
print("JSONメッセージ書き込み成功")
3. 検証に利用する Topic を作成
検証用に、あらかじめ複数のトピックを作成しておきます。Confluent の AdminClient を利用し、指定のトピックが無ければ作成し、既に存在していればエラーが出る形です。
from confluent_kafka.admin import AdminClient, NewTopic
def create_topics_example(kafka_conf, topics_to_create):
# AdminClientの設定
admin_client = AdminClient(kafka_conf)
# トピック作成リクエスト(NewTopic オブジェクト)のリストを作る
new_topics = []
for topic_name in topics_to_create:
# 必要に応じて num_partitions や replication_factor を設定
new_topic = NewTopic(topic=topic_name)
new_topics.append(new_topic)
# AdminClient の create_topics を呼び出すと、戻り値は {topic名: Future} の辞書
futures = admin_client.create_topics(new_topics)
# Future の結果をチェックして、作成成功 / 失敗を判定
for topic, f in futures.items():
try:
f.result() # 例外が起きなければ成功
print(f"Topic '{topic}' created successfully.")
except Exception as e:
print(f"Failed to create topic '{topic}': {e}")
topics_to_create = [
"json_schema_01",
"json_schema_02",
"json_schema_03",
"json_schema_04",
"json_schema_05",
"json_schema_06",
"json_schema_07",
"json_schema_08",
]
create_topics_example(
kafka_conf,
topics_to_create,
)
4. 代表的なスキーマパターンと動作確認
以下では、Confluent Schema Registry に登録する JSON Schema を定義してメッセージを送信します。送信後、Confluent Cloud / Schema Registry の画面や Consumer 側でのデシリアライズ状況を確認してみてください。
4.1. 単一値パターン
JSON Schema では、スキーマの
type
に"string"
,"number"
,"integer"
,"boolean"
など単一のスカラータイプを指定できます。
single_topic_name = "json_schema_01"
single_schema_str = """
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "number",
"description": "A single numeric value"
}
"""
single_value = 12345
send_json_message(
sr_conf,
single_schema_str,
kafka_conf,
single_topic_name,
single_value
)
-
ポイント: JSON ではスキーマ定義に
"type": "number"
のみを書いておけば、単なる数値として取り扱えます。
Confluent Schema Registry 上では、スカラースキーマとして管理されます。
4.2. 基本的な Object パターン
object_topic_name = "json_schema_02"
object_schema_str = """
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"offset": {
"type": "number"
},
"ratio": {
"type": "number"
},
"score": {
"type": "number"
},
"data": {
"type": "string"
},
"title": {
"type": "string"
},
"flag": {
"type": "boolean"
},
"null": {
"type": "null"
}
},
"required": ["id"]
}
"""
object_value = {
"id": 42,
"offset": 10000000000,
"ratio": 0.5,
"score": 1234.5678,
"data": "binary_data_in_string",
"title": "Hello JSON",
"flag": True,
"null": None
}
send_json_message(
sr_conf,
object_schema_str,
kafka_conf,
object_topic_name,
object_value,
)
-
ポイント:
-
"type": "object"
とし、"properties"
でフィールド名ごとの型を定義します。 - JSON Schema では
"bytes"
型のような概念はなく、バイナリは 文字列として表現されるのが一般的です。 -
"required"
プロパティを使って、必須項目を指定することが可能です。
-
id 列がないレコードを書き込みを実施したところ、下記のエラーとなりました。
object_value_2 = {
"offset": 10000000000,
"ratio": 0.5,
"score": 1234.5678,
"data": "binary_data_in_string",
"title": "Hello JSON",
"flag": True,
"null": None
}
send_json_message(
sr_conf,
object_schema_str,
kafka_conf,
object_topic_name,
object_value_2,
)
ValidationError: 'id' is a required property
Failed validating 'required' in schema:
{'$schema': 'http://json-schema.org/draft-07/schema#',
'type': 'object',
'properties': {'id': {'type': 'integer'},
'offset': {'type': 'number'},
'ratio': {'type': 'number'},
'score': {'type': 'number'},
'data': {'type': 'string'},
'title': {'type': 'string'},
'flag': {'type': 'boolean'},
'null': {'type': 'null'}},
'required': ['id']}
On instance:
{'offset': 10000000000,
'ratio': 0.5,
'score': 1234.5678,
'data': 'binary_data_in_string',
'title': 'Hello JSON',
'flag': True,
'null': None}
SerializationError: 'id' is a required property
Confluent 上では下記のようになっておりました。
4.3. ネスト(入れ子オブジェクト)パターン
nested_object_topic = "json_schema_03"
nested_object_schema_str = """
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"inner1": {
"type": "object",
"properties": {
"value_a": {
"type": "string"
},
"value_b": {
"type": "integer"
}
},
"required": ["value_a", "value_b"]
},
"inner2": {
"type": "object",
"properties": {
"count": {
"type": "integer"
}
},
"required": ["count"]
}
},
"required": ["inner1", "inner2"]
}
"""
nested_object_value = {
"inner1": {
"value_a": "hello",
"value_b": 123
},
"inner2": {
"count": 999999
}
}
send_json_message(
sr_conf,
nested_object_schema_str,
kafka_conf,
nested_object_topic,
nested_object_value,
)
- ポイント: オブジェクトの中に、さらに別のオブジェクトをネスト(入れ子)で定義します。
- JSON Schema の
"properties"
の中で、オブジェクトを階層的に記述できます。
Confluent 上では下記のようになっておりました。
4.4. Array パターン
array_topic_name = "json_schema_04"
array_schema_str = """
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"members": {
"type": "array",
"items": {
"type": "string"
}
}
},
"required": ["members"]
}
"""
array_value = {
"members": ["Alice", "Bob", "Charlie"]
}
send_json_message(
sr_conf,
array_schema_str,
kafka_conf,
array_topic_name,
array_value
)
-
ポイント: JSON で配列を扱う場合は
"type": "array"
を指定し、"items"
で格納される要素の型を明記します。
Confluent Schema Registry 上では、配列として登録・バージョン管理されます。
4.5. Map(任意キーを持つオブジェクト)パターン
Avro の "type": "map"
は「キーが文字列で値が特定型」という構造を表しますが、JSON Schema では「任意のプロパティ名」として表現します。
具体的には "patternProperties"
や "additionalProperties"
を使った定義が近いパターンです。
map_topic_name = "json_schema_05"
map_schema_str = """
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"config": {
"type": "object",
"patternProperties": {
"^[a-zA-Z0-9_]+$": {
"type": "string"
}
},
"additionalProperties": false
}
},
"required": ["config"]
}
"""
map_value = {
"config": {
"timeout": "30",
"retry_count": "3",
"enableFeatureX": "true"
}
}
send_json_message(
sr_conf,
map_schema_str,
kafka_conf,
map_topic_name,
map_value
)
-
ポイント:
-
"patternProperties"
で「キーの正規表現」と「値の型」を指定しています。ここでは「英数字とアンダースコアのみ」を許容例としました。 -
"additionalProperties": false
とすることで、定義外のキーを禁止できます(必要に応じてtrue
や"type": "string"
を設定)。
-
Confluent 上では下記のようになっておりました。
4.6. oneOf パターン(複数の型のいずれかを許容する)
Avro の Union 相当の構造は、JSON Schema では "oneOf"
や "anyOf"
を使うことで実現します。
oneof_topic_name = "json_schema_06"
oneof_schema_str = """
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"nickname": {
"oneOf": [
{ "type": "integer" },
{ "type": "string" }
]
}
},
"required": ["nickname"]
}
"""
# nickname が string のケース
oneof_value_1 = {
"nickname": "Tarochan"
}
# nickname が integer のケース
oneof_value_2 = {
"nickname": 123
}
send_json_message(
sr_conf,
oneof_schema_str,
kafka_conf,
oneof_topic_name,
oneof_value_1,
)
send_json_message(
sr_conf,
oneof_schema_str,
kafka_conf,
oneof_topic_name,
oneof_value_2,
)
-
ポイント: Avro の
"type": ["int", "string"]
と同様に、JSON Schema でも複数の型を許容できます。 -
"oneOf"
以外にも"anyOf"
や"allOf"
を利用した柔軟なスキーマ設計が可能です。
Confluent 上では下記のようになっておりました。
4.7. Enum パターン(指定した文字列セットのみを許容)
JSON Schema でも {"enum": ["TODO", "IN_PROGRESS", "DONE"]}
のように、特定の値のみを許容できます。
enum_topic_name = "json_schema_07"
enum_schema_str = """
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": ["TODO", "IN_PROGRESS", "DONE"]
}
},
"required": ["status"]
}
"""
enum_value_1 = {
"status": "IN_PROGRESS"
}
send_json_message(
sr_conf,
enum_schema_str,
kafka_conf,
enum_topic_name,
enum_value_1
)
-
ポイント: Avro の Enum 同様、JSON Schema でも
"enum"
で明示的に許可する値を限定できます。
定義外の値を送ろうとすると、Schema Registry での検証に失敗します。
# 想定外の値の場合 (エラーになる例)
# ValueError が発生することを確認
enum_value_2 = {
"status": "XXXXX"
}
send_json_message(
sr_conf,
enum_schema_str,
kafka_conf,
enum_topic_name,
enum_value_2
)
SerializationError: 'XXXXX' is not one of ['TODO', 'IN_PROGRESS', 'DONE']
Confluent 上では下記のようになっておりました。
4.8. フォーマット指定パターン
Avro の "type": "fixed"
に相当する厳密なバイナリ長の概念は JSON Schema にはありませんが、たとえば "format"
キーワードやカスタムパターンで表現できます。
"format"
には "date-time"
, "email"
, "hostname"
, "ipv4"
, "ipv6"
, "uri"
などが用意されています。
format_topic_name = "json_schema_08"
format_schema_str = """
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"createdAt": {
"type": "string",
"format": "date-time"
},
"message": {
"type": "string",
"maxLength": 16
}
},
"required": ["createdAt"]
}
"""
format_value = {
"createdAt": "2023-01-01T12:34:56Z",
"message": "Hello JSON!"
}
send_json_message(
sr_conf,
format_schema_str,
kafka_conf,
format_topic_name,
format_value
)
-
ポイント:
-
"format": "date-time"
のように、日付時刻文字列を期待している旨を明示的に表せます。 -
"maxLength": 16
とすることで、message
フィールドの長さを制限するなどの簡易的なバリデーションも可能です。
-
format_value_2 = {
"createdAt": "9999-99-99T99:99:99Z",
"message": "Hello JSON!XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
}
send_json_message(
sr_conf,
format_schema_str,
kafka_conf,
format_topic_name,
format_value_2
)
message 列の値を 16 文字以上で書き込みを実施したところ、下記のエラーが発生しました。
ValidationError: 'Hello JSON!XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX' is too long
Failed validating 'maxLength' in schema['properties']['message']:
{'type': 'string', 'maxLength': 16}
On instance['message']:
'Hello JSON!XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
During handling of the above exception, another exception occurred:
SerializationError: 'Hello JSON!XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX' is too long
Confluent 上では下記のようになっておりました。
まとめ
JSON はもともと可変・動的な構造を表現しやすいフォーマットですが、Schema Registry と組み合わせることで 厳密な型・構造を定義しつつも柔軟さを維持できるようになります。Avro と同様に、以下のようなメリットがあります。
- スキーマをバージョン管理できる(後方互換性や前方互換性の検証がしやすい)。
- クライアント(Producer / Consumer)側で スキーマを自動的に取得し、マッピングを行える。
- JSON 形式であれば 任意の言語 / ライブラリでも取り回しがしやすい。
一方で、Avro ほどのバイナリ圧縮効果は期待できないものの、すでに JSON ベースのシステムを運用している場合は移行コストが低いという利点があります。いずれにせよ、Kafka + Confluent Schema Registry の組み合わせでは Avro / JSON / Protobuf など多彩な形式が選択できるので、チームや運用方針に合ったスキーマを選択してみてください。