Spring BootでKafkaのConsumerを実装するためのサンプルコードを以下に示します。このサンプルでは、Kafkaトピックからメッセージを消費する方法を説明します。
1. プロジェクトのセットアップ
まず、Spring Bootプロジェクトを作成します。Spring Initializrを使用して、以下の依存関係を追加します。
- Spring Web
- Spring for Apache Kafka
2. build.gradle または pom.xml の設定
build.gradle を使用する場合:
plugins {
id 'org.springframework.boot' version '3.1.0'
id 'io.spring.dependency-management' version '1.0.15.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
test {
useJUnitPlatform()
}
pom.xml を使用する場合:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafka-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-consumer</name>
<description>Demo project for Spring Boot and Kafka</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3. Kafkaの設定
application.yml または application.properties にKafkaの設定を追加します。
application.yml の場合:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my_group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
application.properties の場合:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my_group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
4. Kafka Consumerの実装
Kafkaトピックからメッセージを消費するためのリスナークラスを作成します。
package com.example.kafkaconsumer.listener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerListener {
@KafkaListener(topics = "my_topic", groupId = "my_group")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
-
アプリケーションの起動
Spring Bootアプリケーションを起動します。Kafkaトピックにメッセージが送信されると、コンソールにメッセージが表示されます。 -
メッセージの送信
Producerを使用して、Kafkaトピックにメッセージを送信します。例えば、以下のURLにアクセスしてメッセージを送信します。
http://localhost:8080/send?message=HelloKafka
これで、Kafkaトピックからメッセージを消費する基本的なConsumerの実装が完了です。Kafkaブローカーがローカルで実行されていることを確認してください。