0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

manabianAdvent Calendar 2024

Day 14

Confluentで理解する:コンシューマーグループとオフセット管理の基礎

Posted at

概要

Confluent プラットフォーム(Apache Kafka ベース)では、メッセージの消費(Consumer)時に「コンシューマーグループ(Consumer Group)」という重要な概念が利用されます。コンシューマーグループ単位で「オフセット(Offset)」と呼ばれる処理位置が管理・保持されることで、効率的かつ耐障害性に優れたメッセージ処理が可能となっています。

本記事では、Confluent 上でコンシューマーグループを活用する際のオフセット管理方法について、基本概念から実際の確認方法までを解説します。

コンシューマーグループとオフセットの基本概念

コンシューマーグループとは

コンシューマーグループ (Consumer Group) とは、Kafka クラスター上の特定のトピックからメッセージを消費する一つ以上のコンシューマーの集合体です。
たとえば、consumer_group_A というグループ名で複数のコンシューマープロセスを稼働させれば、同一トピックから効率的にメッセージを並行処理できます。Kafka はグループ内のコンシューマー間でパーティションを自動的に割り当て、重複なくメッセージを分散して取得・処理する仕組みを提供します。

オフセットとは

オフセット (Offset) は、トピックの各パーティションにおいてメッセージが格納される際の連番インデックスです。コンシューマーがあるオフセットまで処理したことを記録することで、システム障害時やコンシューマー再起動時にも、正確な位置から処理を再開できます。

オフセット管理の実態

コンシューマーがメッセージ処理を完了した際、「オフセットコミット(Offset Commit)」を行います。このコミット情報は、内部の特殊なトピック(__consumer_offsets)に格納され、コンシューマーグループごとに管理・保持されます。こうした仕組みにより、Kafka はスケールアウトや障害復旧において柔軟かつ堅牢なメッセージ処理が実現できるのです。

Confluent 上での動作確認

事前準備

1. テスト用トピックの作成

image.png

2. トピックへのデータ書き込み

Datagen Source Connector を用いて、テスト用トピックにデータを書き込みます。

image.png

3. Google Colab 上での環境準備

!pip install confluent-kafka confluent-kafka[avro,json,protobuf] -q

image.png

4. Confluent クラスターと Schema Registry の認証情報設定

# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "Z3KRCXUFVCCFVPEF"
sasl_password = "kHz4/I/A65/FcfmnVw0L5of8YAahK5z02t31GZSnoMcGZvdE+R9NhvLOX74QUhR6"
kafka_conf = {
    'bootstrap.servers': bootstrap_servers,
    'sasl.username':     sasl_username,
    'sasl.password':     sasl_password,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
}

# Schema Registry 接続情報を設定
sr_url = "https://psrc-l6oz3.us-east-2.aws.confluent.cloud"
sr_api_key = "QF5SEOHC5ORP32SZ"
sr_api_secret = "xjtLFGVkKdT+Vvj0iwpvuU+maR1ptFZwAi6kyEtavdb3P7rdvLlyCTJxlUPVNfGi"
sr_conf = {
    'url': sr_url,
    'basic.auth.user.info': f"{sr_api_key}:{sr_api_secret}"
}

image.png

5. Consume の実行

topic_name = "sample_data_orders_01"
consumer_group_name = "consumer_group_01"

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

schema_registry_client = SchemaRegistryClient(sr_conf)

consumer_conf = {
    'group.id': consumer_group_name,
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
}
consumer_conf.update(kafka_conf)

consumer = Consumer(consumer_conf)
consumer.subscribe([topic_name])
avro_deserializer = AvroDeserializer(schema_registry_client)
cnt = 0
loop_num = 50
try:
    for _ in range(loop_num):
        msg = consumer.poll(3.0)
        if msg is None:
            break
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        data = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
        cnt += 1
finally:
    print(f"{cnt} 回の Avro メッセージ受信終了")

image.png

パーティションごとのオフセット確認

1. Consumer.position() を利用する方法

tps = consumer.assignment()
positions = consumer.position(tps)
for tp, pos in zip(tps, positions):
    print(f"Topic: {tp.topic}, Partition: {tp.partition}, Position: {pos.offset}")
Topic: sample_data_orders_01, Partition: 0, Position: -1001
Topic: sample_data_orders_01, Partition: 1, Position: 32
Topic: sample_data_orders_01, Partition: 2, Position: 18

image.png

2. Consumer.committed() を利用する方法

tps = consumer.assignment()
committed_offsets = consumer.committed(tps)
for tp in committed_offsets:
    print(f"Topic: {tp.topic}, Partition: {tp.partition}, Committed offset: {tp.offset}")
Topic: sample_data_orders_01, Partition: 0, Committed offset: -1001
Topic: sample_data_orders_01, Partition: 1, Committed offset: 32
Topic: sample_data_orders_01, Partition: 2, Committed offset: 18

image.png

3. AdminClient によるコンシューマーグループ単位でのオフセット取得

from confluent_kafka.admin import AdminClient
from confluent_kafka import TopicPartition, ConsumerGroupTopicPartitions, KafkaException

admin_client = AdminClient(kafka_conf)

# コンシューマーグループ一覧取得
list_groups_f = admin_client.list_consumer_groups()
groups_result = list_groups_f.result(timeout=10)
valid_groups = groups_result.valid

for group_listing in valid_groups:
    group_id = group_listing.group_id
    cg_offsets_futures = admin_client.list_consumer_group_offsets([ConsumerGroupTopicPartitions(group_id=group_id)])
    future = cg_offsets_futures[group_id]

    try:
        cg_topic_partitions = future.result(timeout=10)
        print(f"Consumer Group: {group_id}")
        for tp in cg_topic_partitions.topic_partitions:
            print(f"  Topic: {tp.topic}, Partition: {tp.partition}, Offset: {tp.offset}")
    except KafkaException as e:
        print(f"Failed to retrieve offsets for group {group_id}: {e}")
Consumer Group: consumer_group_05
  Topic: sample_data_orders_2, Partition: 1, Offset: 36
  Topic: sample_data_orders_2, Partition: 0, Offset: 14
Consumer Group: consumer_group_03
  Topic: sample_data_orders_1, Partition: 2, Offset: 10
Consumer Group: consumer_group_01
  Topic: sample_data_orders_1, Partition: 0, Offset: 86
  Topic: sample_data_orders_1, Partition: 4, Offset: 62
  ...

image.png


まとめ

本記事では、Confluent プラットフォーム上でコンシューマーグループ単位のオフセット管理について、基本的な概念から実際のオフセット確認方法までを示しました。
Kafka のメッセージ処理においてオフセット管理は欠かせない存在であり、これを理解することで、障害復旧やスケーリング時にもスムーズな処理再開を行うことが可能になります。ぜひ本記事を参考に、運用・開発環境でのデータ処理をより堅牢かつ効率的に行ってみてください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?