2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Axon FrameworkでKafkaにイベント送信する [Kotlin,SpringBoot]

Last updated at Posted at 2024-11-10

はじめに

Axon Framework(以降、Axonと記述)を使用してKafkaにイベントを送信する最小限の実装を試した過程を残しておきます。
Kotlinを使用し、Spring Bootアプリケーションとして構築しています。

失敗も含めて小さいステップで確認しながら進めているので、最終形だけ見たい方は、githubリポジトリをご参照ください。

実装においては、以下のリポジトリを参考にさせていただきました。

プロジェクトのセットアップ

プロジェクトの初期状態のディレクトリ・ファイル構成は以下です。

ProducerApplicationをSpring Bootアプリケーションとして起動し、動作確認用のAPIエンドポイントを1つ用意した状態です。

producer/
├── build.gradle.kts
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle.kts
└── src
    └── main
        └── kotlin
            └── org
                └── example
                    └── axonkafkatrial
                        └── producer
                            ├── ProducerApplication.kt
                            └── controller
                                └── TrialController.kt
setting.gradle.kts
rootProject.name = "axon_kafka_trial_producer"
build.gradle.kts
plugins {
    val kotlinVersion = "2.0.0"
    kotlin("jvm") version kotlinVersion
    kotlin("plugin.spring") version kotlinVersion
    id("org.springframework.boot") version "3.3.5"
    id("io.spring.dependency-management") version "1.1.5"
}

group = "org.example"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
}

kotlin {
    jvmToolchain(21)
}

src/main/kotlin/org/example/axonkafkatrial/producer/ProducerApplication.kt
package org.example.axonkafkatrial.producer

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class ProducerApplication

fun main(args: Array<String>) {
    runApplication<ProducerApplication>(*args)
}
src/main/kotlin/org/example/axonkafkatrial/producer/controller/TrialController.kt
package org.example.axonkafkatrial.producer.controller

import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/")
class TrialController {
    @GetMapping
    @ResponseStatus(HttpStatus.OK)
    fun get(): GetResponse {
        return GetResponse("success")
    }
}

data class GetResponse(val body: String)

APIエンドポイントの動作確認

$ curl localhost:8080
{"body":"success"}

参考までに動作確認時のローカル環境情報を記載します。

$ ./gradlew --version

------------------------------------------------------------
Gradle 8.5
------------------------------------------------------------

Build time:   2023-11-29 14:08:57 UTC
Revision:     28aca86a7180baa17117e0e5ba01d8ea9feca598

Kotlin:       1.9.20
Groovy:       3.0.17
Ant:          Apache Ant(TM) version 1.10.13 compiled on January 4 2023
JVM:          21.0.3 (Amazon.com Inc. 21.0.3+9-LTS)
OS:           Mac OS X 14.5 x86_64

いい加減、Intel Mac は卒業したい。。。

Axon の導入

依存ライブラリの追加

Spring BootでAuto Configurationができるように、 axon-spring-boot-starterを使用します。

build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.axonframework:axon-spring-boot-starter:4.10.2") // 追加
}

この状態でアプリ起動すると、AxonServerに接続しようとして、以下の例外が発生しました。

INFO 5219 --- [           main] i.a.a.c.impl.AxonServerManagedChannel    : Requesting connection details from localhost:8124
**********************************************
*                                            *
*  !!! UNABLE TO CONNECT TO AXON SERVER !!!  *
*                                            *
* Are you sure it's running?                 *
* Don't have Axon Server yet?                *
* Go to:  https://axoniq.io/go-axon          *
*                                            *
**********************************************

To suppress this message, you can
 - explicitly configure an AxonServer location,
 - start with -Daxon.axonserver.suppressDownloadMessage=true

WARN 5219 --- [           main] i.a.a.c.impl.AxonServerManagedChannel    : Connecting to AxonServer node [localhost:8124] failed.

