概要
Python の confluent-kafka
ライブラリを用いて、Schema Registry におけるスキーマ互換性 (Compatibility) を BACKWARD、FORWARD、および NONE の3つについて実際にコードを実行しながら学習できるコンテンツ例です。ここでは Avro スキーマを使用し、スキーマを段階的に変更しながら、どのような変更が許容され、どのような変更が拒否されるかを理解します。
Schema Registry の操作方法については下記の記事で整理しています。
引用元:Python × Confluent Schema Registry:Confluent Schema Registry 入門 #Kafka - Qiita
本記事は、以下のシリーズの一部です。
引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita
互換性 (Compatibility) について
背景と重要性
KafkaやSchema Registryを用いて、ストリーム処理基盤を長期運用する場合、スキーマは時と共に進化します。新たな機能要件やデータ拡張、古いフィールドの削除など、スキーマ変更は避けられません。しかし、既存のコンシューマは過去のスキーマを前提に動作しています。ここで問題となるのが、「新しいスキーマで書き込まれたデータを、古いスキーマしか知らないコンシューマは正常に読み込めるのか?」というデータの互換性です。
互換性設定を正しく行うことで、スキーマ進化に伴う障害(メッセージ読み込みエラー、データ破損、機能停止)を最小限に抑え、システムを円滑に運用できます。
互換性モード(Compatibility Modes) の種類
Schema Registryでは、スキーマを登録する際に「互換性モード」を設定できます。互換性モードは「新旧スキーマ間でどのような互換性を保持すべきか」を定義するルールセットです。互換性モードには以下が存在します。文章だけを読んでもイメージがつかないため、できることとできないことを実際に実行して理解することが有効です。
なぜ互換性が必要なのか?
-
段階的ロールアウト:
一度に全てのコンシューマとプロデューサを更新するのは難しい。互換性を保証すれば、ロールアウトを段階的に行い、ダウンタイムやサービス中断を減らせます。 -
長期的なデータ保存・再利用:
過去に生産されたデータを後で分析したい場合、古いスキーマを理解できる環境で動かすには互換性が必須です。
例えば、過去のメッセージをリプレイする際、最新のアプリケーションでも正しく処理できるように設計するには、ForwardまたはFullな互換性が役立ちます。
互換性維持のためのベストプラクティス
ChatGPT o1 Pro によると下記のベストプラクティスがあるようです。
-
フィールド追加時はdefault値を設定する:
後方互換性を確保するため、追加したフィールドには必ずdefault
値を付けることで、古いコンシューマが新フィールドを知らなくても問題なく処理可能にします。 -
フィールド削除は慎重に:
後方互換モードで必須フィールドを削除すると、古いコンシューマは期待するフィールドが無くエラーとなります。フィールドを削除する前にオプショナル化や段階的な移行を考慮しましょう。 -
型変更は避ける:
互換性が要求される環境で型を変更すると、旧コンシューマが新データを正しくデコードできず問題を起こします。新フィールドを追加することで間接的な型変更を行う戦略を採用します。 -
プロデューサ・コンシューマの段階的アップデート:
互換性を活かし、まずプロデューサを新スキーマで生産できるように変更し、古いコンシューマでも問題なくデータが読める状態にしてからコンシューマを更新する、または逆方向に行う、といった段階的手順が可能になります。
事前準備
Confluet Cloud で検証用の Environment とクラスターを作成
検証用 Environment を作成します。
クラスターを作成します。
Schema Registry の認証情報を取得します。
Google Colab 上でノートブック作成とライブラリのインストール
!pip install confluent-kafka -q
Schema Registry の認証情報をセット
# Schema Registry 接続情報を設定
sr_url = "<schema.registry.url>"
sr_api_key = "<SR_API_KEY>"
sr_api_secret = "<SR_API_SECRET>"
sr_conf = {
'url': sr_url,
'basic.auth.user.info': f"{sr_api_key}:{sr_api_secret}"
}
topic と Subject を作成
Schema Registry クライアントを生成し、学習用のサブジェクトを1つ用意します。
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
import json
client = SchemaRegistryClient(sr_conf)
subject_name = "compatibility-demo-value"
# 学習前に、同名サブジェクトが存在していたら削除 (開発環境を想定)
try:
client.delete_subject(subject_name, permanent=True)
except:
pass
次に初期スキーマを登録します。これは基本的な Avro スキーマで、name
(string) と favorite_number
(int) のフィールドを持ちます。
initial_schema_str = json.dumps({
"type": "record",
"name": "User",
"namespace": "example.avro",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"}
]
})
initial_schema = Schema(initial_schema_str, "AVRO")
initial_id = client.register_schema(subject_name, initial_schema)
print("Initial schema registered with ID:", initial_id)
Confluent 上にて下記の Subject が作成されたことを確認できます。
BACKWARD 互換性
BACKWARD とは
- BACKWARD 互換性: 新しいスキーマでデコード可能なデータは、必ず直前の古いスキーマでエンコードされたデータも読み取れます。つまり、消費者は「新→旧」の順で後退的に互換性を保てます。
- 許される変更例: フィールドの削除 (元フィールドが default 値ありや optional であることが前提)、新たな optional フィールドの追加など。
- 許されない変更例: default 値なしでの新規フィールド追加(古いデータを新スキーマで読む際、値の補完ができない)。
互換性を BACKWARD に設定
from confluent_kafka.schema_registry import SchemaRegistryError
# グローバルあるいは特定サブジェクトの互換性を設定 (ここではサブジェクト単位で設定)
try:
client.set_compatibility(subject_name=subject_name, level="BACKWARD")
print("Compatibility set to BACKWARD.")
except SchemaRegistryError as e:
print("Error setting compatibility:", e)
Confluent 上で確認すると Compatibility mode が Backword となりました。
フィールドの追加(default 値あり)
default 値を持つ新フィールドを追加します。BACKWARD ではこれが許されます。なぜなら、新しいスキーマで古いデータを読む際、default 値で補完できるからです。
backward_compatible_schema_str = json.dumps({
"type": "record",
"name": "User",
"namespace": "example.avro",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string", "default": "green"}
]
})
backward_compatible_schema = Schema(backward_compatible_schema_str, "AVRO")
try:
schema_id = client.register_schema(subject_name, backward_compatible_schema)
print("Backward compatible schema registered with ID:", schema_id)
except SchemaRegistryError as e:
print("Error registering backward compatible schema:", e)
Confluent 上で確認すると version 2 となりました。
フィールドの追加(default 値なし)
default 値なしでフィールドを追加してみましょう。これは古いデータを新スキーマで読む際に値が埋められなくなり、BACKWARD 互換性を損ねます。実行すると、SchemaRegistryError
が発生するはずです。この結果、default 値なしでの新フィールド追加は BACKWARD 互換性下で拒否されることがわかります。
backward_incompatible_schema_str = json.dumps({
"type": "record",
"name": "User",
"namespace": "example.avro",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string", "default": "green"},
# default なしの新フィールド追加
{"name": "new_field", "type": "string"}
]
})
backward_incompatible_schema = Schema(backward_incompatible_schema_str, "AVRO")
try:
client.register_schema(subject_name, backward_incompatible_schema)
print("This should not print, as the schema is not backward compatible.")
except SchemaRegistryError as e:
print("Error registering backward incompatible schema:", e)
Error registering backward incompatible schema: Schema being registered is incompatible with an earlier schema for subject "compatibility-demo-value",
details: [{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'new_field' at path '/fields/3' in the new schema has no default value and is missing in the old schema', additionalInfo:'new_field'}, {oldSchemaVersion: 2}, {oldSchema: '{"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"},{"name":"favorite_color","type":"string","default":"green"}]}'}, {validateFields: 'true', compatibility: 'BACKWARD'}];
error code: 409 (HTTP status code 409, SR code 409)
FORWARD 互換性
FORWARD とは
- FORWARD 互換性: 新しいスキーマで書かれたデータを古いスキーマで読むことが可能になります。
- 許される変更例: 新たなフィールドを追加(旧スキーマで読み込むときは無視される)、optional なフィールドの削除
- 許されない変更例: 元々必須だったフィールドを削除して default を提供しない、など。
FORWARD では先ほどとは逆の状況を試します。先に FORWARD に設定を変更し、フィールド削除などを行ってみます。
互換性を FORWARD に設定
# サブジェクトの互換性を FORWARD に変更
client.set_compatibility(subject_name=subject_name, level="FORWARD")
print("Compatibility set to FORWARD.")
Confluent 上で確認すると Compatibility mode が Forward となりました。
オプショナルフィールドの削除(許容)
favorite_color
が default 値付きなので、これを削除しても古いスキーマで新データを読む際、値がなくても困らないはずです。FORWARD で問題ないか確認します。
forward_schema_remove_optional_str = json.dumps({
"type": "record",
"name": "User",
"namespace": "example.avro",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"}
# favorite_color 削除
]
})
forward_schema_remove_optional = Schema(forward_schema_remove_optional_str, "AVRO")
try:
schema_id = client.register_schema(subject_name, forward_schema_remove_optional)
print("Forward compatible schema (optional field removed) registered with ID:", schema_id)
except SchemaRegistryError as e:
print("Error registering forward compatible schema:", e)
Forward compatible schema (optional field removed) registered with ID: 100001
私が確認した限りでは、エラーにはなりませんでしたが、Subject のバージョンは更新されない動作となりました。これが正常な動作であるかは現時点では確認がとれておりません。
# subject_name = ""
try:
latest_schema_info = client.get_latest_version(subject_name)
print("Latest schema info:", latest_schema_info)
except Exception as e:
print("Error getting latest version:", e)
Latest schema info: RegisteredSchema(schema_id=100002, schema=Schema(schema_str='{"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"},{"name":"favorite_color","type":"string","default":"green"}]}', schema_type='AVRO', references=[], metadata=None, rule_set=None), subject='compatibility-demo-value', version=2)
必須フィールドの削除(非許容)
favorite_number
は必須フィールドでした。これを削除すると、古いスキーマで新しいデータを読むとき、そのフィールドがないことになり解決できません。FORWARD では許されないため、エラーとなるはずです。
forward_schema_remove_required_str = json.dumps({
"type": "record",
"name": "User",
"namespace": "example.avro",
"fields": [
{"name": "name", "type": "string"}
# favorite_number 削除 (必須フィールド)
]
})
forward_schema_remove_required = Schema(forward_schema_remove_required_str, "AVRO")
try:
client.register_schema(subject_name, forward_schema_remove_required)
print("This should not print, as removing a required field should fail in FORWARD mode.")
except SchemaRegistryError as e:
print("Error registering forward incompatible schema:", e)
Error registering forward incompatible schema: Schema being registered is incompatible with an earlier schema for subject "compatibility-demo-value",
details: [{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'favorite_number' at path '/fields/1' in the old schema has no default value and is missing in the new schema', additionalInfo:'favorite_number'}, {oldSchemaVersion: 2}, {oldSchema: '{"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int"},{"name":"favorite_color","type":"string","default":"green"}]}'}, {validateFields: 'true', compatibility: 'FORWARD'}];
error code: 409 (HTTP status code 409, SR code 409)
NONE 互換性
NONE とは
- NONE 互換性: 一切の互換性チェックが無効になります。
- あらゆるスキーマ変更が許されます。
- ただし、これを運用で使用すると重大な不整合が発生しうるため、学習目的以外では推奨されません。
互換性を NONE に設定
client.set_compatibility(subject_name=subject_name, level="NONE")
print("Compatibility set to NONE.")
Confluent 上で確認すると Compatibility mode が None となりました。
不互換な変更を強行
たとえばフィールド型を完全に変更するなど、通常は認められない操作を実行します。ここでは favorite_number
を文字列型に変更するという、本来非互換な操作を試します。
none_incompatible_schema_str = json.dumps({
"type": "record",
"name": "User",
"namespace": "example.avro",
"fields": [
{"name": "name", "type": "string"},
# favorite_number を int から string に変更 (通常は非互換な操作)
{"name": "favorite_number", "type": "string"}
]
})
none_incompatible_schema = Schema(none_incompatible_schema_str, "AVRO")
try:
schema_id = client.register_schema(subject_name, none_incompatible_schema)
print("Schema registered despite incompatibility (NONE mode): ID =", schema_id)
except SchemaRegistryError as e:
print("Error registering schema:", e)
Schema registered despite incompatibility (NONE mode): ID = 100003
Confluent 上で確認すると version 3 となりました。
NONE モードでは、通常エラーとなる変更も許可されるため、ここでは成功するはずです。
事後処理
Confluent 上で作成した Environment を削除します。
まとめ
-
BACKWARD モード: 新スキーマで旧データが問題なく読めるような変更のみが許可されます。
- default 値なしの新フィールド追加は不可。
- optional なフィールド追加や元々 default があるフィールドの削除は可。
-
FORWARD モード: 新データを旧スキーマで読み込める変更が許可されます。
- 新しい optional フィールドの追加は問題ありません(旧スキーマでは無視)。
- 必須フィールド削除は非互換。
-
NONE モード: 互換性チェックがなく、あらゆる変更が許容されます。
- データ整合性を破壊しうるため、本番環境では注意が必要。
今回の実行例では、実際にスキーマを登録しようとした際に SchemaRegistryError
が出るかどうかで何が許可され、何が許可されないかを確認しました。これらを踏まえることで、実際のスキーマ進化の際にどの互換性モードを選択すべきか理解する助けになります。