概要
Apache Kafka のスループットを最適化する方法として、メッセージ圧縮は非常に有効です。本記事では、Confluent Cloud 環境におけるメッセージ圧縮の概要と、Python を用いた実践例をご紹介します。
本記事は、以下のシリーズの一部です。
引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita
メッセージの圧縮について
Confluent のドキュメントによると、Kafka のスループットを高める手段のひとつに「メッセージの圧縮」が挙げられています。メッセージを圧縮することでネットワーク転送量が削減され、ディスクに保存されるデータサイズも小さくなるため、Kafka ブローカーやストレージへの負荷を軽減できます。
引用元:Optimize Confluent Cloud Clients for Throughput | Confluent Documentation
Kafka では、以下の 5 種類の圧縮形式がサポートされています。中でも一般的には lz4
がパフォーマンス面で選ばれることが多いようです。
引用元:Kafka producer configuration reference | Confluent Documentation
OSS 版 Kafka(Confluent Platform を含む)では、compression.type
を Topic 単位で設定できますが、Confluent Cloud では Producer 側でのみ圧縮を有効にできます。GUI から compression.type
の値を変更しても反映されず、CUI でも同様です。
Unlike Confluent Platform,
compression.type
is not configurable on Confluent Cloud topics.
引用元:Optimize Confluent Cloud Clients for Throughput | Confluent Documentation
詳細については Confluent 社の公式ブログもご参照ください。
メッセージの圧縮の実践
事前準備
ライブラリのインストール
!pip install confluent-kafka confluent-kafka[avro] -q
Topic を作成
Confluent Cloud の管理画面で適宜 Topic を作成してください。
Confluent の接続情報をセット
# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "OWOG4SB2WGHF2A2V"
sasl_password = "w4LTJunhjUfGCd7kyRaxp42HjzbNVK1Zr68Q8SNg8J2BDM7fTasaGQ2WM1OE6BnJ"
kafka_conf = {
"bootstrap.servers": bootstrap_servers,
"sasl.username": sasl_username,
"sasl.password": sasl_password,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
}
# Schema Registry の接続情報
sr_url = "https://psrc-lq2dm.us-east-2.aws.confluent.cloud"
sr_api_key = "WW3RYJC3LRGQUJ5K"
sr_api_secret = "Z4lNxYTZ3aZwHo1sdJtmzG9xSvSzz97ztGocYE4hlzaRRb56vuLekEacKXeJBB/n"
sr_conf = {
"url": sr_url,
"basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}"
}
Produce の実施
Producer でメッセージを圧縮するには、compression.type
オプションを設定します。以下は Avro シリアライズを併用したサンプルコードです。
# スキーマを定義
topic_name_1 = "compression_topic_1"
avro_schema_str = """
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string"}
]
}
"""
# サンプルレコード
record = {
"name": "alice",
"favorite_number": 42,
"favorite_color": "blue"
}
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
# SchemaRegistryClient と AvroSerializer を初期化
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_serializer = AvroSerializer(
schema_registry_client,
avro_schema_str,
)
# Producer 用の Config を定義
producer_conf = {
'compression.type': 'lz4',
}
producer_conf.update(kafka_conf)
# Kafka Producer を初期化
producer = Producer(producer_conf)
# Avro でシリアライズ
serialized_value = avro_serializer(
record,
SerializationContext(topic_name_1, MessageField.VALUE)
)
# Kafka にメッセージを送信
producer.produce(
topic=topic_name_1,
key=record["name"],
value=serialized_value,
)
# 送信をフラッシュ(ブロッキングして完了を待つ)
producer.flush()
print("メッセージ書き込み成功")
Consume の実施
Consumer では特に圧縮の設定は不要です。以下のサンプルコードは Avro を用いてメッセージをデシリアライズする例です。
from confluent_kafka import Consumer
from confluent_kafka.schema_registry.avro import AvroDeserializer
consumer_conf = {
'group.id': 'avro_consumer_group_01',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
}
consumer_conf.update(kafka_conf)
consumer = Consumer(consumer_conf)
consumer.subscribe([topic_name_1])
avro_deserializer = AvroDeserializer(
schema_registry_client,
avro_schema_str,
)
try:
while True:
msg = consumer.poll(3.0)
if msg is None:
break
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
user = avro_deserializer(
msg.value(),
SerializationContext(msg.topic(), MessageField.VALUE)
)
if user:
print(f"Consumed Avro record: key={msg.key()}, value={user}")
finally:
consumer.close()
print("Avroメッセージ受信終了")
まとめ
本記事では Confluent Cloud 上の Kafka でメッセージ圧縮を行う方法と、その送受信のサンプル実装の紹介をしました。圧縮を有効にすると、ネットワーク帯域やストレージ使用量の削減、さらにはブローカー負荷の軽減につながるため、ぜひ活用を検討してみてください。