Kafkaを使ってstreamアプリケーションを作るためのローカル環境準備。
Docker Composeを使いつつ、producerとconsumerをSpring Bootで準備。疎通用。
docker-compose
# Local stack: a single-node Kafka broker in KRaft mode (no ZooKeeper), a
# Spring Boot producer and consumer built from ./producer and ./consumer,
# and kafka-ui for browsing the cluster. All services share the default
# Compose network and reach the broker as kafka:9092.
services:
  kafka:
    image: apache/kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      # Environment values are quoted so they are passed as strings.
      KAFKA_NODE_ID: "1"
      # One node acts as both broker and KRaft controller.
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      # The broker advertises itself as kafka:9092, a name that resolves on
      # the Compose network; clients on the host would need their own listener.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      # Single broker, so internal topics cannot be replicated more than once.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
      KAFKA_NUM_PARTITIONS: "3"

  producer:
    build:
      context: ./producer
    container_name: kafka-producer
    ports:
      # Publish the producer's HTTP port so POST /api/messages is reachable
      # from the host; host port 8080 is already taken by kafka-ui.
      - "8081:8080"
    depends_on:
      - kafka
    environment:
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092

  consumer:
    build:
      context: ./consumer
    container_name: kafka-consumer
    depends_on:
      - kafka
    environment:
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local-kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      # KAFKA_CLUSTERS_0_METRICS_PORT is intentionally not set: the broker
      # above exposes no metrics/JMX port, and 8080 is kafka-ui's own HTTP port.
    depends_on:
      - kafka
ディレクトリ構成
my-kafka-spring-app/
├── docker-compose.yml
├── producer/
│   ├── Dockerfile
│   ├── build.gradle
│   └── src/
│       └── main/
│           ├── java/
│           │   └── com/example/kafka/
│           │       ├── KafkaProducerApplication.java
│           │       └── controller/
│           │           └── MessageController.java
│           └── resources/
│               └── application.yml
└── consumer/
    ├── Dockerfile
    ├── build.gradle
    └── src/
        └── main/
            ├── java/
            │   └── com/example/kafka/
            │       ├── KafkaConsumerApplication.java
            │       └── listener/
            │           └── MessageListener.java
            └── resources/
                └── application.yml
producer
ディレクトリ構成
producer/
├── build.gradle
├── Dockerfile
└── src/
    └── main/
        ├── java/
        │   └── com/example/kafka/
        │       ├── KafkaProducerApplication.java
        │       └── controller/
        │           └── MessageController.java
        └── resources/
            └── application.yml
build.gradle
// Build script for the Spring Boot + Spring for Apache Kafka sample application.
plugins {
    id 'java'
    id 'org.springframework.boot' version '3.1.5'
    id 'io.spring.dependency-management' version '1.1.3'
}

group = 'com.example.kafka'
version = '0.0.1-SNAPSHOT'

java {
    // Set the language level on the java extension; assigning
    // sourceCompatibility at project level relies on the deprecated
    // plugin-convention mechanism in Gradle 8.x.
    sourceCompatibility = '17'
}

repositories {
    mavenCentral()
}

dependencies {
    // Versions are supplied by the Spring Boot dependency management plugin.
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
}
Dockerfile
# Stage 1: build the executable Spring Boot jar with Gradle on JDK 17.
FROM gradle:8.4-jdk17 AS builder
WORKDIR /app
# WORKDIR is /app, so the build context is copied into /app.
COPY . .
RUN gradle --no-daemon bootJar

# Stage 2: run the jar on a JRE-only image.
FROM eclipse-temurin:17-jre
WORKDIR /app
# The jar produced by the builder stage is copied in under a fixed name.
COPY --from=builder /app/build/libs/*.jar ./app.jar
ENTRYPOINT ["java", "-jar", "app.jar"]
KafkaProducerApplication.java
package com.example.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the producer-side Spring Boot application.
 */
@SpringBootApplication
public class KafkaProducerApplication {

    /**
     * Starts the Spring application using this class as the primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(KafkaProducerApplication.class);
        application.run(args);
    }
}
MessageController.java
package com.example.kafka.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
/**
 * REST controller that forwards HTTP request bodies to a Kafka topic.
 *
 * <p>Handles {@code POST /api/messages}.
 */
@RestController
@RequestMapping("/api/messages")
public class MessageController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /** Destination topic, read from {@code kafka.topic}; falls back to {@code test-topic}. */
    @Value("${kafka.topic:test-topic}")
    private String topic;

    /**
     * @param kafkaTemplate template used to publish messages
     */
    public MessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Publishes the request body to the configured topic.
     *
     * <p>The send is asynchronous, so the response only confirms that the
     * message was handed to the template, not that the broker accepted it.
     *
     * @param message raw request body to publish
     * @return a confirmation string echoing the message
     */
    @PostMapping
    public String sendMessage(@RequestBody String message) {
        // Do not discard the async result: surface delivery failures in the
        // log instead of swallowing them silently.
        kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("Failed to send message to Kafka topic " + topic + ": " + ex);
            }
        });
        return "Message sent to Kafka: " + message;
    }
}
application.yml
# Producer application settings.
server:
  port: 8080

spring:
  kafka:
    # Taken from the SPRING_KAFKA_BOOTSTRAP_SERVERS environment variable when
    # set; otherwise falls back to kafka:9092.
    bootstrap-servers: "${SPRING_KAFKA_BOOTSTRAP_SERVERS:kafka:9092}"
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

# Application-level property (top level, not under "spring"); read by
# MessageController through the ${kafka.topic:test-topic} placeholder.
kafka:
  topic: test-topic
consumer
ディレクトリ構成
consumer/
├── build.gradle
├── Dockerfile
└── src/
    └── main/
        ├── java/
        │   └── com/example/kafka/
        │       ├── KafkaConsumerApplication.java
        │       └── listener/
        │           └── MessageListener.java
        └── resources/
            └── application.yml
build.gradleとDockerfileはproducerと同じ内容
KafkaConsumerApplication.java
package com.example.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the consumer-side Spring Boot application.
 */
@SpringBootApplication
public class KafkaConsumerApplication {

    /**
     * Starts the Spring application using this class as the primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(KafkaConsumerApplication.class);
        application.run(args);
    }
}
MessageListener.java
package com.example.kafka.listener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Kafka listener that prints every received message to standard output.
 */
@Component
public class MessageListener {

    /**
     * Handles one record from the topic named by {@code kafka.topic}
     * (default {@code test-topic}) as a member of consumer group {@code test-group}.
     *
     * @param message the record value
     */
    @KafkaListener(topics = "${kafka.topic:test-topic}", groupId = "test-group")
    public void listen(String message) {
        final String logLine = "Received message from Kafka: " + message;
        System.out.println(logLine);
    }
}
application.yml
# Consumer application settings.
spring:
  kafka:
    # Taken from the SPRING_KAFKA_BOOTSTRAP_SERVERS environment variable when
    # set; otherwise falls back to kafka:9092.
    bootstrap-servers: "${SPRING_KAFKA_BOOTSTRAP_SERVERS:kafka:9092}"
    consumer:
      group-id: test-group
      # Start from the earliest offset when the group has no committed offset.
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

# Application-level property (top level, not under "spring"); read by
# MessageListener through the ${kafka.topic:test-topic} placeholder.
kafka:
  topic: test-topic
接続確認
producerの /api/messages にPOSTでメッセージ(例: hello)を送信し、consumerがメッセージを処理したことをログで確認
kafka-consumer | Received message from Kafka: hello
おわり

