0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Confluent 環境における`auto.register.schemas` の有効・無効によるスキーマ登録挙動の比較

0
Last updated at Posted at 2024-12-19

概要

Confluent 環境では、KafkaSchema Registry は緊密に統合されており、新たにメッセージを Produce した際、デフォルトでスキーマが登録されます。このスキーマ登録の挙動は、シリアライズ時に利用するメソッド内部の auto.register.schemas プロパティによって制御されます。この記事では、この挙動を実際に検証します。

本記事は、以下のシリーズの一部です。

image.png

引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita

auto.register.schemas プロパティについて

AvroJSONProtobuf のシリアライズクラスでは、conf 引数で auto.register.schemas プロパティを指定できます。これらのクラスではデフォルト値が True となっており、有効な場合はスキーマが自動的にサブジェクトとして Schema Registry に登録されます。

image.png

If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy).

Defaults to True.

引用元:confluent_kafka API — confluent-kafka 2.6.0 documentation

image.png

If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy).

Defaults to True.

Raises SchemaRegistryError if the schema was not registered against the subject, or could not be successfully registered.

引用元:confluent_kafka API — confluent-kafka 2.6.0 documentation

If True, automatically register the configured schema with Confluent Schema Registry if it has not previously been associated with the relevant subject (determined via subject.name.strategy).

Defaults to True.

Raises SchemaRegistryError if the schema was not registered against the subject, or could not be successfully registered.

image.png

引用元:confluent_kafka API — confluent-kafka 2.6.0 documentation

検証コードと実行結果

Confluent 環境での事前準備

1. Confluent 上で auto-register-schema-test という Topic を作成

image.png

Topic を作成しただけでは、Schema Registry にサブジェクトは作成されません。

image.png

2. クラスター (Kafka) と Schema Registry への認証情報を取得

image.png

image.png

Python での事前準備

1. ライブラリをインストール

!pip install confluent-kafka confluent-kafka[avro] -q

image.png

2. ライブラリをインポート

from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.avro import AvroProducer, loads
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

image.png

3. 認証情報をセット

# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "HZCZN2ELLMHZYUO4"
sasl_password = "l0rJLt3uRII8yDfHhh7JBFpH1p1Ixx8siW2I1j6RuTRUp8gZ5XvVBC3+E50orwHl"
kafka_conf = {
    'bootstrap.servers': bootstrap_servers,
    'sasl.username':     sasl_username,
    'sasl.password':     sasl_password,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
}

# Schema Registry 接続情報をセット
sr_url = "https://psrc-lq2dm.us-east-2.aws.confluent.cloud"
sr_api_key = "XAYUIVXTMGTDKCXQ"
sr_api_secret = "d8ODesWpDhJ0hbOf1hFtBOYncSx+Qeemi+DdYT6BYSFv/+eonq3wPYt5BAAoiHOZ"
sr_conf = {
    'url': sr_url,
    'basic.auth.user.info': f"{sr_api_key}:{sr_api_secret}"
}

image.png

4. スキーマを定義

schema_str = """
{
  "type": "record",
  "name": "User",
  "namespace": "example.avro",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
"""

image.png

auto.register.schemasfalse の場合の動作確認

1. Produce 実行

ser_conf = {
    'auto.register.schemas': False,  # スキーマ自動登録無効
}

topic = "auto-register-schema-test"
record = {"name": "Alice", "age": 30}
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_serializer = AvroSerializer(
    schema_registry_client,
    schema_str,
    conf=ser_conf,
)
producer = Producer(kafka_conf)
serialized_value = avro_serializer(
    record,
    SerializationContext(topic, MessageField.VALUE)
)
producer.produce(
    topic=topic,
    key=str(record["name"]),
    value=serialized_value,
)
producer.flush()
print("書き込み成功")
SchemaRegistryError: Subject 'auto-register-schema-test-value' not found. (HTTP status code 404, SR code 40401)

image.png

auto.register.schemastrue の場合の動作確認

1. Produce 実行

ser_conf = {
    'auto.register.schemas': True,  # スキーマ自動登録有効
}

topic = "auto-register-schema-test"
record = {"name": "Alice", "age": 30}
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_serializer = AvroSerializer(
    schema_registry_client,
    schema_str,
    conf=ser_conf,
)
producer = Producer(kafka_conf)
serialized_value = avro_serializer(
    record,
    SerializationContext(topic, MessageField.VALUE)
)
producer.produce(
    topic=topic,
    key=str(record["name"]),
    value=serialized_value,
)
producer.flush()
print("書き込み成功")

image.png

事後処理

1. Topic 削除

image.png

2. Schema Registry のサブジェクト削除

image.png

まとめ

この記事では、Confluent 環境で KafkaSchema Registry が統合されている状況下で、auto.register.schemas プロパティの挙動を検証しました。auto.register.schemasTrue にすると、新たなメッセージを Produce する際に未登録のスキーマが自動的にサブジェクトとして Schema Registry に登録され、スムーズなスキーマ管理が可能になります。一方、False に設定した場合は、スキーマが登録されていないとエラーが発生し、明示的なスキーマ登録が必要になります。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?