はじめに
Axon Framework(以降、Axonと記述)を使用してKafkaにイベントを送信する最小限の実装を試した過程を残しておきます。
Kotlinを使用し、Spring Bootアプリケーションとして構築しています。
失敗も含めて小さいステップで確認しながら進めているので、最終形だけ見たい方は、githubリポジトリをご参照ください。
実装においては、以下のリポジトリを参考にさせていただきました。
プロジェクトのセットアップ
プロジェクトの初期状態のディレクトリ・ファイル構成は以下です。
ProducerApplicationをSpring Bootアプリケーションとして起動し、動作確認用のAPIエンドポイントを1つ用意した状態です。
producer/
├── build.gradle.kts
├── gradle
│ └── wrapper
│ ├── gradle-wrapper.jar
│ └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle.kts
└── src
└── main
└── kotlin
└── org
└── example
└── axonkafkatrial
└── producer
├── ProducerApplication.kt
└── controller
└── TrialController.kt
rootProject.name = "axon_kafka_trial_producer"
plugins {
val kotlinVersion = "2.0.0"
kotlin("jvm") version kotlinVersion
kotlin("plugin.spring") version kotlinVersion
id("org.springframework.boot") version "3.3.5"
id("io.spring.dependency-management") version "1.1.5"
}
group = "org.example"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
}
kotlin {
jvmToolchain(21)
}
package org.example.axonkafkatrial.producer
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
@SpringBootApplication
class ProducerApplication
fun main(args: Array<String>) {
runApplication<ProducerApplication>(*args)
}
package org.example.axonkafkatrial.producer.controller
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping("/")
class TrialController {
@GetMapping
@ResponseStatus(HttpStatus.OK)
fun get(): GetResponse {
return GetResponse("success")
}
}
data class GetResponse(val body: String)
APIエンドポイントの動作確認
$ curl localhost:8080
{"body":"success"}
参考までに動作確認時のローカル環境情報を記載します。
$ ./gradlew --version
------------------------------------------------------------
Gradle 8.5
------------------------------------------------------------
Build time: 2023-11-29 14:08:57 UTC
Revision: 28aca86a7180baa17117e0e5ba01d8ea9feca598
Kotlin: 1.9.20
Groovy: 3.0.17
Ant: Apache Ant(TM) version 1.10.13 compiled on January 4 2023
JVM: 21.0.3 (Amazon.com Inc. 21.0.3+9-LTS)
OS: Mac OS X 14.5 x86_64
いい加減、Intel Mac は卒業したい。。。
Axon の導入
依存ライブラリの追加
Spring BootでAuto Configurationができるように、 axon-spring-boot-starter
を使用します。
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.axonframework:axon-spring-boot-starter:4.10.2") // 追加
}
この状態でアプリ起動すると、AxonServerに接続しようとして、以下の例外が発生しました。
INFO 5219 --- [ main] i.a.a.c.impl.AxonServerManagedChannel : Requesting connection details from localhost:8124
**********************************************
* *
* !!! UNABLE TO CONNECT TO AXON SERVER !!! *
* *
* Are you sure it's running? *
* Don't have Axon Server yet? *
* Go to: https://axoniq.io/go-axon *
* *
**********************************************
To suppress this message, you can
- explicitly configure an AxonServer location,
- start with -Daxon.axonserver.suppressDownloadMessage=true
WARN 5219 --- [ main] i.a.a.c.impl.AxonServerManagedChannel : Connecting to AxonServer node [localhost:8124] failed.
io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:268) ~[grpc-stub-1.65.1.jar:1.65.1]
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:249) ~[grpc-stub-1.65.1.jar:1.65.1]
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:167) ~[grpc-stub-1.65.1.jar:1.65.1]
at io.axoniq.axonserver.grpc.control.PlatformServiceGrpc$PlatformServiceBlockingStub.getPlatformServer(PlatformServiceGrpc.java:250) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.connectChannel(AxonServerManagedChannel.java:122) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.createConnection(AxonServerManagedChannel.java:343) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.ensureConnected(AxonServerManagedChannel.java:308) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.impl.AxonServerManagedChannel.getState(AxonServerManagedChannel.java:235) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.impl.ContextConnection.ensureConnected(ContextConnection.java:237) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.impl.ContextConnection.connect(ContextConnection.java:165) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at io.axoniq.axonserver.connector.AxonServerConnectionFactory.connect(AxonServerConnectionFactory.java:166) ~[axonserver-connector-java-2024.1.1.jar:2024.1.1]
at org.axonframework.axonserver.connector.AxonServerConnectionManager.createConnection(AxonServerConnectionManager.java:135) ~[axon-server-connector-4.10.2.jar:4.10.2]
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708) ~[na:na]
at org.axonframework.axonserver.connector.AxonServerConnectionManager.getConnection(AxonServerConnectionManager.java:131) ~[axon-server-connector-4.10.2.jar:4.10.2]
at org.axonframework.axonserver.connector.AxonServerConnectionManager.getConnection(AxonServerConnectionManager.java:121) ~[axon-server-connector-4.10.2.jar:4.10.2]
at org.axonframework.axonserver.connector.AxonServerConnectionManager.start(AxonServerConnectionManager.java:111) ~[axon-server-connector-4.10.2.jar:4.10.2]
at org.axonframework.lifecycle.Lifecycle$LifecycleRegistry.lambda$onStart$0(Lifecycle.java:68) ~[axon-messaging-4.10.2.jar:4.10.2]
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[na:na]
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:1024) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[na:na]
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:na]
at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:662) ~[na:na]
at org.axonframework.config.DefaultConfigurer.invokeLifecycleHandlers(DefaultConfigurer.java:1058) ~[axon-configuration-4.10.2.jar:4.10.2]
at org.axonframework.config.DefaultConfigurer.invokeStartHandlers(DefaultConfigurer.java:1004) ~[axon-configuration-4.10.2.jar:4.10.2]
at org.axonframework.config.DefaultConfigurer$ConfigurationImpl.start(DefaultConfigurer.java:1154) ~[axon-configuration-4.10.2.jar:4.10.2]
at org.axonframework.spring.config.SpringAxonConfiguration.start(SpringAxonConfiguration.java:89) ~[axon-spring-4.10.2.jar:4.10.2]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:285) ~[spring-context-6.1.10.jar:6.1.10]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:469) ~[spring-context-6.1.10.jar:6.1.10]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:257) ~[spring-context-6.1.10.jar:6.1.10]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:202) ~[spring-context-6.1.10.jar:6.1.10]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:981) ~[spring-context-6.1.10.jar:6.1.10]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:627) ~[spring-context-6.1.10.jar:6.1.10]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.3.1.jar:3.3.1]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-3.3.1.jar:3.3.1]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:456) ~[spring-boot-3.3.1.jar:3.3.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) ~[spring-boot-3.3.1.jar:3.3.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1363) ~[spring-boot-3.3.1.jar:3.3.1]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1352) ~[spring-boot-3.3.1.jar:3.3.1]
at org.example.axonkafkatrial.producer.ProducerApplicationKt.main(ProducerApplication.kt:12) ~[main/:na]
Caused by: io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/[0:0:0:0:0:0:0:1]:8124
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.pollConnect(Native Method) ~[na:na]
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:682) ~[na:na]
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:973) ~[na:na]
at io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[grpc-netty-shaded-1.65.1.jar:1.65.1]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
INFO 5219 --- [ main] i.a.a.c.impl.AxonServerManagedChannel : Failed to get connection to AxonServer. Scheduling a reconnect in 2000ms
application.yml を作成してAxonServer接続を無効化します。
axon:
axonserver:
enabled: false
これで上記の例外が発生しなくなりました。
Axonの動作確認
Axonがちゃんと動くかの確認のため、試しにAxonのCommandGateway
やCommandHandler
を使ってみます。
ControllerにPOSTメソッドを追加し、CommandGateway
にcommandを送信します。
package org.example.axonkafkatrial.producer.controller
import org.axonframework.commandhandling.gateway.CommandGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.*
import java.util.*
@RestController
@RequestMapping("/")
class TrialController(private val commandGateway: CommandGateway) { // CommandGatewayをDI
@GetMapping
@ResponseStatus(HttpStatus.OK)
fun get(): GetResponse {
return GetResponse("success")
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
fun post(@RequestBody req: PostRequest): PostResponse {
val id = UUID.randomUUID()
val command = DocCreate(id, req.body)
commandGateway.sendAndWait<DocCreate>(command) // CommandGatewayにcommandを送信
return PostResponse(id)
}
}
data class GetResponse(val body: String)
data class PostRequest(val body: String)
data class PostResponse(val id: UUID)
commandの型定義はこちら
package org.example.axonkafkatrial.producer.command
import java.util.*
data class DocCreate(
val docId: UUID,
val body: String,
)
次にcommandを受け取る処理を作成します。
package org.example.axonkafkatrial.producer.service
import org.axonframework.commandhandling.CommandHandler
import org.example.axonkafkatrial.producer.command.DocCreate
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
@Service
class TrialService {
private val logger = LoggerFactory.getLogger(this.javaClass)
@CommandHandler
fun run(command: DocCreate) {
logger.info("command accepted: $command")
}
}
ここで動作確認すると、リクエストのJSONパースエラーが発生しました。
$ curl -X POST localhost:8080 \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"timestamp":"2024-11-09T02:07:49.735+00:00","status":400,"error":"Bad Request","path":"/"}
WARN 7648 --- [nio-8080-exec-1] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.http.converter.HttpMessageNotReadableException: JSON parse error: Cannot construct instance of `org.example.axonkafkatrial.producer.controller.PostRequest` (although at least one Creator exists): cannot deserialize from Object value (no delegate- or property-based Creator)]
JSONシリアライザとしてjacksonを追加します。
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1") // 追加
}
これで動作確認も成功しました。
$ curl -X POST localhost:8080 \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"id":"da10b02e-7529-482a-a82f-f632d5b0e02f"}
CommandHandler
のメソッドがcommandを受信してログ出力ができています。
INFO 7101 --- [nio-8080-exec-2] o.e.a.producer.service.TrialService : command accepted: DocCreate(docId=da10b02e-7529-482a-a82f-f632d5b0e02f, body=test)
イベントの発行
次に、EventGateway
でイベントを発行してみます。
package org.example.axonkafkatrial.producer.service
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventhandling.gateway.EventGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.util.UUID
@Service
class TrialService(private val eventGateway: EventGateway) { // EventGatewayをDI
private val logger = LoggerFactory.getLogger(this.javaClass)
@CommandHandler
fun run(command: DocCreate) {
logger.info("command accepted: $command")
val event = DocCreated(command.docId, command.body)
eventGateway.publish(event) // EventGatewayでイベント発行
}
}
data class DocCreated(
val docId: UUID,
val body: String,
)
これだけでも、これまでと同様にAPIにPOSTして動作確認すれば成功しますが、イベント発行がうまくできているかは、ログが何も出ていないので確認しづらいです。
Axonは発行したイベントをEventBusに保存するようです。
The
EventBus
is the mechanism that dispatches events to the subscribed event handlers. Axon provides three implementations of the Event Bus:AxonServerEventStore
,EmbeddedEventStore
andSimpleEventBus
.
EventBusには以下の3種類があるようです。
AxonServerEventStore
EmbeddedEventStore
SimpleEventBus
デフォルトはAxonServerEventStore
のようですが、AxonServerは無効化したので、おそらくSimpleEventBus
になっていると思われます。
The
SimpleEventBus
has a volatile storage and 'forgets' events as soon as they have been published to subscribed components.
SimpleEventBus
では永続化されないようです。
そこで、発行したイベントを永続化するため、EmbeddedEventStore
を使用します。
EmbeddedEventStore
には以下の3種類があるようです。
JpaEventStorageEngine
JdbcEventStorageEngine
MongoEventStorageEngine
Spring Boot によるAuto Configurationが簡単にできるJpaEventStorageEngine
を使用して、MySQLに永続化しようと思います。
まずMySQLをdockerで起動しておきます。
services:
producer-db:
image: mysql:latest
container_name: producer-db
environment:
MYSQL_ROOT_PASSWORD: mysql
MYSQL_DATABASE: trial
MYSQL_USER: docker
MYSQL_PASSWORD: docker
TZ: 'Asia/Tokyo'
ports:
- "3306:3306"
$ docker compose up -d producer-db
# 参考までにバージョン情報を記載
$ docker -v
Docker version 27.3.1, build ce12230
JPAでMySQL接続するための依存ライブラリを追加します。
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1")
implementation("org.springframework.boot:spring-boot-starter-data-jpa") // 追加
runtimeOnly ("com.mysql:mysql-connector-j") // 追加
}
application.yml にDB接続設定を追加します。
# 以下を追加
spring:
datasource:
url: jdbc:mysql://localhost:3306/trial
username: docker
password: docker
driver-class-name: com.mysql.cj.jdbc.Driver
jpa:
database: mysql
hibernate:
ddl-auto: update
これでアプリを起動すると、MySQLに以下のテーブルができています。
association_value_entry
association_value_entry_seq
dead_letter_entry
domain_event_entry
domain_event_entry_seq
saga_entry
snapshot_event_entry
token_entry
この状態で、改めてAPIにPOSTして動作確認します。
$ curl -X POST localhost:8080 \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"id":"cb7234ff-e787-40f7-acdf-163f7630b20f"}
すると、domain_event_entryテーブルに、発行したイベントのレコードが追加されています。
domain_event_entryテーブルには以下のカラムがあります。
global_index
event_identifier
meta_data
payload
payload_version
payload_type
time_stamp
aggregate_identifier
sequence_number
type
payload_type
にはイベントの型が入っています。
org.example.axonkafkatrial.producer.service.DocCreated
payload
にはXML形式でイベントデータが入っています。
<org.example.axonkafkatrial.producer.service.DocCreated>
<docId>cb7234ff-e787-40f7-acdf-163f7630b20f</docId>
<body>test</body>
</org.example.axonkafkatrial.producer.service.DocCreated>
これで、イベントが発行でき、永続化できていることが確認できました。
Kafkaへのイベント送信
次に、発行したイベントをKafkaに送信します。
Kafkaの起動
まずはdockerでKafkaを起動します。
kraftモードで1台構成のクラスターを起動し、ブラウザでKafkaの状態が確認できるようにkafka-uiも追加しました。
compose.ymlの設定については以下を参考にさせていただきました。
services:
# 以下を追加
kafka-0:
image: bitnami/kafka
hostname: kafka-0
ports:
- "9094:9094"
environment:
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-0:9092,EXTERNAL://localhost:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
networks:
- network0
kafka-ui-0:
container_name: kafka-ui-0
image: provectuslabs/kafka-ui:v0.7.2
depends_on:
- kafka-0
ports:
- "9080:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=kafka-0
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-0:9092
- AUTH_TYPE="LOGIN_FORM"
- SPRING_SECURITY_USER_NAME=admin
- SPRING_SECURITY_USER_PASSWORD=admin
networks:
- network0
networks:
network0:
$ docker compose up -d kafka-0 kafka-ui-0
ブラウザでlocalhost:9080にアクセスすると、kafka-uiのダッシュボードが表示され、Kafkaのクラスターが起動していることが確認できます。
アプリケーションからKafkaへの接続とイベント送信
AxonのKafka統合のための依存ライブラリを追加します。
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1")
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
runtimeOnly ("com.mysql:mysql-connector-j")
implementation("org.axonframework.extensions.kafka:axon-kafka-spring-boot-starter:4.10.0") // 追加
}
ここでアプリを起動してAPIにPOSTすると、Kafkaに接続できず、以下のようなWARNログが出続けます。
INFO 9760 --- [nio-8080-exec-1] o.e.a.producer.service.TrialService : command accepted: DocCreate(docId=7306afc7-e465-4bda-b2ec-cd4d2faeb6b0, body=test)
INFO 9760 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
bootstrap.servers = [localhost:9092]
〜省略〜
INFO 9760 --- [nio-8080-exec-1] o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector
INFO 9760 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
INFO 9760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.7.1
INFO 9760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: e2494e6ffb89f828
INFO 9760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1731129441403
INFO 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node -1 disconnected.
WARN 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
WARN 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
INFO 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node -1 disconnected.
WARN 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
WARN 9760 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
ログのProducerConfigを見ると、デフォルトでは localhost:9092
に接続しようとしているようです。
dockerで起動したKafkaは、コンテナネットワーク外からの接続を9094ポートで公開しているので、 localhost:9094
で接続する必要があります。
application.yml にKafka接続情報を記述します。
axon:
axonserver:
enabled: false
# 以下を追加
kafka:
bootstrap-servers: localhost:9094
これで再チャレンジすると、Kafka接続は成功したようです。
INFO 10262 --- [nio-8080-exec-1] o.e.a.producer.service.TrialService : command accepted: DocCreate(docId=28e50b8d-0688-4105-a674-f3fd57edc4d6, body=test)
INFO 10262 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = -1
auto.include.jmx.reporter = true
batch.size = 16384
bootstrap.servers = [localhost:9094]
〜省略〜
INFO 10262 --- [nio-8080-exec-1] o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector
INFO 10262 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
INFO 10262 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.7.1
INFO 10262 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: e2494e6ffb89f828
INFO 10262 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1731129716329
WARN 10262 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {Axon.Events=UNKNOWN_TOPIC_OR_PARTITION}
INFO 10262 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: 6ju05AkfTN-V06364F0G9g
INFO 10262 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
送信先のトピックを指定していない状態なので、イベントは、デフォルトのAxon.Events
トピックに送信されているようです。
特にAxon.Events
トピックを作成する操作はしていなかったので、トピックは存在しなかったはずですが、Kafkaの設定で auto.create.topics.enable
が true
になっているので、Kafkaがイベントを受信した際にトピックが自動作成されたようです。
Kafkaが受信したメッセージは以下のようになっています。
valueには、domain_event_entry
テーブルのpayload
カラムと同じ値が入っています。
<org.example.axonkafkatrial.producer.service.DocCreated>
<docId>cb7234ff-e787-40f7-acdf-163f7630b20f</docId>
<body>test</body>
</org.example.axonkafkatrial.producer.service.DocCreated>
headerには、以下のようなJSONが入っています。
{
"axon-message-id": "722ff952-e7c3-4d09-a960-fecf0aac0d95",
"axon-metadata-correlationId": "5c7a8e67-0b65-405c-8fa8-fc35203624ef",
"axon-message-revision": null,
"axon-message-timestamp": "\u0000\u0000\u0001�\u000f_-�",
"axon-message-type": "org.example.axonkafkatrial.producer.service.DocCreated",
"axon-metadata-traceId": "5c7a8e67-0b65-405c-8fa8-fc35203624ef"
}
domain_event_entry
テーブルのカラムの値との対応関係は以下になっていると思われます。
-
axon-message-id
→event_identifier
-
axon-metadata-correlationId
→meta_data
カラムのXML内のcorrelationId
-
axon-message-revision
→payload_revision
(nullなので合っているか確度低) -
axon-message-timestamp
→time_stamp
(文字化けしているので確度低) -
axon-message-type
→payload_type
-
axon-message-traceId
→meta_data
カラムのXML内のtraceId
おまけ
ちなみに、Axonで、CQRS/ESはやらず、Kafkaにイベントを送信するだけなら、Axon導入の最初に試したCommandGateway
やCommandHandler
を使う必要はありません。
例えば、以下のような書き方でも、イベント送信は可能です。
package org.example.axonkafkatrial.producer.controller
import org.axonframework.commandhandling.gateway.CommandGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.example.axonkafkatrial.producer.service.TrialService
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.*
import java.util.*
@RestController
@RequestMapping("/")
class TrialController(
// private val commandGateway: CommandGateway,
private val trialService: TrialService,
) {
@GetMapping
@ResponseStatus(HttpStatus.OK)
fun get(): GetResponse {
return GetResponse("success")
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
fun post(@RequestBody req: PostRequest): PostResponse {
val id = UUID.randomUUID()
val command = DocCreate(id, req.body)
// commandGateway.sendAndWait<DocCreate>(command)
trialService.run(command) // CommandGatewayを使用せずTrialServiceを直接呼ぶ
return PostResponse(id)
}
}
data class GetResponse(val body: String)
data class PostRequest(val body: String)
data class PostResponse(val id: UUID)
package org.example.axonkafkatrial.producer.service
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventhandling.gateway.EventGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.util.UUID
@Service
class TrialService(private val eventGateway: EventGateway) {
private val logger = LoggerFactory.getLogger(this.javaClass)
// @CommandHandler
fun run(command: DocCreate) {
logger.info("command accepted: $command")
val event = DocCreated(command.docId, command.body)
eventGateway.publish(event)
}
}
data class DocCreated(
val docId: UUID,
val body: String,
)
まとめ
Axonを使用してKafkaにイベントを送信する最小限の実装を試しました。
引き続き、各種設定・カスタマイズの方法や、Kafkaからイベントを受信する方法についても確認していく予定です。