0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

manabianAdvent Calendar 2024

Day 12

Conflunt Cloud にて Kafka メッセージを圧縮して高スループットを実現する方法

Last updated at Posted at 2024-12-22

概要

Apache Kafka のスループットを最適化する方法として、メッセージ圧縮は非常に有効です。本記事では、Confluent Cloud 環境におけるメッセージ圧縮の概要と、Python を用いた実践例をご紹介します。

本記事は、以下のシリーズの一部です。

image.png

引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita

メッセージの圧縮について

Confluent のドキュメントによると、Kafka のスループットを高める手段のひとつに「メッセージの圧縮」が挙げられています。メッセージを圧縮することでネットワーク転送量が削減され、ディスクに保存されるデータサイズも小さくなるため、Kafka ブローカーやストレージへの負荷を軽減できます。

image.png

引用元:Optimize Confluent Cloud Clients for Throughput | Confluent Documentation

Kafka では、以下の 5 種類の圧縮形式がサポートされています。中でも一般的には lz4 がパフォーマンス面で選ばれることが多いようです。

image.png

引用元:Kafka producer configuration reference | Confluent Documentation

OSS 版 Kafka(Confluent Platform を含む)では、compression.type を Topic 単位で設定できますが、Confluent Cloud では Producer 側でのみ圧縮を有効にできます。GUI から compression.type の値を変更しても反映されず、CUI でも同様です。

Unlike Confluent Platform, compression.type is not configurable on Confluent Cloud topics.
引用元:Optimize Confluent Cloud Clients for Throughput | Confluent Documentation

詳細については Confluent 社の公式ブログもご参照ください。

image.png

引用元:Apache Kafka Message Compression

メッセージの圧縮の実践

事前準備

ライブラリのインストール

!pip install confluent-kafka confluent-kafka[avro] -q

image.png

Topic を作成

Confluent Cloud の管理画面で適宜 Topic を作成してください。

image.png

Confluent の接続情報をセット

# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "OWOG4SB2WGHF2A2V"
sasl_password = "w4LTJunhjUfGCd7kyRaxp42HjzbNVK1Zr68Q8SNg8J2BDM7fTasaGQ2WM1OE6BnJ"

kafka_conf = {
    "bootstrap.servers": bootstrap_servers,
    "sasl.username":     sasl_username,
    "sasl.password":     sasl_password,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms":   "PLAIN",
}

# Schema Registry の接続情報
sr_url = "https://psrc-lq2dm.us-east-2.aws.confluent.cloud"
sr_api_key = "WW3RYJC3LRGQUJ5K"
sr_api_secret = "Z4lNxYTZ3aZwHo1sdJtmzG9xSvSzz97ztGocYE4hlzaRRb56vuLekEacKXeJBB/n"
sr_conf = {
    "url": sr_url,
    "basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}"
}

image.png

Produce の実施

Producer でメッセージを圧縮するには、compression.type オプションを設定します。以下は Avro シリアライズを併用したサンプルコードです。

# スキーマを定義
topic_name_1 = "compression_topic_1"
avro_schema_str = """
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": "int"},
    {"name": "favorite_color", "type": "string"}
  ]
}
"""

# サンプルレコード
record = {
    "name": "alice",
    "favorite_number": 42,
    "favorite_color": "blue"
}

from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# SchemaRegistryClient と AvroSerializer を初期化
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_serializer = AvroSerializer(
    schema_registry_client,
    avro_schema_str,
)

# Producer 用の Config を定義
producer_conf = {
    'compression.type': 'lz4',
}
producer_conf.update(kafka_conf)


# Kafka Producer を初期化
producer = Producer(producer_conf)

# Avro でシリアライズ
serialized_value = avro_serializer(
    record,
    SerializationContext(topic_name_1, MessageField.VALUE)
)

# Kafka にメッセージを送信
producer.produce(
    topic=topic_name_1,
    key=record["name"],
    value=serialized_value,
)

# 送信をフラッシュ(ブロッキングして完了を待つ)
producer.flush()
print("メッセージ書き込み成功")

Consume の実施

Consumer では特に圧縮の設定は不要です。以下のサンプルコードは Avro を用いてメッセージをデシリアライズする例です。

from confluent_kafka import Consumer
from confluent_kafka.schema_registry.avro import AvroDeserializer

consumer_conf = {
    'group.id': 'avro_consumer_group_01',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}
consumer_conf.update(kafka_conf)

consumer = Consumer(consumer_conf)
consumer.subscribe([topic_name_1])

avro_deserializer = AvroDeserializer(
    schema_registry_client,
    avro_schema_str,
)

try:
    while True:
        msg = consumer.poll(3.0)
        if msg is None:
            break
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        user = avro_deserializer(
            msg.value(),
            SerializationContext(msg.topic(), MessageField.VALUE)
        )
        if user:
            print(f"Consumed Avro record: key={msg.key()}, value={user}")
finally:
    consumer.close()
    print("Avroメッセージ受信終了")

image.png

まとめ

本記事では Confluent Cloud 上の Kafka でメッセージ圧縮を行う方法と、その送受信のサンプル実装の紹介をしました。圧縮を有効にすると、ネットワーク帯域やストレージ使用量の削減、さらにはブローカー負荷の軽減につながるため、ぜひ活用を検討してみてください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?