Why not login to Qiita and try out its useful features?

We'll deliver articles that match you.

You can read useful information later.

2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Spring Boot + spring-kafka で Kafka Consumer を作る

Posted at

Spring BootではWebアプリを作ることが多いかもしれないが、Kafkaクライアントを開発することもできる。今回はspring-kafkaを使ってConsumerを作ってみる。
全体のコードは以下にある。
https://github.com/Udomomo/spring-boot-kafka-consumer

バージョン

Spring Boot: 2.4.2
spring-kafka: 2.6.5

spring-kafkaの仕組み

spring-kafkaによるKafka Consumerの処理は、@KafkaListener アノテーションを使うことで実装することができる。これを使うためには、KafkaListenerContainerFactory を作成するBeanを @Configuration アノテーションを付与したクラスで提供し、さらにそのクラスに @EnableKafka アノテーションを付与する。これにより MessageListenerContainer がDIコンテナに登録され、Kafka Consumerが自動で設定される。

application.yaml

公式ドキュメントのチュートリアルでは、Kafka Consumerの設定情報をコードの中で直接指定することでコード量を減らしているが、今回は業務で使うアプリケーションに近くなるよう、設定情報をapplication.yamlに切り出してみる。
application.yamlには、bootstrap-servers, group-id, topic の3つを記載する。(Deserializerはアプリケーションの実装と密接に関わるため、アプリケーション内で指定する)

application.yaml
kafka:
  bootstrap-servers: http://localhost:9093
  group-id: foo
  topic: my-topic

KafkaListenerContainerFactoryの作成

application.yamlで記載した設定情報を使い、KafkaListenerContainerFactory を作成する。
まず、アプリケーションからapplication.yamlの情報を取得する。 @ConfigurationProperties とすることで、application.yaml内の設定値をPOJOにマッピングできる。また、prefix を指定することでyamlの階層構造を指定できる。
下の例では、 prefix = kafka と指定しているので、例えば bootstrapServers プロパティにapplication.yaml内の kafka.bootstrap-servers の値がマッピングされる。(大文字小文字やハイフン・アンダースコアなどの表記差は、Spring BootのRelaxed Binding機能によって良きに計らってくれる。詳しくは公式リファレンスを参照)
なお、設定値をマッピングして後で取得するにはgetter/setterメソッドが必要になる。今回はLombokの @Data アノテーションを使用してgetter/setterメソッドを作成している。

KafkaSettings.java
@ConfigurationProperties(prefix = "kafka")
@Data
public class KafkaSettings {
  private String bootstrapServers;
  private String groupId;
  private String topic;
}

取得した設定情報を用いて、KafkaConsumerを設定する。まずDefaultKafkaConsumerFactory インスタンスを作成し、それを用いてConcurrentKafkaListenerContainerFactory インスタンスを作成する。このインスタンスは ConcurrentKafkaListenerContainer を生成するファクトリであり、これによって複数のtopicやpartitionからのメッセージをマルチスレッドで処理することも可能になる。
また、@EnableConfigurationPropertiesアノテーションを使うことで、@ConfigurationProperties を付与したPOJOをBeanとして登録できる。これにより、設定値をマッピングされたKafkaSettingsインスタンスを扱えるようになり、getterメソッドを呼び出して設定値を取得できるようになる。

KafkaConfig.java
@Configuration
@EnableKafka
@EnableConfigurationProperties({KafkaSettings.class})
public class KafkaConfig {
  @Bean
  KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
  kafkaListenerContainerFactory(KafkaSettings settings) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory(settings));
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory(KafkaSettings settings) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, settings.getBootstrapServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, settings.getGroupId());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(props);
  }
}

最後に、 @KafkaListener アノテーションを使ってConsumerのメインロジックを実装する。ここではListenするtopicの名前を指定しなければいけないが、 topics = "${kafka.topic}" のようにSpELを使うことでapplication.yamlのtopicの値を直接呼び出している。この部分では groupId を指定することもできるが、今回はgroupIdは@ConfigurationProperties を通じて既に指定したのでそちらが使われる。

KafkaConsumer.java
@Component
public class KafkaConsumer {
  @KafkaListener(topics = "${kafka.topic}")
  public void consume(ConsumerRecord<String, String> record) {
    System.out.println("key: " + record.key() + ", value: " + record.value());
  }
}

メッセージの送信

bitnami/kafkaを使ってKafka brokerを立てる。以下のようにすると、Dockerネットワーク内部からの接続は localhost:9092 で、外部からの接続は localhost:9093 で受け付けられる。

docker-compose.yaml
version: "2"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.7
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:2
    ports:
      - "9093:9093"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

kafka cliからtopicを作り、メッセージを送ってみる。

$ docker-compose up -d
$ docker-compose exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092
$ docker-compose exec kafka /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic --property "parse.key=true" --property "key.separator=,"
>hello,hi

Spring Boot側でログが出れば成功。

key: hello, value: hi
2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?