0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Distributed computing (Apache Spark, Hadoop, Kafka, ...)Advent Calendar 2024

Day 18

Python × Confluent Schema Registry:Confluent Schema Registry 入門

Last updated at Posted at 2024-12-18

概要

Confluent Schema Registry は、Apache Kafka 上で流通するデータのスキーマ(Avro、JSON Schema、Protobuf など)を一元的に管理し、スキーマ進化時の互換性チェックやバージョン管理を容易にするコンポーネントです。Confluent 社によって開発され、Confluent Community License の下で配布されています。これにより、スキーマを独立した「契約」として扱うことで、プロデューサーやコンシューマー間の事前合意や明示的なスキーマ共有の手間を軽減し、システム全体の柔軟性や保守性を向上させます。

また、Python から操作する場合は、confluent-kafka ライブラリが提供する SchemaRegistryClient クラスを通じて、スキーマの登録・取得・互換性設定・削除など、多彩な機能をプログラムで実行可能です。

本記事の文章とコードの大部分は、 ChatGPT o1 pro mode で記述しています。ライブラリに関するドキュメント内容を提供してサンプルコードの作成からはじめました。それを微修正して、本記事になりました。ChatGPT Pro はおすすめです。

Schema Registry が学習するために SchemaRegistryClient インスタンスに関して学習するコンテンツを作成して。

{`confluent-kafka` ライブラリの Schema Registry クライアントクラスのドキュメント内容}

image.png

image.png

image.png

Confluent Schema Registry とは

Confluent Schema Registry 概要

Confluent Schema Registry は、Apache Kafka 環境におけるスキーマ管理を行うためのコンポーネントです。これにより Kafka メッセージのスキーマ(Avro、JSON Schema、Protobufなど)を集中管理し、スキーマ進化時の互換性担保やバージョン管理を容易にします。

image.png

引用元:Schema Registry for Confluent Platform | Confluent Documentation

Confluent Schema Registry は以下のような特徴を持ちます。

  • 集中管理: すべてのスキーマを一元的に管理し、クライアントからのリクエストに応じてスキーマを提供します。
  • 互換性ポリシーの適用: 新規バージョンのスキーマ登録時に、後方互換性や前方互換性などのポリシーを自動的にチェックし、問題のある変更を防ぎます。
  • 複数のスキーマ形式対応: Avro、JSON Schema、Protobuf など、異なるスキーマ形式を統一的に扱えるため、異なるアプリケーション間でのスキーマ管理が容易になります。

Confluent 上ではData contractとして分類されており、Schema Registry の Subject を確認することできます。

image.png

image.png

Confluent Community License の下で配布されています。

image.png

引用元:confluentinc/schema-registry: Confluent Schema Registry for Kafka

Python で Confluent Schema Registry を操作する方法

Python で Confluent Schema Registry を操作する際には、confluent-kafka ライブラリに含まれる Schema Registry クライアントクラス (SchemaRegistryClient) を用います。

image.png

引用元:confluent_kafka API — confluent-kafka 2.6.0 documentation

このライブラリを使うことで、Python アプリケーションから次のような操作が可能です。

  • Schema Registry への接続設定(URL、認証情報、SSL 設定など)
  • スキーマの登録、取得、バージョン管理、削除
  • 互換性レベルの確認・変更、および新スキーマの互換性テスト
  • エラーハンドリング(未登録のスキーマやサブジェクトへのアクセス、互換性違反など)

SchemaRegistryClient を利用するためには、Schema Registry の接続情報や認証情報、SSL 設定などを dict 形式のコンフィグにまとめて渡します。

設定キー 説明
url * str Schema Registry の URL (必須)
ssl.ca.location str CA 証明書ファイルへのパス
ssl.key.location str クライアント秘密鍵(PEM)へのパス
ssl.certificate.location str クライアント公開鍵(PEM)へのパス
basic.auth.user.info str username:password 形式の認証情報

