概要
Confluent 上で Key をシリアライズ/デシリアライズ(SerDes)する方法を検証してみました。Apache Kafka ではメッセージの Key を単純に文字列として運用するケースもありますが、Key をシリイズし、Schema Registry で管理すると以下のような多くのメリットがあります。
- 複数列に分割した Key を使用する場合でも、パーティション制御の一貫性を維持できる
- Key のデータ構造をスキーマとして管理し、互換性やバージョン管理を実施できる
- Consumer 側で自動デシリアライズできるため、実装が簡単になる
今回は Key を Avro スキーマで管理し、Ser/Des を行う手順を紹介します。
本記事は、以下のシリーズの一部です。
引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita
事前準備
Confluent と Schema Registry への接続情報をセット
# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "B5NJBZ22ZKEJOG22"
sasl_password = "7IMvcMC52lAiGPixGB3AyekX0Tk2nNXqIzufYRoxIwdVw8nJAOFAjngKMz2W4Cg0"
kafka_conf = {
"bootstrap.servers": bootstrap_servers,
"sasl.username": sasl_username,
"sasl.password": sasl_password,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
}
# Schema Registry の接続情報
sr_url = "https://psrc-lq2dm.us-east-2.aws.confluent.cloud"
sr_api_key = "3CEWV5JNSV6IEKIV"
sr_api_secret = "Eo+9lTX117MrFtS6n09tpdpK7HcXZ6aXoF2d+mNnJYE/4Xtgiyy5ybennM5dCNdP"
sr_conf = {
"url": sr_url,
"basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}"
}
検証に利用する Topic を作成
事前準備として、Confluent の AdminClient を利用して複数のトピックを作成しておきます。以下のコードでは、存在しないトピックは新規に作成し、既に存在していればエラーを出力する実装です。
from confluent_kafka.admin import AdminClient, NewTopic
def create_topics_example(kafka_conf, topics_to_create):
# AdminClientの設定
admin_client = AdminClient(kafka_conf)
# トピック作成リクエスト(NewTopic オブジェクト)のリストを作る
new_topics = []
for topic_name in topics_to_create:
# 必要に応じて num_partitions や replication_factor を設定
new_topic = NewTopic(topic=topic_name)
new_topics.append(new_topic)
# AdminClient の create_topics を呼び出すと、戻り値は {topic名: Future} の辞書
futures = admin_client.create_topics(new_topics)
# Future の結果をチェックして、作成成功 / 失敗を判定
for topic, f in futures.items():
try:
f.result() # 例外が起きなければ成功
print(f"Topic '{topic}' created successfully.")
except Exception as e:
print(f"Failed to create topic '{topic}': {e}")
topics_to_create = [
"json_schema_01",
"json_schema_02",
"json_schema_03",
"json_schema_04",
"json_schema_05",
"json_schema_06",
"json_schema_07",
"json_schema_08",
]
create_topics_example(
kafka_conf,
topics_to_create,
)
Key に対する Ser/Des の実施
Consumer 側での Avro デシリアライズ方法
以下では、Key 部分を Avro でシリアライズして送信します。
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
# SchemaRegistryClient と AvroSerializer を初期化
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_serializer = AvroSerializer(
schema_registry_client,
key_schema_str,
)
# Kafka Producer を初期化
producer = Producer(kafka_conf)
# Avro でシリアライズ
serialized_key_value = avro_serializer(
customer_key_data,
SerializationContext(topic_name, MessageField.KEY)
)
# Kafka にメッセージを送信
producer.produce(
topic=topic_name,
key=serialized_key_value,
value=customer_value_data,
)
# 送信をフラッシュ(ブロッキングして完了を待つ)
producer.flush()
print("メッセージ書き込み成功")
Confluent 上でデータを確認すると下記のようになっていました。
Consumer 側での Avro デシリアライズ方法
次に、Consumer では受信した Key を Avro でデシリアライズしてみます。
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
# ここでも同じスキーマを使って AvroDeserializer を作成
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_deserializer = AvroDeserializer(schema_registry_client)
consumer_conf = {
'group.id': 'avro_consumer_group_01',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
}
consumer_conf.update(kafka_conf)
# Kafka Consumer の設定
consumer = Consumer(consumer_conf)
# トピックを購読
consumer.subscribe([topic_name])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
# タイムアウト時やメッセージなしの場合
continue
if msg.error():
print(f"Consumer エラー: {msg.error()}")
continue
# --- キーのデシリアライズ部分 ---
# ここで msg.key() はバイナリとして受け取っているので、AvroDeserializer で復元
deserialized_key = avro_deserializer(
msg.key(),
SerializationContext(topic_name, MessageField.KEY)
)
# 値は今回は単純な文字列(未シリアライズ)なので、decode するなどで取り出す
# (Producerでそのまま string を渡している場合)
deserialized_value = msg.value().decode('utf-8') if msg.value() else None
print(f"キー = {deserialized_key}, 値 = {deserialized_value}")
except KeyboardInterrupt:
print("Consumer を停止します...")
finally:
consumer.close()
以下に、「まとめ」の章を追記した、全体の文章を改善したバージョンを提示します。
概要
Confluent 上で Key をシリアライズ/デシリアライズ(SerDes)する方法を検証してみました。Apache Kafka ではメッセージの Key を単純に文字列として運用するケースもありますが、Key をシリアライズし、Schema Registry で管理すると以下のような多くのメリットがあります。
- 複数列に分割した Key を使用する場合でも、パーティション制御の一貫性を維持できる
- Key のデータ構造をスキーマとして管理し、互換性やバージョン管理を実施できる
- Consumer 側で自動デシリアライズできるため、実装が簡単になる
今回は Key を Avro スキーマで管理し、SerDes を行う手順を紹介します。
事前準備
Confluent と Schema Registry への接続情報をセット
# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "B5NJBZ22ZKEJOG22"
sasl_password = "7IMvcMC52lAiGPixGB3AyekX0Tk2nNXqIzufYRoxIwdVw8nJAOFAjngKMz2W4Cg0"
kafka_conf = {
"bootstrap.servers": bootstrap_servers,
"sasl.username": sasl_username,
"sasl.password": sasl_password,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
}
# Schema Registry の接続情報
sr_url = "https://psrc-lq2dm.us-east-2.aws.confluent.cloud"
sr_api_key = "3CEWV5JNSV6IEKIV"
sr_api_secret = "Eo+9lTX117MrFtS6n09tpdpK7HcXZ6aXoF2d+mNnJYE/4Xtgiyy5ybennM5dCNdP"
sr_conf = {
"url": sr_url,
"basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}"
}
検証用トピックの作成
事前準備として、Confluent の AdminClient を利用して複数のトピックを作成しておきます。以下のコードでは、存在しないトピックは新規に作成し、既に存在していればエラーを出力する実装です。
from confluent_kafka.admin import AdminClient, NewTopic
def create_topics_example(kafka_conf, topics_to_create):
# AdminClientの設定
admin_client = AdminClient(kafka_conf)
# トピック作成リクエスト(NewTopic オブジェクト)のリストを作成
new_topics = []
for topic_name in topics_to_create:
# 必要に応じて num_partitions や replication_factor を設定
new_topic = NewTopic(topic=topic_name)
new_topics.append(new_topic)
# AdminClient の create_topics を呼び出し、戻り値は {topic名: Future} の辞書
futures = admin_client.create_topics(new_topics)
# Future の結果をチェックして、作成成功 / 失敗を判定
for topic, f in futures.items():
try:
f.result() # 例外が起きなければ成功
print(f"Topic '{topic}' created successfully.")
except Exception as e:
print(f"Failed to create topic '{topic}': {e}")
topics_to_create = [
"json_schema_01",
"json_schema_02",
"json_schema_03",
"json_schema_04",
"json_schema_05",
"json_schema_06",
"json_schema_07",
"json_schema_08",
]
create_topics_example(
kafka_conf,
topics_to_create,
)
Key に対する Ser/Des の実施
Producer 側での Avro シリアライズ方法
以下の例では、Key 部分を Avro でシリアライズして送信します。
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
# SchemaRegistryClient と AvroSerializer を初期化
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_serializer = AvroSerializer(
schema_registry_client,
key_schema_str, # 事前に定義したキーの Avro スキーマ文字列
)
# Kafka Producer を初期化
producer = Producer(kafka_conf)
# Avro で Key をシリアライズ
serialized_key_value = avro_serializer(
customer_key_data, # 送信したい Key データ
SerializationContext(topic_name, MessageField.KEY)
)
# Kafka にメッセージを送信
producer.produce(
topic=topic_name,
key=serialized_key_value,
value=customer_value_data, # 今回は文字列などを単純に渡す
)
# 送信をフラッシュ(ブロッキングして完了を待つ)
producer.flush()
print("メッセージ書き込み成功")
Confluent のコンソール上でメッセージを確認すると、Key が Avro スキーマをもとにエンコードされていることが確認できます。
Consumer 側での Avro デシリアライズ方法
次に、Consumer では受信した Key を Avro でデシリアライズしてみます。
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
# 同じスキーマを使用して AvroDeserializer を作成
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_deserializer = AvroDeserializer(schema_registry_client)
consumer_conf = {
'group.id': 'avro_consumer_group_01',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
}
consumer_conf.update(kafka_conf)
# Kafka Consumer を初期化
consumer = Consumer(consumer_conf)
# トピックを購読
consumer.subscribe([topic_name])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
# タイムアウト時やメッセージのない場合
continue
if msg.error():
print(f"Consumer エラー: {msg.error()}")
continue
# Key を AvroDeserializer でデシリアライズ
deserialized_key = avro_deserializer(
msg.key(),
SerializationContext(topic_name, MessageField.KEY)
)
# 値は単純な文字列として送信されているため、decode して取り出す
deserialized_value = msg.value().decode('utf-8') if msg.value() else None
print(f"キー = {deserialized_key}, 値 = {deserialized_value}")
except KeyboardInterrupt:
print("Consumer を停止します...")
finally:
consumer.close()
まとめ
Key のスキーマを Schema Registry で管理し、Avro を使って SerDes 化することで、下記のような運用面で多くのメリットが得られます。
- パーティション制御 の一貫性が取りやすい
- スキーマ管理 による Key の互換性やバージョン管理が容易になる
- Consumer の自動デシリアライズ による開発効率の向上
Kafka におけるメッセージの Key は、ときにシステム全体のパフォーマンスや拡張性に大きく影響する要素です。スキーマ管理を含めたトピック設計を行うことで、将来的なデータモデルの変更やスケールにも対応しやすくなるため、要件に合わせて検討をおすすめします。