0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Spring Boot + Kafka + Kafka UIローカル環境準備(docker-compose)

Posted at

Kafka使ってstreamアプリケーション作るためのローカル環境準備

を使いつつ、producerとconsumerをSpringBootで準備。疎通用。

docker-compose

services:
  kafka:
    image: apache/kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3

  producer:
    build:
      context: ./producer
    container_name: kafka-producer
    depends_on:
      - kafka
    environment:
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092

  consumer:
    build:
      context: ./consumer
    container_name: kafka-consumer
    depends_on:
      - kafka
    environment:
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local-kafka
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_METRICS_PORT: 8080
    depends_on:
      - kafka

ディレクトリ構成

my-kafka-spring-app/
├── docker-compose.yml
├── producer/
│   ├── Dockerfile
│   ├── build.gradle
│   └── src/
│       └── main/
│           ├── java/
│           │   └── com/example/kafka/
│           │       ├── KafkaProducerApplication.java
│           │       └── controller/
│           │           └── MessageController.java
│           └── resources/
│               └── application.yml
├── consumer/
│   ├── Dockerfile
│   ├── build.gradle
│   └── src/
│       └── main/
│           ├── java/
│           │   └── com/example/kafka/
│           │       ├── KafkaConsumerApplication.java
│           │       └── listener/
│           │           └── MessageListener.java
│           └── resources/
│               └── application.yml

producer

ディレクトリ構成

producer/
├── build.gradle
├── Dockerfile
└── src/
    └── main/
        ├── java/
        │   └── com/example/kafka/
        │       ├── KafkaProducerApplication.java
        │       └── controller/
        │           └── MessageController.java
        └── resources/
            └── application.yml

build.gradle

plugins {
    id 'org.springframework.boot' version '3.1.5'
    id 'io.spring.dependency-management' version '1.1.3'
    id 'java'
}

group = 'com.example.kafka'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
}

Dockerfile

FROM gradle:8.4-jdk17 AS builder
WORKDIR /app
COPY . /app
RUN gradle bootJar --no-daemon

FROM eclipse-temurin:17-jre
WORKDIR /app
COPY --from=builder /app/build/libs/*.jar app.jar
ENTRYPOINT ["java", "-jar", "app.jar"]

KafkaProducerApplication.java

package com.example.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }
}

MessageController.java

package com.example.kafka.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/messages")
public class MessageController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.topic:test-topic}")
    private String topic;

    public MessageController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping
    public String sendMessage(@RequestBody String message) {
        kafkaTemplate.send(topic, message);
        return "Message sent to Kafka: " + message;
    }
}

application.yml

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS:kafka:9092}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

kafka:
  topic: test-topic

consumer

ディレクトリ構成

consumer/
├── build.gradle
├── Dockerfile
└── src/
    └── main/
        ├── java/
        │   └── com/example/kafka/
        │       ├── KafkaConsumerApplication.java
        │       └── listener/
        │           └── MessageListener.java
        └── resources/
            └── application.yml

bulid.gradleとDockerfileはproducerと同じ内容

KafkaConsumerApplication.java

package com.example.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}

MessageListener.java

package com.example.kafka.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {

    @KafkaListener(topics = "${kafka.topic:test-topic}", groupId = "test-group")
    public void listen(String message) {
        System.out.println("Received message from Kafka: " + message);
    }
}

application.yml

spring:
  kafka:
    bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS:kafka:9092}
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

kafka:
  topic: test-topic

接続確認

test-topicに対してメッセージ送信
image.png

messageがtopicにあることを確認
image.png

consumerがメッセージを処理したことをログで確認
kafka-consumer | Received message from Kafka: hello

おわり

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?