0
1

More than 3 years have passed since last update.

spring-kafkaで一度に複数メッセージをconsume

Posted at

まずBatchMessageListenerの実装を作成する。

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.stereotype.Component;

@Component
public class SampleBatchMessageListener implements BatchMessageListener<String, String> {
    @KafkaListener(topics = "mytopic2")
    @Override
    public void onMessage(List<ConsumerRecord<String, String>> data) {
        System.out.println(data.size() + ":" + data);
    }
}

次にspring.kafka.listener.typeBATCHに変更する。

src/main/resources/application.properties
spring.kafka.bootstrap-servers=localhost:32770
spring.kafka.consumer.group-id=sample-group
spring.kafka.listener.type=BATCH

これでspring-bootを起動して適当にpublishする。

5:[ConsumerRecord(topic = mytopic2, partition = 0, leaderEpoch = 0, offs(以下省略)
1:[ConsumerRecord(topic = mytopic2, partition = 0, leaderEpoch = 0, offset (以下省略)
3:[ConsumerRecord(topic = mytopic2, partition = 0, leaderEpoch = 0, o(以下省略)

一度にバラバラの件数でconsumeされているのが分かる。

以上で動作はするが、実運用では以下らへんのパラメータも指定しないとダメと思われる。バラバラの件数でメッセージが到着しているのを見ると、いわゆるRDBMSのコミット間隔とまったく同じように使えるというわけでもない。

spring.kafka.consumer.fetch-max-wait
spring.kafka.consumer.fetch-min-size
spring.kafka.consumer.max-poll-records

バッチ処理的な発想はkafkaでも有効だが、kafka固有の事情は考慮が必要と思われる。

ドキュメント https://docs.spring.io/spring-kafka/docs/current/reference/html/#message-listeners

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1