io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:268) ~[grpc-stub-1.65.1.jar:1.65.1]
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:249) ~[grpc-stub-1.65.1.jar:1.65.1]
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:167) ~[grpc-stub-1.65.1.jar:1.65.1]
	at io.axoniq.axonserver.grpc.control.PlatformServiceGrpc$PlatformServiceBlockingStub.getPlatformServer(PlatformServiceGrpc.java:250) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
	at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.connectChannel(AxonServerManagedChannel.java:122) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
	at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.createConnection(AxonServerManagedChannel.java:343) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
	at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.ensureConnected(AxonServerManagedChannel.java:308) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
	at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.getState(AxonServerManagedChannel.java:235) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
	at io.axoniq.axonserver.connector.impl.ContextConnection.ensureConnected(ContextConnection.java:237) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
	at io.axoniq.axonserver.connector.impl.ContextConnection.connect(ContextConnection.java:165) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
	at io.axoniq.axonserver.connector.AxonServerConnectionFactory.connect(AxonServerConnectionFactory.java:166) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
	at org.axonframework.axonserver.connector.AxonServerConnectionManager.createConnection(AxonServerConnectionManager.java:135) ~[axon-server-connector-4.10.2.jar:4.10.2]
	at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708) ~[na:na]
	at org.axonframework.axonserver.connector.AxonServerConnectionManager.getConnection(AxonServerConnectionManager.java:131) ~[axon-server-connector-4.10.2.jar:4.10.2]
	at org.axonframework.axonserver.connector.AxonServerConnectionManager.getConnection(AxonServerConnectionManager.java:121) ~[axon-server-connector-4.10.2.jar:4.10.2]
	at org.axonframework.axonserver.connector.AxonServerConnectionManager.start(AxonServerConnectionManager.java:111) ~[axon-server-connector-4.10.2.jar:4.10.2]
	at org.axonframework.lifecycle.Lifecycle$LifecycleRegistry.lambda$onStart$0(Lifecycle.java:68) ~[axon-messaging-4.10.2.jar:4.10.2]
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[na:na]
	at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1024) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[na:na]
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:na]
	at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:662) ~[na:na]
	at org.axonframework.config.DefaultConfigurer.invokeLifecycleHandlers(DefaultConfigurer.java:1058) ~[axon-configuration-4.10.2.jar:4.10.2]
	at org.axonframework.config.DefaultConfigurer.invokeStartHandlers(DefaultConfigurer.java:1004) ~[axon-configuration-4.10.2.jar:4.10.2]
	at org.axonframework.config.DefaultConfigurer$ConfigurationImpl.start(DefaultConfigurer.java:1154) ~[axon-configuration-4.10.2.jar:4.10.2]
	at org.axonframework.spring.config.SpringAxonConfiguration.start(SpringAxonConfiguration.java:89) ~[axon-spring-4.10.2.jar:4.10.2]
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:285) ~[spring-context-6.1.10.jar:6.1.10]
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:469) ~[spring-context-6.1.10.jar:6.1.10]
	at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:257) ~[spring-context-6.1.10.jar:6.1.10]
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:202) ~[spring-context-6.1.10.jar:6.1.10]
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:981) ~[spring-context-6.1.10.jar:6.1.10]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:627) ~[spring-context-6.1.10.jar:6.1.10]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.3.1.jar:3.3.1]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-3.3.1.jar:3.3.1]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:456) ~[spring-boot-3.3.1.jar:3.3.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) ~[spring-boot-3.3.1.jar:3.3.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1363) ~[spring-boot-3.3.1.jar:3.3.1]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1352) ~[spring-boot-3.3.1.jar:3.3.1]
	at org.example.axonkafkatrial.producer.ProducerApplicationKt.main(ProducerApplication.kt:12) ~[main/:na]
Caused by: io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/[0:0:0:0:0:0:0:1]:8124
Caused by: java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.Net.pollConnect(Native Method) ~[na:na]
	at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:682) ~[na:na]
	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:973) ~[na:na]
	at io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
	at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
	at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
	at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

INFO 5219 --- [           main] i.a.a.c.impl.AxonServerManagedChannel    : Failed to get connection to AxonServer. Scheduling a reconnect in 2000ms

application.yml を作成してAxonServer接続を無効化します。

src/main/resources/application.yml
axon:
  axonserver:
    enabled: false

これで上記の例外が発生しなくなりました。

Axonの動作確認

Axonがちゃんと動くかの確認のため、試しにAxonのCommandGatewayCommandHandlerを使ってみます。

ControllerにPOSTメソッドを追加し、CommandGatewayにcommandを送信します。

src/main/kotlin/org/example/axonkafkatrial/producer/controller/TrialController.kt
package org.example.axonkafkatrial.producer.controller

import org.axonframework.commandhandling.gateway.CommandGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.*
import java.util.*

@RestController
@RequestMapping("/")
class TrialController(private val commandGateway: CommandGateway) { // CommandGatewayをDI
    @GetMapping
    @ResponseStatus(HttpStatus.OK)
    fun get(): GetResponse {
        return GetResponse("success")
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    fun post(@RequestBody req: PostRequest): PostResponse {
        val id = UUID.randomUUID()
        val command = DocCreate(id, req.body)
        commandGateway.sendAndWait<DocCreate>(command) // CommandGatewayにcommandを送信
        return PostResponse(id)
    }
}

data class GetResponse(val body: String)
data class PostRequest(val body: String)
data class PostResponse(val id: UUID)

commandの型定義はこちら

src/main/kotlin/org/example/axonkafkatrial/producer/command/DocCreate.kt
package org.example.axonkafkatrial.producer.command

import java.util.*

data class DocCreate(
    val docId: UUID,
    val body: String,
)

次にcommandを受け取る処理を作成します。

src/main/kotlin/org/example/axonkafkatrial/producer/service/TrialService.kt
package org.example.axonkafkatrial.producer.service

import org.axonframework.commandhandling.CommandHandler
import org.example.axonkafkatrial.producer.command.DocCreate
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

@Service
class TrialService {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    @CommandHandler
    fun run(command: DocCreate) {
        logger.info("command accepted: $command")
    }
}

ここで動作確認すると、リクエストのJSONパースエラーが発生しました。

$ curl -X POST localhost:8080 \
>  -H 'Content-Type: application/json' \
>  -d '{"body":"test"}'
{"timestamp":"2024-11-09T02:07:49.735+00:00","status":400,"error":"Bad Request","path":"/"}
log
WARN 7648 --- [nio-8080-exec-1] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.http.converter.HttpMessageNotReadableException: JSON parse error: Cannot construct instance of `org.example.axonkafkatrial.producer.controller.PostRequest` (although at least one Creator exists): cannot deserialize from Object value (no delegate- or property-based Creator)]

JSONシリアライザとしてjacksonを追加します。

build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1") // 追加
}

これで動作確認も成功しました。

$ curl -X POST localhost:8080 \
>  -H 'Content-Type: application/json' \
>  -d '{"body":"test"}'
{"id":"da10b02e-7529-482a-a82f-f632d5b0e02f"}

CommandHandlerのメソッドがcommandを受信してログ出力ができています。

log
INFO 7101 --- [nio-8080-exec-2] o.e.a.producer.service.TrialService      : command accepted: DocCreate(docId=da10b02e-7529-482a-a82f-f632d5b0e02f, body=test)

イベントの発行

次に、EventGatewayでイベントを発行してみます。

src/main/kotlin/org/example/axonkafkatrial/producer/service/TrialService.kt
package org.example.axonkafkatrial.producer.service

import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventhandling.gateway.EventGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.util.UUID

@Service
class TrialService(private val eventGateway: EventGateway) { // EventGatewayをDI
    private val logger = LoggerFactory.getLogger(this.javaClass)

    @CommandHandler
    fun run(command: DocCreate) {
        logger.info("command accepted: $command")
        val event = DocCreated(command.docId, command.body)
        eventGateway.publish(event) // EventGatewayでイベント発行
    }
}

data class DocCreated(
    val docId: UUID,
    val body: String,
)

これだけでも、これまでと同様にAPIにPOSTして動作確認すれば成功しますが、イベント発行がうまくできているかは、ログが何も出ていないので確認しづらいです。

Axonは発行したイベントをEventBusに保存するようです。

The EventBus is the mechanism that dispatches events to the subscribed event handlers. Axon provides three implementations of the Event Bus: AxonServerEventStoreEmbeddedEventStore and SimpleEventBus.

EventBusには以下の3種類があるようです。

  • AxonServerEventStore
  • EmbeddedEventStore
  • SimpleEventBus

デフォルトはAxonServerEventStoreのようですが、AxonServerは無効化したので、おそらくSimpleEventBus になっていると思われます。

The SimpleEventBus has a volatile storage and 'forgets' events as soon as they have been published to subscribed components.

SimpleEventBus では永続化されないようです。

そこで、発行したイベントを永続化するため、EmbeddedEventStoreを使用します。
EmbeddedEventStoreには以下の3種類があるようです。

  • JpaEventStorageEngine
  • JdbcEventStorageEngine
  • MongoEventStorageEngine

Spring Boot によるAuto Configurationが簡単にできるJpaEventStorageEngineを使用して、MySQLに永続化しようと思います。

まずMySQLをdockerで起動しておきます。

compose.yml
services:
  producer-db:
    image: mysql:latest
    container_name: producer-db
    environment:
      MYSQL_ROOT_PASSWORD: mysql
      MYSQL_DATABASE: trial
      MYSQL_USER: docker
      MYSQL_PASSWORD: docker
      TZ: 'Asia/Tokyo'
    ports:
      - "3306:3306"
$ docker compose up -d producer-db

# 参考までにバージョン情報を記載
$ docker -v
Docker version 27.3.1, build ce12230

JPAでMySQL接続するための依存ライブラリを追加します。

build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1")
    implementation("org.springframework.boot:spring-boot-starter-data-jpa") // 追加
    runtimeOnly ("com.mysql:mysql-connector-j") // 追加
}

application.yml にDB接続設定を追加します。

src/main/resources/application.yml
# 以下を追加
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/trial
    username: docker
    password: docker
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    database: mysql
    hibernate:
      ddl-auto: update

これでアプリを起動すると、MySQLに以下のテーブルができています。

  • association_value_entry
  • association_value_entry_seq
  • dead_letter_entry
  • domain_event_entry
  • domain_event_entry_seq
  • saga_entry
  • snapshot_event_entry
  • token_entry

この状態で、改めてAPIにPOSTして動作確認します。

$ curl -X POST localhost:8080 \
>  -H 'Content-Type: application/json' \
>  -d '{"body":"test"}'
{"id":"cb7234ff-e787-40f7-acdf-163f7630b20f"}

すると、domain_event_entryテーブルに、発行したイベントのレコードが追加されています。

image.png

domain_event_entryテーブルには以下のカラムがあります。

  • global_index
  • event_identifier
  • meta_data
  • payload
  • payload_version
  • payload_type
  • time_stamp
  • aggregate_identifier
  • sequence_number
  • type

payload_typeにはイベントの型が入っています。

payload_type
org.example.axonkafkatrial.producer.service.DocCreated

payloadにはXML形式でイベントデータが入っています。

payload
<org.example.axonkafkatrial.producer.service.DocCreated>
  <docId>cb7234ff-e787-40f7-acdf-163f7630b20f</docId>
  <body>test</body>
</org.example.axonkafkatrial.producer.service.DocCreated>

これで、イベントが発行でき、永続化できていることが確認できました。

Kafkaへのイベント送信

次に、発行したイベントをKafkaに送信します。

Kafkaの起動

まずはdockerでKafkaを起動します。
kraftモードで1台構成のクラスターを起動し、ブラウザでKafkaの状態が確認できるようにkafka-uiも追加しました。

compose.ymlの設定については以下を参考にさせていただきました。

compose.yml
services:
# 以下を追加
  kafka-0:
    image: bitnami/kafka
    hostname: kafka-0
    ports:
      - "9094:9094"
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-0:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - network0

  kafka-ui-0:
    container_name: kafka-ui-0
    image: provectuslabs/kafka-ui:v0.7.2
    depends_on:
      - kafka-0
    ports:
      - "9080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=kafka-0
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-0:9092
      - AUTH_TYPE="LOGIN_FORM"
      - SPRING_SECURITY_USER_NAME=admin
      - SPRING_SECURITY_USER_PASSWORD=admin
    networks:
      - network0

networks:
  network0:
$ docker compose up -d kafka-0 kafka-ui-0

ブラウザでlocalhost:9080にアクセスすると、kafka-uiのダッシュボードが表示され、Kafkaのクラスターが起動していることが確認できます。

image.png

アプリケーションからKafkaへの接続とイベント送信

AxonのKafka統合のための依存ライブラリを追加します。

build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1")
    implementation("org.springframework.boot:spring-boot-starter-data-jpa")
    runtimeOnly ("com.mysql:mysql-connector-j")
    implementation("org.axonframework.extensions.kafka:axon-kafka-spring-boot-starter:4.10.0") // 追加
}

ここでアプリを起動してAPIにPOSTすると、Kafkaに接続できず、以下のようなWARNログが出続けます。

log
INFO 9760 --- [nio-8080-exec-1] o.e.a.producer.service.TrialService      : command accepted: DocCreate(docId=7306afc7-e465-4bda-b2ec-cd4d2faeb6b0, body=test)
INFO 9760 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = -1
	auto.include.jmx.reporter = true
	batch.size = 16384
	bootstrap.servers = [localhost:9092]
    〜省略〜

INFO 9760 --- [nio-8080-exec-1] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
INFO 9760 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
INFO 9760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.7.1
INFO 9760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: e2494e6ffb89f828
INFO 9760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1731129441403
INFO 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node -1 disconnected.
WARN 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
WARN 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
INFO 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node -1 disconnected.
WARN 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
WARN 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

ログのProducerConfigを見ると、デフォルトでは localhost:9092に接続しようとしているようです。

dockerで起動したKafkaは、コンテナネットワーク外からの接続を9094ポートで公開しているので、 localhost:9094 で接続する必要があります。

application.yml にKafka接続情報を記述します。

src/main/resources/application.yml
axon:
  axonserver:
    enabled: false
  # 以下を追加
  kafka:
    bootstrap-servers: localhost:9094

これで再チャレンジすると、Kafka接続は成功したようです。

log
INFO 10262 --- [nio-8080-exec-1] o.e.a.producer.service.TrialService      : command accepted: DocCreate(docId=28e50b8d-0688-4105-a674-f3fd57edc4d6, body=test)
INFO 10262 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = -1
	auto.include.jmx.reporter = true
	batch.size = 16384
	bootstrap.servers = [localhost:9094]
    〜省略〜

INFO 10262 --- [nio-8080-exec-1] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
INFO 10262 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
INFO 10262 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.7.1
INFO 10262 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: e2494e6ffb89f828
INFO 10262 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1731129716329
WARN 10262 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {Axon.Events=UNKNOWN_TOPIC_OR_PARTITION}
INFO 10262 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: 6ju05AkfTN-V06364F0G9g
INFO 10262 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0

送信先のトピックを指定していない状態なので、イベントは、デフォルトのAxon.Eventsトピックに送信されているようです。

image.png

特にAxon.Eventsトピックを作成する操作はしていなかったので、トピックは存在しなかったはずですが、Kafkaの設定で auto.create.topics.enabletrueになっているので、Kafkaがイベントを受信した際にトピックが自動作成されたようです。

image.png

Kafkaが受信したメッセージは以下のようになっています。

valueには、domain_event_entryテーブルのpayloadカラムと同じ値が入っています。

value
<org.example.axonkafkatrial.producer.service.DocCreated>
  <docId>cb7234ff-e787-40f7-acdf-163f7630b20f</docId>
  <body>test</body>
</org.example.axonkafkatrial.producer.service.DocCreated>

headerには、以下のようなJSONが入っています。

header
{
  "axon-message-id": "722ff952-e7c3-4d09-a960-fecf0aac0d95",
  "axon-metadata-correlationId": "5c7a8e67-0b65-405c-8fa8-fc35203624ef",
  "axon-message-revision": null,
  "axon-message-timestamp": "\u0000\u0000\u0001�\u000f_-�",
  "axon-message-type": "org.example.axonkafkatrial.producer.service.DocCreated",
  "axon-metadata-traceId": "5c7a8e67-0b65-405c-8fa8-fc35203624ef"
}

domain_event_entry テーブルのカラムの値との対応関係は以下になっていると思われます。

  • axon-message-idevent_identifier
  • axon-metadata-correlationIdmeta_data カラムのXML内の correlationId
  • axon-message-revisionpayload_revision (nullなので合っているか確度低)
  • axon-message-timestamptime_stamp(文字化けしているので確度低)
  • axon-message-typepayload_type
  • axon-message-traceIdmeta_data カラムのXML内の traceId

おまけ

ちなみに、Axonで、CQRS/ESはやらず、Kafkaにイベントを送信するだけなら、Axon導入の最初に試したCommandGatewayCommandHandlerを使う必要はありません。

例えば、以下のような書き方でも、イベント送信は可能です。

src/main/kotlin/org/example/axonkafkatrial/producer/controller/TrialController.kt
package org.example.axonkafkatrial.producer.controller

import org.axonframework.commandhandling.gateway.CommandGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.example.axonkafkatrial.producer.service.TrialService
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.*
import java.util.*

@RestController
@RequestMapping("/")
class TrialController(
//    private val commandGateway: CommandGateway,
    private val trialService: TrialService,
) {
    @GetMapping
    @ResponseStatus(HttpStatus.OK)
    fun get(): GetResponse {
        return GetResponse("success")
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    fun post(@RequestBody req: PostRequest): PostResponse {
        val id = UUID.randomUUID()
        val command = DocCreate(id, req.body)
//        commandGateway.sendAndWait<DocCreate>(command)
        trialService.run(command) // CommandGatewayを使用せずTrialServiceを直接呼ぶ
        return PostResponse(id)
    }
}

data class GetResponse(val body: String)
data class PostRequest(val body: String)
data class PostResponse(val id: UUID)

src/main/kotlin/org/example/axonkafkatrial/producer/service/TrialService.kt
package org.example.axonkafkatrial.producer.service

import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventhandling.gateway.EventGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.util.UUID

@Service
class TrialService(private val eventGateway: EventGateway) {
    private val logger = LoggerFactory.getLogger(this.javaClass)

//    @CommandHandler
    fun run(command: DocCreate) {
        logger.info("command accepted: $command")
        val event = DocCreated(command.docId, command.body)
        eventGateway.publish(event)
    }
}

data class DocCreated(
    val docId: UUID,
    val body: String,
)

まとめ

Axonを使用してKafkaにイベントを送信する最小限の実装を試しました。

引き続き、各種設定・カスタマイズの方法や、Kafkaからイベントを受信する方法についても確認していく予定です。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?