はじめに
Axon Framework(以降、Axonと記述)を使用してKafkaにイベントを送信する最小限の実装を試した過程を残しておきます。
Kotlinを使用し、Spring Bootアプリケーションとして構築しています。
失敗も含めて小さいステップで確認しながら進めているので、最終形だけ見たい方は、githubリポジトリをご参照ください。
実装においては、以下のリポジトリを参考にさせていただきました。
プロジェクトのセットアップ
プロジェクトの初期状態のディレクトリ・ファイル構成は以下です。
ProducerApplicationをSpring Bootアプリケーションとして起動し、動作確認用のAPIエンドポイントを1つ用意した状態です。
producer/
├── build.gradle.kts
├── gradle
│ └── wrapper
│ ├── gradle-wrapper.jar
│ └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle.kts
└── src
└── main
└── kotlin
└── org
└── example
└── axonkafkatrial
└── producer
├── ProducerApplication.kt
└── controller
└── TrialController.kt
rootProject.name = "axon_kafka_trial_producer"
plugins {
val kotlinVersion = "2.0.0"
kotlin("jvm") version kotlinVersion
kotlin("plugin.spring") version kotlinVersion
id("org.springframework.boot") version "3.3.5"
id("io.spring.dependency-management") version "1.1.5"
}
group = "org.example"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
}
kotlin {
jvmToolchain(21)
}
package org.example.axonkafkatrial.producer
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
@SpringBootApplication
class ProducerApplication
fun main(args: Array<String>) {
runApplication<ProducerApplication>(*args)
}
package org.example.axonkafkatrial.producer.controller
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping("/")
class TrialController {
@GetMapping
@ResponseStatus(HttpStatus.OK)
fun get(): GetResponse {
return GetResponse("success")
}
}
data class GetResponse(val body: String)
APIエンドポイントの動作確認
$ curl localhost:8080
{"body":"success"}
参考までに動作確認時のローカル環境情報を記載します。
$ ./gradlew --version
------------------------------------------------------------
Gradle 8.5
------------------------------------------------------------
Build time: 2023-11-29 14:08:57 UTC
Revision: 28aca86a7180baa17117e0e5ba01d8ea9feca598
Kotlin: 1.9.20
Groovy: 3.0.17
Ant: Apache Ant(TM) version 1.10.13 compiled on January 4 2023
JVM: 21.0.3 (Amazon.com Inc. 21.0.3+9-LTS)
OS: Mac OS X 14.5 x86_64
いい加減、Intel Mac は卒業したい。。。
Axon の導入
依存ライブラリの追加
Spring BootでAuto Configurationができるように、 axon-spring-boot-starterを使用します。
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.axonframework:axon-spring-boot-starter:4.10.2") // 追加
}
この状態でアプリ起動すると、AxonServerに接続しようとして、以下の例外が発生しました。
INFO 5219 --- [ main] i.a.a.c.impl.AxonServerManagedChannel : Requesting connection details from localhost:8124
**********************************************
* *
* !!! UNABLE TO CONNECT TO AXON SERVER !!! *
* *
* Are you sure it's running? *
* Don't have Axon Server yet? *
* Go to: https://axoniq.io/go-axon *
* *
**********************************************
To suppress this message, you can
- explicitly configure an AxonServer location,
- start with -Daxon.axonserver.suppressDownloadMessage=true
WARN 5219 --- [ main] i.a.a.c.impl.AxonServerManagedChannel : Connecting to AxonServer node [localhost:8124] failed.
io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:268) ~[grpc-stub-1.65.1.jar:1.65.1]
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:249) ~[grpc-stub-1.65.1.jar:1.65.1]
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:167) ~[grpc-stub-1.65.1.jar:1.65.1]
at io.axoniq.axonserver.grpc.control.PlatformServiceGrpc$PlatformServiceBlockingStub.getPlatformServer(PlatformServiceGrpc.java:250) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.connectChannel(AxonServerManagedChannel.java:122) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.createConnection(AxonServerManagedChannel.java:343) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.ensureConnected(AxonServerManagedChannel.java:308) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.getState(AxonServerManagedChannel.java:235) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.impl.ContextConnection.ensureConnected(ContextConnection.java:237) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.impl.ContextConnection.connect(ContextConnection.java:165) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.AxonServerConnectionFactory.connect(AxonServerConnectionFactory.java:166) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at org.axonframework.axonserver.connector.AxonServerConnectionManager.createConnection(AxonServerConnectionManager.java:135) ~[axon-server-connector-4.10.2.jar:4.10.2]
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708) ~[na:na]
at org.axonframework.axonserver.connector.AxonServerConnectionManager.getConnection(AxonServerConnectionManager.java:131) ~[axon-server-connector-4.10.2.jar:4.10.2]
at org.axonframework.axonserver.connector.AxonServerConnectionManager.getConnection(AxonServerConnectionManager.java:121) ~[axon-server-connector-4.10.2.jar:4.10.2]
at org.axonframework.axonserver.connector.AxonServerConnectionManager.start(AxonServerConnectionManager.java:111) ~[axon-server-connector-4.10.2.jar:4.10.2]
at org.axonframework.lifecycle.Lifecycle$LifecycleRegistry.lambda$onStart$0(Lifecycle.java:68) ~[axon-messaging-4.10.2.jar:4.10.2]
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[na:na]
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1024) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[na:na]
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:na]
at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:662) ~[na:na]
at org.axonframework.config.DefaultConfigurer.invokeLifecycleHandlers(DefaultConfigurer.java:1058) ~[axon-configuration-4.10.2.jar:4.10.2]
at org.axonframework.config.DefaultConfigurer.invokeStartHandlers(DefaultConfigurer.java:1004) ~[axon-configuration-4.10.2.jar:4.10.2]
at org.axonframework.config.DefaultConfigurer$ConfigurationImpl.start(DefaultConfigurer.java:1154) ~[axon-configuration-4.10.2.jar:4.10.2]
at org.axonframework.spring.config.SpringAxonConfiguration.start(SpringAxonConfiguration.java:89) ~[axon-spring-4.10.2.jar:4.10.2]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:285) ~[spring-context-6.1.10.jar:6.1.10]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:469) ~[spring-context-6.1.10.jar:6.1.10]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:257) ~[spring-context-6.1.10.jar:6.1.10]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:202) ~[spring-context-6.1.10.jar:6.1.10]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:981) ~[spring-context-6.1.10.jar:6.1.10]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:627) ~[spring-context-6.1.10.jar:6.1.10]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.3.1.jar:3.3.1]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-3.3.1.jar:3.3.1]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:456) ~[spring-boot-3.3.1.jar:3.3.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) ~[spring-boot-3.3.1.jar:3.3.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1363) ~[spring-boot-3.3.1.jar:3.3.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1352) ~[spring-boot-3.3.1.jar:3.3.1]
at org.example.axonkafkatrial.producer.ProducerApplicationKt.main(ProducerApplication.kt:12) ~[main/:na]
Caused by: io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/[0:0:0:0:0:0:0:1]:8124
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.pollConnect(Native Method) ~[na:na]
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:682) ~[na:na]
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:973) ~[na:na]
at io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
INFO 5219 --- [ main] i.a.a.c.impl.AxonServerManagedChannel : Failed to get connection to AxonServer. Scheduling a reconnect in 2000ms
application.yml を作成してAxonServer接続を無効化します。
axon:
axonserver:
enabled: false
これで上記の例外が発生しなくなりました。
Axonの動作確認
Axonがちゃんと動くかの確認のため、試しにAxonのCommandGatewayやCommandHandlerを使ってみます。
ControllerにPOSTメソッドを追加し、CommandGatewayにcommandを送信します。
package org.example.axonkafkatrial.producer.controller
import org.axonframework.commandhandling.gateway.CommandGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.*
import java.util.*
@RestController
@RequestMapping("/")
class TrialController(private val commandGateway: CommandGateway) { // CommandGatewayをDI
@GetMapping
@ResponseStatus(HttpStatus.OK)
fun get(): GetResponse {
return GetResponse("success")
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
fun post(@RequestBody req: PostRequest): PostResponse {
val id = UUID.randomUUID()
val command = DocCreate(id, req.body)
commandGateway.sendAndWait<DocCreate>(command) // CommandGatewayにcommandを送信
return PostResponse(id)
}
}
data class GetResponse(val body: String)
data class PostRequest(val body: String)
data class PostResponse(val id: UUID)
commandの型定義はこちら
package org.example.axonkafkatrial.producer.command
import java.util.*
data class DocCreate(
val docId: UUID,
val body: String,
)
次にcommandを受け取る処理を作成します。
package org.example.axonkafkatrial.producer.service
import org.axonframework.commandhandling.CommandHandler
import org.example.axonkafkatrial.producer.command.DocCreate
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
@Service
class TrialService {
private val logger = LoggerFactory.getLogger(this.javaClass)
@CommandHandler
fun run(command: DocCreate) {
logger.info("command accepted: $command")
}
}
ここで動作確認すると、リクエストのJSONパースエラーが発生しました。
$ curl -X POST localhost:8080 \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"timestamp":"2024-11-09T02:07:49.735+00:00","status":400,"error":"Bad Request","path":"/"}
WARN 7648 --- [nio-8080-exec-1] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.http.converter.HttpMessageNotReadableException: JSON parse error: Cannot construct instance of `org.example.axonkafkatrial.producer.controller.PostRequest` (although at least one Creator exists): cannot deserialize from Object value (no delegate- or property-based Creator)]
JSONシリアライザとしてjacksonを追加します。
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1") // 追加
}
これで動作確認も成功しました。
$ curl -X POST localhost:8080 \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"id":"da10b02e-7529-482a-a82f-f632d5b0e02f"}
CommandHandlerのメソッドがcommandを受信してログ出力ができています。
INFO 7101 --- [nio-8080-exec-2] o.e.a.producer.service.TrialService : command accepted: DocCreate(docId=da10b02e-7529-482a-a82f-f632d5b0e02f, body=test)
イベントの発行
次に、EventGatewayでイベントを発行してみます。
package org.example.axonkafkatrial.producer.service
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventhandling.gateway.EventGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.util.UUID
@Service
class TrialService(private val eventGateway: EventGateway) { // EventGatewayをDI
private val logger = LoggerFactory.getLogger(this.javaClass)
@CommandHandler
fun run(command: DocCreate) {
logger.info("command accepted: $command")
val event = DocCreated(command.docId, command.body)
eventGateway.publish(event) // EventGatewayでイベント発行
}
}
data class DocCreated(
val docId: UUID,
val body: String,
)
これだけでも、これまでと同様にAPIにPOSTして動作確認すれば成功しますが、イベント発行がうまくできているかは、ログが何も出ていないので確認しづらいです。
Axonは発行したイベントをEventBusに保存するようです。
The
EventBusis the mechanism that dispatches events to the subscribed event handlers. Axon provides three implementations of the Event Bus:AxonServerEventStore,EmbeddedEventStoreandSimpleEventBus.
EventBusには以下の3種類があるようです。
AxonServerEventStoreEmbeddedEventStoreSimpleEventBus
デフォルトはAxonServerEventStoreのようですが、AxonServerは無効化したので、おそらくSimpleEventBus になっていると思われます。
The
SimpleEventBushas a volatile storage and 'forgets' events as soon as they have been published to subscribed components.
SimpleEventBus では永続化されないようです。
そこで、発行したイベントを永続化するため、EmbeddedEventStoreを使用します。
EmbeddedEventStoreには以下の3種類があるようです。
JpaEventStorageEngineJdbcEventStorageEngineMongoEventStorageEngine
Spring Boot によるAuto Configurationが簡単にできるJpaEventStorageEngineを使用して、MySQLに永続化しようと思います。
まずMySQLをdockerで起動しておきます。
services:
producer-db:
image: mysql:latest
container_name: producer-db
environment:
MYSQL_ROOT_PASSWORD: mysql
MYSQL_DATABASE: trial
MYSQL_USER: docker
MYSQL_PASSWORD: docker
TZ: 'Asia/Tokyo'
ports:
- "3306:3306"
$ docker compose up -d producer-db
# 参考までにバージョン情報を記載
$ docker -v
Docker version 27.3.1, build ce12230
JPAでMySQL接続するための依存ライブラリを追加します。
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1")
implementation("org.springframework.boot:spring-boot-starter-data-jpa") // 追加
runtimeOnly ("com.mysql:mysql-connector-j") // 追加
}
application.yml にDB接続設定を追加します。
# 以下を追加
spring:
datasource:
url: jdbc:mysql://localhost:3306/trial
username: docker
password: docker
driver-class-name: com.mysql.cj.jdbc.Driver
jpa:
database: mysql
hibernate:
ddl-auto: update
これでアプリを起動すると、MySQLに以下のテーブルができています。
association_value_entryassociation_value_entry_seqdead_letter_entrydomain_event_entrydomain_event_entry_seqsaga_entrysnapshot_event_entrytoken_entry
この状態で、改めてAPIにPOSTして動作確認します。
$ curl -X POST localhost:8080 \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"id":"cb7234ff-e787-40f7-acdf-163f7630b20f"}
すると、domain_event_entryテーブルに、発行したイベントのレコードが追加されています。
domain_event_entryテーブルには以下のカラムがあります。
global_indexevent_identifiermeta_datapayloadpayload_versionpayload_typetime_stampaggregate_identifiersequence_numbertype
payload_typeにはイベントの型が入っています。
org.example.axonkafkatrial.producer.service.DocCreated
payloadにはXML形式でイベントデータが入っています。
<org.example.axonkafkatrial.producer.service.DocCreated>
<docId>cb7234ff-e787-40f7-acdf-163f7630b20f</docId>
<body>test</body>
</org.example.axonkafkatrial.producer.service.DocCreated>
これで、イベントが発行でき、永続化できていることが確認できました。
Kafkaへのイベント送信
次に、発行したイベントをKafkaに送信します。
Kafkaの起動
まずはdockerでKafkaを起動します。
kraftモードで1台構成のクラスターを起動し、ブラウザでKafkaの状態が確認できるようにkafka-uiも追加しました。
compose.ymlの設定については以下を参考にさせていただきました。
services:
# 以下を追加
kafka-0:
image: bitnami/kafka
hostname: kafka-0
ports:
- "9094:9094"
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-0:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
networks:
- network0
kafka-ui-0:
container_name: kafka-ui-0
image: provectuslabs/kafka-ui:v0.7.2
depends_on:
- kafka-0
ports:
- "9080:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=kafka-0
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-0:9092
- AUTH_TYPE="LOGIN_FORM"
- SPRING_SECURITY_USER_NAME=admin
- SPRING_SECURITY_USER_PASSWORD=admin
networks:
- network0
networks:
network0:
$ docker compose up -d kafka-0 kafka-ui-0
ブラウザでlocalhost:9080にアクセスすると、kafka-uiのダッシュボードが表示され、Kafkaのクラスターが起動していることが確認できます。
アプリケーションからKafkaへの接続とイベント送信
AxonのKafka統合のための依存ライブラリを追加します。
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1")
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
runtimeOnly ("com.mysql:mysql-connector-j")
implementation("org.axonframework.extensions.kafka:axon-kafka-spring-boot-starter:4.10.0") // 追加
}
ここでアプリを起動してAPIにPOSTすると、Kafkaに接続できず、以下のようなWARNログが出続けます。
INFO 9760 --- [nio-8080-exec-1] o.e.a.producer.service.TrialService : command accepted: DocCreate(docId=7306afc7-e465-4bda-b2ec-cd4d2faeb6b0, body=test)
INFO 9760 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
bootstrap.servers = [localhost:9092]
〜省略〜
INFO 9760 --- [nio-8080-exec-1] o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector
INFO 9760 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
INFO 9760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.7.1
INFO 9760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: e2494e6ffb89f828
INFO 9760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1731129441403
INFO 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node -1 disconnected.
WARN 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
WARN 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
INFO 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node -1 disconnected.
WARN 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
WARN 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
ログのProducerConfigを見ると、デフォルトでは localhost:9092に接続しようとしているようです。
dockerで起動したKafkaは、コンテナネットワーク外からの接続を9094ポートで公開しているので、 localhost:9094 で接続する必要があります。
application.yml にKafka接続情報を記述します。
axon:
axonserver:
enabled: false
# 以下を追加
kafka:
bootstrap-servers: localhost:9094
これで再チャレンジすると、Kafka接続は成功したようです。
INFO 10262 --- [nio-8080-exec-1] o.e.a.producer.service.TrialService : command accepted: DocCreate(docId=28e50b8d-0688-4105-a674-f3fd57edc4d6, body=test)
INFO 10262 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
bootstrap.servers = [localhost:9094]
〜省略〜
INFO 10262 --- [nio-8080-exec-1] o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector
INFO 10262 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
INFO 10262 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.7.1
INFO 10262 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: e2494e6ffb89f828
INFO 10262 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1731129716329
WARN 10262 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {Axon.Events=UNKNOWN_TOPIC_OR_PARTITION}
INFO 10262 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: 6ju05AkfTN-V06364F0G9g
INFO 10262 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
送信先のトピックを指定していない状態なので、イベントは、デフォルトのAxon.Eventsトピックに送信されているようです。
特にAxon.Eventsトピックを作成する操作はしていなかったので、トピックは存在しなかったはずですが、Kafkaの設定で auto.create.topics.enableが trueになっているので、Kafkaがイベントを受信した際にトピックが自動作成されたようです。
Kafkaが受信したメッセージは以下のようになっています。
valueには、domain_event_entryテーブルのpayloadカラムと同じ値が入っています。
<org.example.axonkafkatrial.producer.service.DocCreated>
<docId>cb7234ff-e787-40f7-acdf-163f7630b20f</docId>
<body>test</body>
</org.example.axonkafkatrial.producer.service.DocCreated>
headerには、以下のようなJSONが入っています。
{
"axon-message-id": "722ff952-e7c3-4d09-a960-fecf0aac0d95",
"axon-metadata-correlationId": "5c7a8e67-0b65-405c-8fa8-fc35203624ef",
"axon-message-revision": null,
"axon-message-timestamp": "\u0000\u0000\u0001�\u000f_-�",
"axon-message-type": "org.example.axonkafkatrial.producer.service.DocCreated",
"axon-metadata-traceId": "5c7a8e67-0b65-405c-8fa8-fc35203624ef"
}
domain_event_entry テーブルのカラムの値との対応関係は以下になっていると思われます。
-
axon-message-id→event_identifier -
axon-metadata-correlationId→meta_dataカラムのXML内のcorrelationId -
axon-message-revision→payload_revision(nullなので合っているか確度低) -
axon-message-timestamp→time_stamp(文字化けしているので確度低) -
axon-message-type→payload_type -
axon-message-traceId→meta_dataカラムのXML内のtraceId
おまけ
ちなみに、Axonで、CQRS/ESはやらず、Kafkaにイベントを送信するだけなら、Axon導入の最初に試したCommandGatewayやCommandHandlerを使う必要はありません。
例えば、以下のような書き方でも、イベント送信は可能です。
package org.example.axonkafkatrial.producer.controller
import org.axonframework.commandhandling.gateway.CommandGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.example.axonkafkatrial.producer.service.TrialService
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.*
import java.util.*
@RestController
@RequestMapping("/")
class TrialController(
// private val commandGateway: CommandGateway,
private val trialService: TrialService,
) {
@GetMapping
@ResponseStatus(HttpStatus.OK)
fun get(): GetResponse {
return GetResponse("success")
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
fun post(@RequestBody req: PostRequest): PostResponse {
val id = UUID.randomUUID()
val command = DocCreate(id, req.body)
// commandGateway.sendAndWait<DocCreate>(command)
trialService.run(command) // CommandGatewayを使用せずTrialServiceを直接呼ぶ
return PostResponse(id)
}
}
data class GetResponse(val body: String)
data class PostRequest(val body: String)
data class PostResponse(val id: UUID)
package org.example.axonkafkatrial.producer.service
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventhandling.gateway.EventGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.util.UUID
@Service
class TrialService(private val eventGateway: EventGateway) {
private val logger = LoggerFactory.getLogger(this.javaClass)
// @CommandHandler
fun run(command: DocCreate) {
logger.info("command accepted: $command")
val event = DocCreated(command.docId, command.body)
eventGateway.publish(event)
}
}
data class DocCreated(
val docId: UUID,
val body: String,
)
まとめ
Axonを使用してKafkaにイベントを送信する最小限の実装を試しました。
引き続き、各種設定・カスタマイズの方法や、Kafkaからイベントを受信する方法についても確認していく予定です。



