はじめに
Axon Framework(以降、Axonと記述)を使用してKafkaからイベントを受信する最小限の実装を試した過程を残しておきます。
Kotlinを使用し、Spring Bootアプリケーションとして構築しています。
なお、Kafkaにイベントを送信するProducerアプリケーションについては、以下の記事に記載しており、本記事での動作確認においても、そのProducerアプリケーションを使用してイベントを送信しています。
プロジェクトのセットアップ
本記事では、上記の記事に記載したProducerアプリケーションと同様にSpringBootとAxonの最低限のセットアップを行った状態を、初期状態とします。
動作確認をおこなった環境についても、上記の記事と同じなので、記載を割愛します。
consumer
├── build.gradle.kts
├── gradle
│ └── wrapper
│ ├── gradle-wrapper.jar
│ └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle.kts
└── src
└── main
├── kotlin
│ └── org
│ └── example
│ └── axonkafkatrial
│ └── consumer
│ └── ConsumerApplication.kt
└── resources
└── application.yml
rootProject.name = "axon_kafka_trial_consumer"
plugins {
val kotlinVersion = "2.0.0"
kotlin("jvm") version kotlinVersion
kotlin("plugin.spring") version kotlinVersion
id("org.springframework.boot") version "3.3.5"
id("io.spring.dependency-management") version "1.1.5"
}
group = "org.example"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1")
implementation("org.axonframework.extensions.kafka:axon-kafka-spring-boot-starter:4.10.0")
}
kotlin {
jvmToolchain(21)
}
package org.example.axonkafkatrial.consumer
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
@SpringBootApplication
class ConsumerApplication
fun main(args: Array<String>) {
runApplication<ConsumerApplication>(*args)
}
axon:
axonserver:
enabled: false
serializer:
events: jackson
kafka:
bootstrap-servers: localhost:9094
なお、Producerアプリケーションでは、イベント送信の動作確認のためにAPIエンドポイントを用意していましたが、今回のConsumerアプリケーションでは、Kafkaからのイベント受信をトリガとして動作する処理を実装するので、APIエンドポイントは作成していません。
Kafkaからのイベント受信
イベント受信時の処理の記述
イベント受信時に動作する処理を作成していきます。
SpringBootによりDIされるように@Component
等のアノテーションを付与したクラス内で、 @EventHandler
アノテーションを付与したメソッドを作成します。
メソッド名は任意で、メソッドの第一引数に処理したいイベントの型(以下だとDocCreated
)を指定します。
package org.example.axonkafkatrial.consumer.listener
import org.axonframework.eventhandling.EventHandler
import org.example.axonkafkatrial.shared.event.DocCreated
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
@Component
class ConsumerProcess {
private val logger = LoggerFactory.getLogger(this.javaClass)
@EventHandler
fun on(event: DocCreated) {
logger.info("event received: $event")
}
}
このあたりの記述は、Kafkaからのイベント受信に固有のものではなくAxonの機能なので、例えば、Kafkaを使用せず、同一アプリケーション内で発行されたイベントをトリガーとして非同期に動作するメソッドを作成する場合でも、同じ書き方になります。
つまり、イベントの受信方法はAxonにより抽象化されているので、アプリケーションとしてはイベント受信時の処理内容を記述するだけでよいことになります。
イベント型をProducerと共有する別プロジェクトの用意
上述のとおり、ConsumerのEventHandler
メソッドは、処理したいイベントの型を指定して記述する必要があります。
Producerアプリケーション側で定義したイベント型を指定することも可能ですが、それではConsumerがProducerに直接依存することになり、Pub/Subにより実現したい疎結合を阻害してしまうとも考えられます。
今回は、ProducerとConsumerの間でイベント型を共有するための簡易スキーマレジストリとして、新たにSharedプロジェクト(モジュール)を用意したうえで、ProducerとConsumerのそれぞれからSharedを参照することにします。
shared/
├── build.gradle.kts
├── gradle
│ └── wrapper
│ ├── gradle-wrapper.jar
│ └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle.kts
└── src
└── main
└── kotlin
└── org
└── example
└── axonkafkatrial
└── shared
└── event
└── DocCreated.kt
rootProject.name = "axon_kafka_trial_shared"
plugins {
val kotlinVersion = "2.0.0"
kotlin("jvm") version kotlinVersion
}
group = "org.example"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
}
kotlin {
jvmToolchain(21)
}
package org.example.axonkafkatrial.shared.event
import java.util.*
data class DocCreated(
val docId: UUID,
val body: String,
)
ConsumerプロジェクトにSharedプロジェクトへの参照を追加します。
rootProject.name = "axon_kafka_trial_consumer"
includeBuild("../shared") // 追加
dependencies {
implementation("org.example:axon_kafka_trial_shared") // 追加
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
}
Producerプロジェクトも、Consumerと同様にSharedプロジェクトを参照したうえで、
SharedのDocCreated
イベントを使用するように変更します。
package org.example.axonkafkatrial.producer.service
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventhandling.gateway.EventGateway
import org.example.axonkafkatrial.producer.command.DocCreate // 追加
import org.example.axonkafkatrial.producer.command.OtherCreate
import org.example.axonkafkatrial.shared.event.DocCreated
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.util.UUID
@Service
class TrialService(private val eventGateway: EventGateway) {
private val logger = LoggerFactory.getLogger(this.javaClass)
@CommandHandler
fun run(command: DocCreate) {
logger.info("command accepted: $command")
val event = DocCreated(command.docId, command.body)
eventGateway.publish(event)
}
@CommandHandler
fun run(command: OtherCreate) {
logger.info("command accepted: $command")
val event = OtherCreated(command.id, command.body)
eventGateway.publish(event)
}
}
// 削除
//data class DocCreated(
// val docId: UUID,
// val body: String,
//)
data class OtherCreated(
val id: UUID,
val body: String,
)
なお、ここでは、ProducerがSharedで定義された型のイベントを直接生成して送信していますが、イベントの生成時はProducerアプリケーションの境界づけられたコンテキスト内で定義されたイベントを使用し、Kafka経由で別アプリケーション(別コンテキスト)に送信する際だけSharedのイベントに変換するようにすれば、より疎結合になると思います。
ただし、イベントの型が内部用と共有用の2つ必要になり相互変換も必要になる分の手間は増えるので、状況に応じて検討が必要な部分かと思います。
動作確認
ProducerApplicationとConsumerApplicationを同時に立ち上げて動作確認するため、ポート競合しないように起動ポートを変更します。
server:
port: 8080
server:
port: 8081
ProducerApplicationとConsumerApplicationを両方起動した状態で、ProducerのAPIにPOSTします。
$ curl -X POST localhost:8080 \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"id":"da10b02e-7529-482a-a82f-f632d5b0e02f"}
これでKafkaへのイベント送信までは成功しますが、Consumer側はうんともすんとも言いません。
(起動時の下記ログ以降、特にログが出力されていません)
INFO 67071 --- [ main] o.e.a.consumer.ConsumerApplicationKt : Started ConsumerApplicationKt in 2.021 seconds (process running for 2.356)
Kafka連携のための設定が不足しているようなので、ドキュメントの以下あたりを参考にapplication.ymlに設定を追加します。
axon:
axonserver:
enabled: false
serializer:
events: jackson
kafka:
bootstrap-servers: localhost:9094
# 以下を追加
consumer:
event-processor-mode: tracking
また、ドキュメントの以下あたりを参考に、EventProcessorとして上記で作成した ConsumerProcess
クラスのあるpackageを登録します。
package org.example.axonkafkatrial.consumer.config
import org.axonframework.config.EventProcessingConfigurer
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Configuration
@Configuration
class AxonKafkaTrackingConfig {
@Autowired
fun configure(
config: EventProcessingConfigurer,
streamableKafkaMessageSource: StreamableKafkaMessageSource<String, ByteArray>,
) {
config.registerTrackingEventProcessor("org.example.axonkafkatrial.consumer.listener") {
streamableKafkaMessageSource
}
}
}
この状態でConsumerアプリケーションを再起動すると、Kafka接続が成功しているように見えます。
INFO 74724 --- [ main] o.a.s.a.XStreamAutoConfiguration : Initializing an XStream instance since none was found. The auto configuration base packages will be used as wildcards for the XStream security settings.
INFO 74724 --- [ main] o.a.s.util.XStreamSecurityTypeUtility : Constructing wildcard type for base package [org.example.axonkafkatrial.consumer].
INFO 74724 --- [ main] o.a.serialization.ChainingConverter : ContentTypeConverter of type [class org.axonframework.serialization.xml.Dom4JToByteArrayConverter] is ignored. It seems to rely on a class that is not available in the class loader: org/dom4j/Document
INFO 74724 --- [ main] o.a.serialization.ChainingConverter : ContentTypeConverter of type [class org.axonframework.serialization.xml.InputStreamToDom4jConverter] is ignored. It seems to rely on a class that is not available in the class loader: org/dom4j/Document
INFO 74724 --- [ main] o.a.serialization.ChainingConverter : ContentTypeConverter of type [class org.axonframework.serialization.xml.XomToStringConverter] is ignored. It seems to rely on a class that is not available in the class loader: nu/xom/Document
INFO 74724 --- [ main] o.a.serialization.ChainingConverter : ContentTypeConverter of type [class org.axonframework.serialization.xml.InputStreamToXomConverter] is ignored. It seems to rely on a class that is not available in the class loader: nu/xom/ParsingException
INFO 74724 --- [ main] o.a.config.AxonIQConsoleModule :
################################################################################################
## You have not configured AxonIQ Console. AxonIQ Console provides out-of-the box monitoring ##
## and management capabilities for your Axon Application, starting with it is free. ##
## Visit https://console.axoniq.io for more information! ##
## Suppress this message by setting system property disable-axoniq-console-message to true. ##
################################################################################################
WARN 74724 --- [ main] o.a.e.t.inmemory.InMemoryTokenStore : An in memory token store is being created.
This means the event processor using this token store might process the same events again when the application is restarted.
If the use of an in memory token store is intentional, this warning can be ignored.
If the tokens should be persisted, use the JPA, JDBC or MongoDB token store instead.
INFO 74724 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8081 (http) with context path '/'
INFO 74724 --- [ main] o.e.a.consumer.ConsumerApplicationKt : Started ConsumerApplicationKt in 2.642 seconds (process running for 3.265)
INFO 74724 --- [mer.listener]-0] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = latest
bootstrap.servers = [localhost:9094]
〜省略〜
INFO 74724 --- [mer.listener]-0] o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector
INFO 74724 --- [mer.listener]-0] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.7.1
INFO 74724 --- [mer.listener]-0] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: e2494e6ffb89f828
INFO 74724 --- [mer.listener]-0] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1731208745918
INFO 74724 --- [mer.listener]-0] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-null-1, groupId=null] Cluster ID: 6ju05AkfTN-V06364F0G9g
INFO 74724 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Worker assigned to segment Segment[0/0] for processing
INFO 74724 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Using current Thread for last segment worker: TrackingSegmentWorker{processor=org.example.axonkafkatrial.consumer.listener, segment=Segment[0/0]}
INFO 74724 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Fetched token: ReplayToken{currentToken=null, tokenAtReset=KafkaTrackingToken{positions={Axon.Events-0=6}}, context=null} for segment: Segment[0/0]
INFO 74724 --- [mer.listener]-0] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = latest
bootstrap.servers = [localhost:9094]
〜省略〜〜
INFO 74724 --- [mer.listener]-0] o.a.k.c.t.i.KafkaMetricsCollector : initializing Kafka metrics collector
INFO 74724 --- [mer.listener]-0] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.7.1
INFO 74724 --- [mer.listener]-0] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: e2494e6ffb89f828
INFO 74724 --- [mer.listener]-0] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1731208746278
INFO 74724 --- [mer.listener]-0] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-null-2, groupId=null] Cluster ID: 6ju05AkfTN-V06364F0G9g
INFO 74724 --- [mer.listener]-0] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=consumer-null-2, groupId=null] Assigned to partition(s): Axon.Events-0
INFO 74724 --- [mer.listener]-0] o.a.e.k.e.consumer.ConsumerSeekUtil : Seeking topic-partition [Axon.Events-0] with offset [0]
INFO 74724 --- [mer.listener]-0] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=consumer-null-2, groupId=null] Seeking to offset 0 for partition Axon.Events-0
以下のログが出ているので、Axon.Events
トピックからイベントを受信しようとしているように見えます。
INFO 74724 --- [mer.listener]-0] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=consumer-null-2, groupId=null] Assigned to partition(s): Axon.Events-0
INFO 74724 --- [mer.listener]-0] o.a.e.k.e.consumer.ConsumerSeekUtil : Seeking topic-partition [Axon.Events-0] with offset [0]
INFO 74724 --- [mer.listener]-0] o.a.k.c.c.internals.LegacyKafkaConsumer : [Consumer clientId=consumer-null-2, groupId=null] Seeking to offset 0 for partition Axon.Events-0
試しに、ProducerからAxon.Events
トピックに DocCreated
イベントを送信してみたところ、下記ログが出力され、Consumerがイベントを受信して処理していることが確認できました。
INFO 74724 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=255e534e-31b2-4570-b8c7-8c4e1b45072f, body=test)
受信するKafkaトピックの指定
doc-topic
からイベントを受信できるように、application.ymlに default-topic
の設定を追加します
axon:
axonserver:
enabled: false
serializer:
events: jackson
kafka:
bootstrap-servers: localhost:9094
consumer:
event-processor-mode: tracking
default-topic: doc-topic # 追加
これで、doc-topic
に溜まっていたイベント、および、Consumerアプリケーション起動後に新たに送信したイベント、を受信して処理することができました。
INFO 75971 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=cf1e8377-5e4a-4d54-8c67-fe24d1538b9e, body=test)
INFO 75971 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=c7eb810d-fdf3-456f-99cb-811407e5ff96, body=test)
INFO 75971 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=adb95e9e-7e16-4ce0-8b79-56ca1cb2a573, body=test)
INFO 75971 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=6a5525af-7d9f-4625-98ad-1ebbafacde57, body=test)
イベント処理状況の永続化
ここまでの状態でConsumerアプリケーションを再起動すると、既に処理済のイベントも含め、トピックに溜まっているイベントを全て再度処理してしまいます。
以下のログに出ているとおり、InMemoryTokenStore
が使用されているため、処理状況が永続化されていないようです。
WARN 76304 --- [ main] o.a.e.t.inmemory.InMemoryTokenStore : An in memory token store is being created.
This means the event processor using this token store might process the same events again when the application is restarted.
If the use of an in memory token store is intentional, this warning can be ignored.
If the tokens should be persisted, use the JPA, JDBC or MongoDB token store instead.
そこで、Producerと同様に、Consumer用のMySQLを新たに起動して、JPAで接続するようにします。
services:
producer-db:
image: mysql:latest
container_name: producer-db
environment:
MYSQL_ROOT_PASSWORD: mysql
MYSQL_DATABASE: trial
MYSQL_USER: docker
MYSQL_PASSWORD: docker
TZ: 'Asia/Tokyo'
ports:
- "3306:3306"
# 追加
consumer-db:
image: mysql:latest
container_name: consumer-db
environment:
MYSQL_ROOT_PASSWORD: mysql
MYSQL_DATABASE: trial
MYSQL_USER: docker
MYSQL_PASSWORD: docker
TZ: 'Asia/Tokyo'
ports:
- "3316:3306" # ポート重複しないように3316に変えている
$ docker compose up -d consumer-db
dependencies {
implementation("org.example:axon_kafka_trial_shared")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1")
implementation("org.axonframework.extensions.kafka:axon-kafka-spring-boot-starter:4.10.0")
// 以下を追加
implementation("org.springframework.boot:spring-boot-starter-data-jpa")
runtimeOnly ("com.mysql:mysql-connector-j")
}
# 以下を追加
spring:
datasource:
url: jdbc:mysql://localhost:3316/trial
username: docker
password: docker
driver-class-name: com.mysql.cj.jdbc.Driver
jpa:
database: mysql
hibernate:
ddl-auto: update
ここでアプリ起動すると、以下のエラーが発生しました。
2024-11-10T12:46:13.713+09:00 WARN 77525 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Fetch Segments for Processor 'org.example.axonkafkatrial.consumer.listener' failed: No converter available
---- Debugging information ----
message : No converter available
type : java.util.Collections$UnmodifiableMap
converter : com.thoughtworks.xstream.converters.reflection.ReflectionConverter
message[1] : Unable to make field private static final long java.util.Collections$UnmodifiableMap.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @40f08448
-------------------------------. Preparing for retry in 1s
com.thoughtworks.xstream.converters.ConversionException: No converter available
---- Debugging information ----
message : No converter available
type : java.util.Collections$UnmodifiableMap
converter : com.thoughtworks.xstream.converters.reflection.ReflectionConverter
message[1] : Unable to make field private static final long java.util.Collections$UnmodifiableMap.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @40f08448
-------------------------------
at com.thoughtworks.xstream.core.DefaultConverterLookup.lookupConverterForType(DefaultConverterLookup.java:88) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.XStream$1.lookupConverterForType(XStream.java:478) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.core.TreeMarshaller.convertAnother(TreeMarshaller.java:49) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.core.AbstractReferenceMarshaller$1.convertAnother(AbstractReferenceMarshaller.java:83) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.marshallField(AbstractReflectionConverter.java:270) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter$2.writeField(AbstractReflectionConverter.java:174) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.doMarshal(AbstractReflectionConverter.java:262) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.marshal(AbstractReflectionConverter.java:90) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.core.AbstractReferenceMarshaller.convert(AbstractReferenceMarshaller.java:68) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.core.TreeMarshaller.convertAnother(TreeMarshaller.java:59) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.core.AbstractReferenceMarshaller$1.convertAnother(AbstractReferenceMarshaller.java:83) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.marshallField(AbstractReflectionConverter.java:270) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter$2.writeField(AbstractReflectionConverter.java:174) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.doMarshal(AbstractReflectionConverter.java:262) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.marshal(AbstractReflectionConverter.java:90) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.core.AbstractReferenceMarshaller.convert(AbstractReferenceMarshaller.java:68) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.core.TreeMarshaller.convertAnother(TreeMarshaller.java:59) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.core.TreeMarshaller.convertAnother(TreeMarshaller.java:44) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.core.TreeMarshaller.start(TreeMarshaller.java:83) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.core.AbstractTreeMarshallingStrategy.marshal(AbstractTreeMarshallingStrategy.java:37) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.XStream.marshal(XStream.java:1303) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.XStream.marshal(XStream.java:1292) ~[xstream-1.4.20.jar:1.4.20]
at com.thoughtworks.xstream.XStream.toXML(XStream.java:1265) ~[xstream-1.4.20.jar:1.4.20]
at org.axonframework.serialization.xml.XStreamSerializer.doSerialize(XStreamSerializer.java:130) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.serialization.AbstractXStreamSerializer.serialize(AbstractXStreamSerializer.java:110) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.tokenstore.jpa.TokenEntry.<init>(TokenEntry.java:85) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore.initializeTokenSegments(JpaTokenStore.java:124) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.lambda$run$1(TrackingEventProcessor.java:1278) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.common.transaction.TransactionManager.executeInTransaction(TransactionManager.java:47) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1275) ~[axon-messaging-4.10.2.jar:4.10.2]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
TokenEntry
をXStream
のConverterでserializeしようとしてエラーになっているように見えます。
application.ymlでは、Serializerはevents: jackson
を指定していますが、tokens
のserializeについてはdefaultのSerializerが使われるので、そちらもjacksonにするよう設定が必要そうです。
axon:
axonserver:
enabled: false
serializer:
general: jackson # 追加
events: jackson
kafka:
bootstrap-servers: localhost:9094
consumer:
event-processor-mode: tracking
default-topic: doc-topic
Serializerについては、ドキュメントの以下あたりをご参照ください。
また、Producer側でSerializerを設定する際にドキュメントから確認した内容については、以下の記事にも記載しています。
この状態でアプリ起動すると、またイベントの受信ができるようになりました。
そして、DBのtoken_entry
テーブルにレコードができており、token
カラムに、トピックごとにどのオフセットまで処理したかが記録されています。
{"positions":{"doc-topic-0":2}}
これで、アプリ再起動時に処理済のイベントが再度処理されることがなくなりました。
まとめ
Axonを使用してKafkaからイベントを受信する最小限の実装を試しました。
引き続き、各種設定・カスタマイズの方法についても確認していく予定です。