SchemaRegistryClient はスキーマのライフサイクルを管理するための様々なメソッドを提供しています。主なものを以下に示します。

  • スキーマ登録・取得
    • register_schema(subject_name, schema, normalize_schemas=False): サブジェクトにスキーマを登録し、スキーマIDを返します。
    • get_schema(schema_id): スキーマIDに対応するスキーマを取得します。
    • lookup_schema(subject_name, schema, normalize_schemas=False): 指定のスキーマが既に登録済みかを確認します。
  • サブジェクト管理
    • get_subjects(): 登録されている全サブジェクト一覧を取得します。
    • delete_subject(subject_name, permanent=False): サブジェクトを削除します。(通常は開発・テスト用)
  • バージョン管理
    • get_latest_version(subject_name): 最新バージョンのスキーマ情報を取得します。
    • get_version(subject_name, version): 指定バージョンのスキーマ情報を取得します。
    • get_versions(subject_name): サブジェクト下の全バージョン番号を取得します。
    • delete_version(subject_name, version): 指定バージョンを削除します。
  • 互換性管理
    • set_compatibility(subject_name=None, level=None): グローバルまたは対象サブジェクトの互換性レベルを設定します。
    • get_compatibility(subject_name=None): 現在の互換性レベルを取得します。
    • test_compatibility(subject_name, schema, version='latest'): 提案スキーマが特定バージョンと互換性があるかをチェックします。

他社の Schema Registry との差異

Azure や AWS などのクラウドベンダーも、独自の Schema Registry 機能を提供しています。しかし、これらは Confluent Schema Registry とは異なるサービスのようです。

  • Azure Schema Registry: Azure Event Hubs 環境に最適化されており、Microsoft のエコシステムと緊密に統合されています。
  • AWS Glue Schema Registry: AWS 環境(特に Amazon MSK や他のデータストリーミングサービス)でのシームレスなスキーマ管理を目的として Amazon が独自開発しています。

それぞれが GitHub にて公開されています。

Confluent Schema Registry の実践

事前準備

Confluent Cloud 上に新規の envrionment を作成

image.png

Confluent Cloud 上にクラスターを作成

image.png

Schema Registry の認証情報を取得

Envrionment の画面にて右下にあるStream Governance APIにある+ Add keyから認証情報を取得します。

image.png

image.png

image.png

Python 上での事前準備

confluent-kafka のインストール

Google Colab 上にノートブックを作成して、下記のコードを実行します。

!pip install confluent-kafka -q

image.png

SchemaRegistryClient のインスタンス生成

# Schema Registry の接続情報をセット
sr_url = "<schema.registry.url>"
sr_api_key = "<SR_API_KEY>"
sr_api_secret = "<SR_API_SECRET>"
sr_config = {
    'url': sr_url,
    'basic.auth.user.info': f"{sr_api_key}:{sr_api_secret}",
}

from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient(sr_config)
print(client)

image.png

スキーマの登録と検証

スキーマの登録と検証

from confluent_kafka.schema_registry import Schema

avro_str = """
{
  "type": "record",
  "name": "User",
  "namespace": "example.avro",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": "int", "default": 42}
  ]
}
"""

# Schemaオブジェクトを作成 (Avro Schemaの場合は"AVRO"を指定)
schema = Schema(avro_str, "AVRO")
schema
Schema(schema_str='\n{\n  "type": "record",\n  "name": "User",\n  "namespace": "example.avro",\n  "fields": [\n    {"name": "name", "type": "string"},\n    {"name": "favorite_number", "type": "int", "default": 42}\n  ]\n}\n', schema_type='AVRO', references=[], metadata=None, rule_set=None)

image.png

スキーマの登録

subject_name = "sr-test-avro-value"

try:
    schema_id = client.register_schema(subject_name, schema)
    print("Registered schema with ID:", schema_id)
except Exception as e:
    print("Error registering schema:", e)
Registered schema with ID: 100001

image.png

スキーマ取得と確認

try:
    fetched_schema = client.get_schema(schema_id)
    print("Fetched schema:", fetched_schema.schema_str)
except Exception as e:
    print("Error fetching schema:", e)
Fetched schema: {"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int","default":42}]}

image.png

スキーマ互換性テスト

