0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

manabianAdvent Calendar 2024

Day 13

Confluent にて Kafka メッセージ Key を Schema Registry で管理するメリットと実践方法

Last updated at Posted at 2024-12-22

概要

Confluent 上で Key をシリアライズ/デシリアライズ(SerDes)する方法を検証してみました。Apache Kafka ではメッセージの Key を単純に文字列として運用するケースもありますが、Key をシリイズし、Schema Registry で管理すると以下のような多くのメリットがあります。

  1. 複数列に分割した Key を使用する場合でも、パーティション制御の一貫性を維持できる
  2. Key のデータ構造をスキーマとして管理し、互換性やバージョン管理を実施できる
  3. Consumer 側で自動デシリアライズできるため、実装が簡単になる

今回は Key を Avro スキーマで管理し、Ser/Des を行う手順を紹介します。

本記事は、以下のシリーズの一部です。

image.png

引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita

事前準備

Confluent と Schema Registry への接続情報をセット

# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "B5NJBZ22ZKEJOG22"
sasl_password = "7IMvcMC52lAiGPixGB3AyekX0Tk2nNXqIzufYRoxIwdVw8nJAOFAjngKMz2W4Cg0"
kafka_conf = {
    "bootstrap.servers": bootstrap_servers,
    "sasl.username":     sasl_username,
    "sasl.password":     sasl_password,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms":   "PLAIN",
}

# Schema Registry の接続情報
sr_url = "https://psrc-lq2dm.us-east-2.aws.confluent.cloud"
sr_api_key = "3CEWV5JNSV6IEKIV"
sr_api_secret = "Eo+9lTX117MrFtS6n09tpdpK7HcXZ6aXoF2d+mNnJYE/4Xtgiyy5ybennM5dCNdP"
sr_conf = {
    "url": sr_url,
    "basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}"
}

検証に利用する Topic を作成

事前準備として、Confluent の AdminClient を利用して複数のトピックを作成しておきます。以下のコードでは、存在しないトピックは新規に作成し、既に存在していればエラーを出力する実装です。

from confluent_kafka.admin import AdminClient, NewTopic

def create_topics_example(kafka_conf, topics_to_create):
    # AdminClientの設定
    admin_client = AdminClient(kafka_conf)
    
    # トピック作成リクエスト(NewTopic オブジェクト)のリストを作る
    new_topics = []
    for topic_name in topics_to_create:
        # 必要に応じて num_partitions や replication_factor を設定
        new_topic = NewTopic(topic=topic_name)
        new_topics.append(new_topic)
    
    # AdminClient の create_topics を呼び出すと、戻り値は {topic名: Future} の辞書
    futures = admin_client.create_topics(new_topics)
    
    # Future の結果をチェックして、作成成功 / 失敗を判定
    for topic, f in futures.items():
        try:
            f.result()  # 例外が起きなければ成功
            print(f"Topic '{topic}' created successfully.")
        except Exception as e:
            print(f"Failed to create topic '{topic}': {e}")
topics_to_create = [
    "json_schema_01",
    "json_schema_02",
    "json_schema_03",
    "json_schema_04",
    "json_schema_05",
    "json_schema_06",
    "json_schema_07",
    "json_schema_08",
]

create_topics_example(
    kafka_conf,
    topics_to_create,
)

image.png

Key に対する Ser/Des の実施

Consumer 側での Avro デシリアライズ方法

以下では、Key 部分を Avro でシリアライズして送信します。

from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# SchemaRegistryClient と AvroSerializer を初期化
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_serializer = AvroSerializer(
    schema_registry_client,
    key_schema_str,
)

# Kafka Producer を初期化
producer = Producer(kafka_conf)

# Avro でシリアライズ
serialized_key_value = avro_serializer(
    customer_key_data,
    SerializationContext(topic_name, MessageField.KEY)
)

# Kafka にメッセージを送信
producer.produce(
    topic=topic_name,
    key=serialized_key_value,
    value=customer_value_data,
)

# 送信をフラッシュ(ブロッキングして完了を待つ)
producer.flush()
print("メッセージ書き込み成功")

image.png

Confluent 上でデータを確認すると下記のようになっていました。

image.png

Consumer 側での Avro デシリアライズ方法

次に、Consumer では受信した Key を Avro でデシリアライズしてみます。

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer


# ここでも同じスキーマを使って AvroDeserializer を作成
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_deserializer = AvroDeserializer(schema_registry_client)

consumer_conf = {
    'group.id': 'avro_consumer_group_01',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}
consumer_conf.update(kafka_conf)

# Kafka Consumer の設定
consumer = Consumer(consumer_conf)

# トピックを購読
consumer.subscribe([topic_name])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # タイムアウト時やメッセージなしの場合
            continue
        if msg.error():
            print(f"Consumer エラー: {msg.error()}")
            continue
        
        # --- キーのデシリアライズ部分 ---
        # ここで msg.key() はバイナリとして受け取っているので、AvroDeserializer で復元
        deserialized_key = avro_deserializer(
            msg.key(),
            SerializationContext(topic_name, MessageField.KEY)
        )
        
        # 値は今回は単純な文字列(未シリアライズ)なので、decode するなどで取り出す
        # (Producerでそのまま string を渡している場合)
        deserialized_value = msg.value().decode('utf-8') if msg.value() else None
        
        print(f"キー = {deserialized_key}, 値 = {deserialized_value}")

except KeyboardInterrupt:
    print("Consumer を停止します...")
finally:
    consumer.close()

image.png

以下に、「まとめ」の章を追記した、全体の文章を改善したバージョンを提示します。


概要

