Help us understand the problem. What is going on with this article?

kafka-pythonでconsumerのオフセットを他にコピーする

Kafka consumerを実装するときはJavaやScalaなどの言語を使うことが多いのですが、そこまで複雑でなければ軽く実装できるPythonを使いたいですね。
今回はPython向けのApache Kafkaライブラリのkafka-pythonを使って、あるconsumerのオフセットを他のconsumerに移すメンテナンス処理を書いて見たいと思います。

コンシューマーのオフセット取得

今回はKafkaAdminClientを使用します。KafkaConsumerから取得することもできるのですが、consumerを購読したりpollしたりと煩雑なのでこちらのクライアントを使うのがおすすめです。

from kafka import KafkaAdminClient

# consumer01はtopic01を購読中という想定
target_consumer_name = "consumer01"

# KafkaAdminClientを取得
cluster_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

# オフセットを取得
offsets = cluster_admin.list_consumer_group_offsets(target_consumer_name)

参考までにこんな感じのものが返却されてきます。
TopicPartitionクラスをキーとしたオフセットの情報です。

{TopicPartition(topic='topic01', partition=0): OffsetAndMetadata(offset=14475, metadata='aa6c00e6-ffbf-41a3-b011-6997549f6166a'),
 TopicPartition(topic='topic01', partition=1): OffsetAndMetadata(offset=14494, metadata='8fcc736c-1cb0-41b5-b111-6d55d67b3096a')}

別のコンシューマーにオフセットを書き込み

consumerの書き変えを行うにはKafkaConsumerを使用します。
先程取得したオフセット情報をconsumer02に書き込みます。
consumerが存在ない場合は自動で作成が行われます。

from kafka import KafkaConsumer

consumer_group_name = 'consumer02'

consumer = KafkaConsumer(
        group_id=consumer_group_name,
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=False)

# offsetの情報を書き込み(consumerが存在しない場合作成される)
consumer.commit(offsets)

これでconsumer02にはconsumer01と同じtopicのオフセット情報が書き込まれました。
consumer02を使用して購読を開始すると、オフセット取得時のconsumer01と同じオフセットから購読を開始するはずです。

購読の例

consumer_group_name='consumer02'

# consumerを取得
consumer = KafkaConsumer(
        group_id=consumer_group_name,
        bootstrap_servers=bootstrap_servers)

# topic01を購読する設定に変更
consumer.subscribe(topics=['topic01'])

# topic01 consumer02に実際に参加
consumer.poll()

# 受信できたメッセージをprintし続ける
for msg in consumer:
    print(msg)

参考

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした