概要
Confluent Schema Registry は、Apache Kafka 上で流通するデータのスキーマ(Avro、JSON Schema、Protobuf など)を一元的に管理し、スキーマ進化時の互換性チェックやバージョン管理を容易にするコンポーネントです。Confluent 社によって開発され、Confluent Community License の下で配布されています。これにより、スキーマを独立した「契約」として扱うことで、プロデューサーやコンシューマー間の事前合意や明示的なスキーマ共有の手間を軽減し、システム全体の柔軟性や保守性を向上させます。
また、Python から操作する場合は、confluent-kafka ライブラリが提供する SchemaRegistryClient クラスを通じて、スキーマの登録・取得・互換性設定・削除など、多彩な機能をプログラムで実行可能です。
本記事の文章とコードの大部分は、 ChatGPT o1 pro mode で記述しています。ライブラリに関するドキュメント内容を提供してサンプルコードの作成からはじめました。それを微修正して、本記事になりました。ChatGPT Pro はおすすめです。
Schema Registry が学習するために SchemaRegistryClient インスタンスに関して学習するコンテンツを作成して。
{`confluent-kafka` ライブラリの Schema Registry クライアントクラスのドキュメント内容}
本記事は、以下のシリーズの一部です。
引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita
Confluent Schema Registry とは
Confluent Schema Registry 概要
Confluent Schema Registry は、Apache Kafka 環境におけるスキーマ管理を行うためのコンポーネントです。これにより Kafka メッセージのスキーマ(Avro、JSON Schema、Protobufなど)を集中管理し、スキーマ進化時の互換性担保やバージョン管理を容易にします。
引用元:Schema Registry for Confluent Platform | Confluent Documentation
Confluent Schema Registry は以下のような特徴を持ちます。
- 集中管理: すべてのスキーマを一元的に管理し、クライアントからのリクエストに応じてスキーマを提供します。
- 互換性ポリシーの適用: 新規バージョンのスキーマ登録時に、後方互換性や前方互換性などのポリシーを自動的にチェックし、問題のある変更を防ぎます。
- 複数のスキーマ形式対応: Avro、JSON Schema、Protobuf など、異なるスキーマ形式を統一的に扱えるため、異なるアプリケーション間でのスキーマ管理が容易になります。
Confluent 上ではData contractとして分類されており、Schema Registry の Subject を確認することできます。
Confluent Community License の下で配布されています。
引用元:confluentinc/schema-registry: Confluent Schema Registry for Kafka
Python で Confluent Schema Registry を操作する方法
Python で Confluent Schema Registry を操作する際には、confluent-kafka ライブラリに含まれる Schema Registry クライアントクラス (SchemaRegistryClient) を用います。
引用元:confluent_kafka API — confluent-kafka 2.6.0 documentation
このライブラリを使うことで、Python アプリケーションから次のような操作が可能です。
- Schema Registry への接続設定(URL、認証情報、SSL 設定など)
- スキーマの登録、取得、バージョン管理、削除
- 互換性レベルの確認・変更、および新スキーマの互換性テスト
- エラーハンドリング(未登録のスキーマやサブジェクトへのアクセス、互換性違反など)
SchemaRegistryClient を利用するためには、Schema Registry の接続情報や認証情報、SSL 設定などを dict 形式のコンフィグにまとめて渡します。
| 設定キー | 型 | 説明 |
|---|---|---|
url * |
str | Schema Registry の URL (必須) |
ssl.ca.location |
str | CA 証明書ファイルへのパス |
ssl.key.location |
str | クライアント秘密鍵(PEM)へのパス |
ssl.certificate.location |
str | クライアント公開鍵(PEM)へのパス |
basic.auth.user.info |
str |
username:password 形式の認証情報 |
SchemaRegistryClient はスキーマのライフサイクルを管理するための様々なメソッドを提供しています。主なものを以下に示します。
- スキーマ登録・取得
-
register_schema(subject_name, schema, normalize_schemas=False): サブジェクトにスキーマを登録し、スキーマIDを返します。 -
get_schema(schema_id): スキーマIDに対応するスキーマを取得します。 -
lookup_schema(subject_name, schema, normalize_schemas=False): 指定のスキーマが既に登録済みかを確認します。
-
- サブジェクト管理
-
get_subjects(): 登録されている全サブジェクト一覧を取得します。 -
delete_subject(subject_name, permanent=False): サブジェクトを削除します。(通常は開発・テスト用)
-
- バージョン管理
-
get_latest_version(subject_name): 最新バージョンのスキーマ情報を取得します。 -
get_version(subject_name, version): 指定バージョンのスキーマ情報を取得します。 -
get_versions(subject_name): サブジェクト下の全バージョン番号を取得します。 -
delete_version(subject_name, version): 指定バージョンを削除します。
-
- 互換性管理
-
set_compatibility(subject_name=None, level=None): グローバルまたは対象サブジェクトの互換性レベルを設定します。 -
get_compatibility(subject_name=None): 現在の互換性レベルを取得します。 -
test_compatibility(subject_name, schema, version='latest'): 提案スキーマが特定バージョンと互換性があるかをチェックします。
-
他社の Schema Registry との差異
Azure や AWS などのクラウドベンダーも、独自の Schema Registry 機能を提供しています。しかし、これらは Confluent Schema Registry とは異なるサービスのようです。
- Azure Schema Registry: Azure Event Hubs 環境に最適化されており、Microsoft のエコシステムと緊密に統合されています。
- AWS Glue Schema Registry: AWS 環境(特に Amazon MSK や他のデータストリーミングサービス)でのシームレスなスキーマ管理を目的として Amazon が独自開発しています。
それぞれが GitHub にて公開されています。
Confluent Schema Registry の実践
事前準備
Confluent Cloud 上に新規の envrionment を作成
Confluent Cloud 上にクラスターを作成
Schema Registry の認証情報を取得
Envrionment の画面にて右下にあるStream Governance APIにある+ Add keyから認証情報を取得します。
Python 上での事前準備
confluent-kafka のインストール
Google Colab 上にノートブックを作成して、下記のコードを実行します。
!pip install confluent-kafka -q
SchemaRegistryClient のインスタンス生成
# Schema Registry の接続情報をセット
sr_url = "<schema.registry.url>"
sr_api_key = "<SR_API_KEY>"
sr_api_secret = "<SR_API_SECRET>"
sr_config = {
'url': sr_url,
'basic.auth.user.info': f"{sr_api_key}:{sr_api_secret}",
}
from confluent_kafka.schema_registry import SchemaRegistryClient
client = SchemaRegistryClient(sr_config)
print(client)
スキーマの登録と検証
スキーマの登録と検証
from confluent_kafka.schema_registry import Schema
avro_str = """
{
"type": "record",
"name": "User",
"namespace": "example.avro",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int", "default": 42}
]
}
"""
# Schemaオブジェクトを作成 (Avro Schemaの場合は"AVRO"を指定)
schema = Schema(avro_str, "AVRO")
schema
Schema(schema_str='\n{\n "type": "record",\n "name": "User",\n "namespace": "example.avro",\n "fields": [\n {"name": "name", "type": "string"},\n {"name": "favorite_number", "type": "int", "default": 42}\n ]\n}\n', schema_type='AVRO', references=[], metadata=None, rule_set=None)
スキーマの登録
subject_name = "sr-test-avro-value"
try:
schema_id = client.register_schema(subject_name, schema)
print("Registered schema with ID:", schema_id)
except Exception as e:
print("Error registering schema:", e)
Registered schema with ID: 100001
スキーマ取得と確認
try:
fetched_schema = client.get_schema(schema_id)
print("Fetched schema:", fetched_schema.schema_str)
except Exception as e:
print("Error fetching schema:", e)
Fetched schema: {"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int","default":42}]}
スキーマ互換性テスト
スキーマの変更がないため、Trueとリターンされます。
try:
compatible = client.test_compatibility(subject_name, schema, version='latest')
print("Is the schema compatible with the latest version?:", compatible)
except Exception as e:
print("Error testing compatibility:", e)
Is the schema compatible with the latest version?: True
Compatibility が BACKWARD であるため、カラムの追加(add_col列の追加)が許容されないため、Falseがリターンされます。
error_avro_str = """
{
"type": "record",
"name": "User",
"namespace": "example.avro",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int", "default": 42},
{"name": "add_col", "type": "string"}
]
}
"""
# Schemaオブジェクトを作成 (Avro Schemaの場合は"AVRO"を指定)
error_schema = Schema(error_avro_str, "AVRO")
try:
compatible = client.test_compatibility(subject_name, error_schema, version='latest')
print("Is the schema compatible with the latest version?:", compatible)
except Exception as e:
print("Error testing compatibility:", e)
Is the schema compatible with the latest version?: False
サブジェクト・バージョン管理操作
全サブジェクト一覧取得
try:
subjects = client.get_subjects()
print("All subjects:", subjects)
except Exception as e:
print("Error getting subjects:", e)
All subjects: ['sr-test-avro-value']
指定サブジェクトのバージョン一覧取得
try:
versions = client.get_versions(subject_name)
print(f"Versions under {subject_name}:", versions)
except Exception as e:
print("Error getting versions:", e)
Versions under sr-test-avro-value: [1]
最新バージョン情報取得
try:
latest_schema_info = client.get_latest_version(subject_name)
print("Latest schema info:", latest_schema_info)
except Exception as e:
print("Error getting latest version:", e)
Latest schema info: RegisteredSchema(schema_id=100001, schema=Schema(schema_str='{"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int","default":42}]}', schema_type='AVRO', references=[], metadata=None, rule_set=None), subject='sr-test-avro-value', version=1)
スキーマ互換性レベルの確認
# 現在のグローバル互換性レベル取得
try:
current_compat = client.get_compatibility()
print("Current global compatibility level:", current_compat)
except Exception as e:
print("Error getting compatibility:", e)
Current global compatibility level: BACKWARD
スキーマ互換性レベルの変更
# グローバル互換性レベルを 'FULL' に変更
try:
new_level = client.set_compatibility(level="FULL")
print("New global compatibility level:", new_level)
except Exception as e:
print("Error setting compatibility:", e)
New global compatibility level: {'compatibility': 'FULL'}
エラーハンドリング
SchemaRegistryError
よく発生するエラーには、SchemaRegistryError があります。
下記のコードでは存在しない subject のバージョンを取得しようとしてSchemaRegistryError を発生させています。
from confluent_kafka.schema_registry import SchemaRegistryError
try:
# 存在しないサブジェクトを取得してみる
non_existent_subject = "non-existent-subject"
non_existent_versions = client.get_versions(non_existent_subject)
except SchemaRegistryError as e:
print(f"SchemaRegistryError occurred: {e}")
except Exception as e:
print("Other Error:", e)
SchemaRegistryError occurred: Subject 'non-existent-subject' not found. (HTTP status code 404, SR code 40401)
事後処理
Confluent 上で environment を削除
まとめ
Confluent Schema Registry は、Kafka ベースのデータパイプラインにおいてスキーマ管理を標準化・自動化する重要なツールです。これを活用することで、スキーマの変更による障害を回避しやすくなり、堅牢なストリーム処理基盤を構築できます。他社の Schema Registry(Azure や AWS のサービス)とはライセンスや統合度、内部構造などが異なりますが、共通してスキーマ管理の複雑性を軽減する役割を果たします。多様な環境に合わせたスキーマ管理戦略を立てる際、Confluent Schema Registry は有力な選択肢の一つと言えます。




