Confluent 上で Key をシリアライズ/デシリアライズ(SerDes)する方法を検証してみました。Apache Kafka ではメッセージの Key を単純に文字列として運用するケースもありますが、Key をシリアライズし、Schema Registry で管理すると以下のような多くのメリットがあります。

  1. 複数列に分割した Key を使用する場合でも、パーティション制御の一貫性を維持できる
  2. Key のデータ構造をスキーマとして管理し、互換性やバージョン管理を実施できる
  3. Consumer 側で自動デシリアライズできるため、実装が簡単になる

今回は Key を Avro スキーマで管理し、SerDes を行う手順を紹介します。


事前準備

Confluent と Schema Registry への接続情報をセット

# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "B5NJBZ22ZKEJOG22"
sasl_password = "7IMvcMC52lAiGPixGB3AyekX0Tk2nNXqIzufYRoxIwdVw8nJAOFAjngKMz2W4Cg0"
kafka_conf = {
    "bootstrap.servers": bootstrap_servers,
    "sasl.username":     sasl_username,
    "sasl.password":     sasl_password,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms":   "PLAIN",
}

# Schema Registry の接続情報
sr_url = "https://psrc-lq2dm.us-east-2.aws.confluent.cloud"
sr_api_key = "3CEWV5JNSV6IEKIV"
sr_api_secret = "Eo+9lTX117MrFtS6n09tpdpK7HcXZ6aXoF2d+mNnJYE/4Xtgiyy5ybennM5dCNdP"
sr_conf = {
    "url": sr_url,
    "basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}"
}

検証用トピックの作成

事前準備として、Confluent の AdminClient を利用して複数のトピックを作成しておきます。以下のコードでは、存在しないトピックは新規に作成し、既に存在していればエラーを出力する実装です。

from confluent_kafka.admin import AdminClient, NewTopic

def create_topics_example(kafka_conf, topics_to_create):
    # AdminClientの設定
    admin_client = AdminClient(kafka_conf)
    
    # トピック作成リクエスト(NewTopic オブジェクト)のリストを作成
    new_topics = []
    for topic_name in topics_to_create:
        # 必要に応じて num_partitions や replication_factor を設定
        new_topic = NewTopic(topic=topic_name)
        new_topics.append(new_topic)
    
    # AdminClient の create_topics を呼び出し、戻り値は {topic名: Future} の辞書
    futures = admin_client.create_topics(new_topics)
    
    # Future の結果をチェックして、作成成功 / 失敗を判定
    for topic, f in futures.items():
        try:
            f.result()  # 例外が起きなければ成功
            print(f"Topic '{topic}' created successfully.")
        except Exception as e:
            print(f"Failed to create topic '{topic}': {e}")
topics_to_create = [
    "json_schema_01",
    "json_schema_02",
    "json_schema_03",
    "json_schema_04",
    "json_schema_05",
    "json_schema_06",
    "json_schema_07",
    "json_schema_08",
]

create_topics_example(
    kafka_conf,
    topics_to_create,
)

image.png


Key に対する Ser/Des の実施

Producer 側での Avro シリアライズ方法

以下の例では、Key 部分を Avro でシリアライズして送信します。

from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# SchemaRegistryClient と AvroSerializer を初期化
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_serializer = AvroSerializer(
    schema_registry_client,
    key_schema_str,  # 事前に定義したキーの Avro スキーマ文字列
)

# Kafka Producer を初期化
producer = Producer(kafka_conf)

# Avro で Key をシリアライズ
serialized_key_value = avro_serializer(
    customer_key_data,  # 送信したい Key データ
    SerializationContext(topic_name, MessageField.KEY)
)

# Kafka にメッセージを送信
producer.produce(
    topic=topic_name,
    key=serialized_key_value,
    value=customer_value_data,  # 今回は文字列などを単純に渡す
)

# 送信をフラッシュ(ブロッキングして完了を待つ)
producer.flush()
print("メッセージ書き込み成功")

image.png

Confluent のコンソール上でメッセージを確認すると、Key が Avro スキーマをもとにエンコードされていることが確認できます。

image.png


Consumer 側での Avro デシリアライズ方法

次に、Consumer では受信した Key を Avro でデシリアライズしてみます。

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# 同じスキーマを使用して AvroDeserializer を作成
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_deserializer = AvroDeserializer(schema_registry_client)

consumer_conf = {
    'group.id': 'avro_consumer_group_01',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}
consumer_conf.update(kafka_conf)

# Kafka Consumer を初期化
consumer = Consumer(consumer_conf)

# トピックを購読
consumer.subscribe([topic_name])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # タイムアウト時やメッセージのない場合
            continue
        if msg.error():
            print(f"Consumer エラー: {msg.error()}")
            continue
        
        # Key を AvroDeserializer でデシリアライズ
        deserialized_key = avro_deserializer(
            msg.key(),
            SerializationContext(topic_name, MessageField.KEY)
        )
        
        # 値は単純な文字列として送信されているため、decode して取り出す
        deserialized_value = msg.value().decode('utf-8') if msg.value() else None
        
        print(f"キー = {deserialized_key}, 値 = {deserialized_value}")

except KeyboardInterrupt:
    print("Consumer を停止します...")
finally:
    consumer.close()

image.png

まとめ

Key のスキーマを Schema Registry で管理し、Avro を使って SerDes 化することで、下記のような運用面で多くのメリットが得られます。

  • パーティション制御 の一貫性が取りやすい
  • スキーマ管理 による Key の互換性やバージョン管理が容易になる
  • Consumer の自動デシリアライズ による開発効率の向上

Kafka におけるメッセージの Key は、ときにシステム全体のパフォーマンスや拡張性に大きく影響する要素です。スキーマ管理を含めたトピック設計を行うことで、将来的なデータモデルの変更やスケールにも対応しやすくなるため、要件に合わせて検討をおすすめします。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?