スキーマの変更がないため、Trueとリターンされます。

try:
    compatible = client.test_compatibility(subject_name, schema, version='latest')
    print("Is the schema compatible with the latest version?:", compatible)
except Exception as e:
    print("Error testing compatibility:", e)
Is the schema compatible with the latest version?: True

image.png

Compatibility が BACKWARD であるため、カラムの追加(add_col列の追加)が許容されないため、Falseがリターンされます。

error_avro_str = """
{
  "type": "record",
  "name": "User",
  "namespace": "example.avro",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": "int", "default": 42},
    {"name": "add_col", "type": "string"}
  ]
}
"""

# Schemaオブジェクトを作成 (Avro Schemaの場合は"AVRO"を指定)
error_schema = Schema(error_avro_str, "AVRO")
try:
    compatible = client.test_compatibility(subject_name, error_schema, version='latest')
    print("Is the schema compatible with the latest version?:", compatible)
except Exception as e:
    print("Error testing compatibility:", e)
Is the schema compatible with the latest version?: False

image.png

サブジェクト・バージョン管理操作

全サブジェクト一覧取得

try:
    subjects = client.get_subjects()
    print("All subjects:", subjects)
except Exception as e:
    print("Error getting subjects:", e)
All subjects: ['sr-test-avro-value']

image.png

指定サブジェクトのバージョン一覧取得

try:
    versions = client.get_versions(subject_name)
    print(f"Versions under {subject_name}:", versions)
except Exception as e:
    print("Error getting versions:", e)
Versions under sr-test-avro-value: [1]

image.png

最新バージョン情報取得

try:
    latest_schema_info = client.get_latest_version(subject_name)
    print("Latest schema info:", latest_schema_info)
except Exception as e:
    print("Error getting latest version:", e)
Latest schema info: RegisteredSchema(schema_id=100001, schema=Schema(schema_str='{"type":"record","name":"User","namespace":"example.avro","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":"int","default":42}]}', schema_type='AVRO', references=[], metadata=None, rule_set=None), subject='sr-test-avro-value', version=1)

image.png

スキーマ互換性レベルの確認

# 現在のグローバル互換性レベル取得
try:
    current_compat = client.get_compatibility()
    print("Current global compatibility level:", current_compat)
except Exception as e:
    print("Error getting compatibility:", e)
Current global compatibility level: BACKWARD

image.png

スキーマ互換性レベルの変更

# グローバル互換性レベルを 'FULL' に変更
try:
    new_level = client.set_compatibility(level="FULL")
    print("New global compatibility level:", new_level)
except Exception as e:
    print("Error setting compatibility:", e)
New global compatibility level: {'compatibility': 'FULL'}

image.png

エラーハンドリング

SchemaRegistryError

よく発生するエラーには、SchemaRegistryError があります。

image.png

下記のコードでは存在しない subject のバージョンを取得しようとしてSchemaRegistryError を発生させています。

from confluent_kafka.schema_registry import SchemaRegistryError

try:
    # 存在しないサブジェクトを取得してみる
    non_existent_subject = "non-existent-subject"
    non_existent_versions = client.get_versions(non_existent_subject)
except SchemaRegistryError as e:
    print(f"SchemaRegistryError occurred: {e}")
except Exception as e:
    print("Other Error:", e)
SchemaRegistryError occurred: Subject 'non-existent-subject' not found. (HTTP status code 404, SR code 40401)

image.png

事後処理

Confluent 上で environment を削除

image.png

まとめ

Confluent Schema Registry は、Kafka ベースのデータパイプラインにおいてスキーマ管理を標準化・自動化する重要なツールです。これを活用することで、スキーマの変更による障害を回避しやすくなり、堅牢なストリーム処理基盤を構築できます。他社の Schema Registry(Azure や AWS のサービス)とはライセンスや統合度、内部構造などが異なりますが、共通してスキーマ管理の複雑性を軽減する役割を果たします。多様な環境に合わせたスキーマ管理戦略を立てる際、Confluent Schema Registry は有力な選択肢の一つと言えます。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?