Kafka consumerを実装するときはJavaやScalaなどの言語を使うことが多いのですが、そこまで複雑でなければ軽く実装できるPythonを使いたいですね。
今回はPython向けのApache Kafkaライブラリのkafka-pythonを使って、あるconsumerのオフセットを他のconsumerに移すメンテナンス処理を書いて見たいと思います。
コンシューマーのオフセット取得
今回はKafkaAdminClientを使用します。KafkaConsumerから取得することもできるのですが、consumerを購読したりpollしたりと煩雑なのでこちらのクライアントを使うのがおすすめです。
from kafka import KafkaAdminClient
# consumer01はtopic01を購読中という想定
target_consumer_name = "consumer01"
# KafkaAdminClientを取得
cluster_admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
# オフセットを取得
offsets = cluster_admin.list_consumer_group_offsets(target_consumer_name)
参考までにこんな感じのものが返却されてきます。
TopicPartitionクラスをキーとしたオフセットの情報です。
{TopicPartition(topic='topic01', partition=0): OffsetAndMetadata(offset=14475, metadata='aa6c00e6-ffbf-41a3-b011-6997549f6166a'),
TopicPartition(topic='topic01', partition=1): OffsetAndMetadata(offset=14494, metadata='8fcc736c-1cb0-41b5-b111-6d55d67b3096a')}
別のコンシューマーにオフセットを書き込み
consumerの書き変えを行うにはKafkaConsumerを使用します。
先程取得したオフセット情報をconsumer02に書き込みます。
consumerが存在ない場合は自動で作成が行われます。
from kafka import KafkaConsumer
consumer_group_name = 'consumer02'
consumer = KafkaConsumer(
group_id=consumer_group_name,
bootstrap_servers=bootstrap_servers,
enable_auto_commit=False)
# offsetの情報を書き込み(consumerが存在しない場合作成される)
consumer.commit(offsets)
これでconsumer02にはconsumer01と同じtopicのオフセット情報が書き込まれました。
consumer02を使用して購読を開始すると、オフセット取得時のconsumer01と同じオフセットから購読を開始するはずです。
購読の例
consumer_group_name='consumer02'
# consumerを取得
consumer = KafkaConsumer(
group_id=consumer_group_name,
bootstrap_servers=bootstrap_servers)
# topic01を購読する設定に変更
consumer.subscribe(topics=['topic01'])
# topic01 consumer02に実際に参加
consumer.poll()
# 受信できたメッセージをprintし続ける
for msg in consumer:
print(msg)
参考
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html