11
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

MicroAd (マイクロアド)Advent Calendar 2019

Day 15

kafka-pythonでconsumerのオフセットを他にコピーする

Posted at

Kafka consumerを実装するときはJavaやScalaなどの言語を使うことが多いのですが、そこまで複雑でなければ軽く実装できるPythonを使いたいですね。
今回はPython向けのApache Kafkaライブラリのkafka-pythonを使って、あるconsumerのオフセットを他のconsumerに移すメンテナンス処理を書いて見たいと思います。

コンシューマーのオフセット取得

今回はKafkaAdminClientを使用します。KafkaConsumerから取得することもできるのですが、consumerを購読したりpollしたりと煩雑なのでこちらのクライアントを使うのがおすすめです。

from kafka import KafkaAdminClient

# consumer01はtopic01を購読中という想定
target_consumer_name = "consumer01"

# KafkaAdminClientを取得
cluster_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

# オフセットを取得
offsets = cluster_admin.list_consumer_group_offsets(target_consumer_name)

参考までにこんな感じのものが返却されてきます。
TopicPartitionクラスをキーとしたオフセットの情報です。

{TopicPartition(topic='topic01', partition=0): OffsetAndMetadata(offset=14475, metadata='aa6c00e6-ffbf-41a3-b011-6997549f6166a'),
 TopicPartition(topic='topic01', partition=1): OffsetAndMetadata(offset=14494, metadata='8fcc736c-1cb0-41b5-b111-6d55d67b3096a')}

別のコンシューマーにオフセットを書き込み

consumerの書き変えを行うにはKafkaConsumerを使用します。
先程取得したオフセット情報をconsumer02に書き込みます。
consumerが存在ない場合は自動で作成が行われます。

from kafka import KafkaConsumer

consumer_group_name = 'consumer02'

consumer = KafkaConsumer(
        group_id=consumer_group_name,
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=False)

# offsetの情報を書き込み(consumerが存在しない場合作成される)
consumer.commit(offsets)

これでconsumer02にはconsumer01と同じtopicのオフセット情報が書き込まれました。
consumer02を使用して購読を開始すると、オフセット取得時のconsumer01と同じオフセットから購読を開始するはずです。

購読の例

consumer_group_name='consumer02'

# consumerを取得
consumer = KafkaConsumer(
        group_id=consumer_group_name,
        bootstrap_servers=bootstrap_servers)

# topic01を購読する設定に変更
consumer.subscribe(topics=['topic01'])

# topic01 consumer02に実際に参加
consumer.poll()

# 受信できたメッセージをprintし続ける
for msg in consumer:
    print(msg)

参考

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html

11
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?