0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Axon FrameworkでKafkaからイベント受信する [Kotlin,SpringBoot]

Posted at

はじめに

Axon Framework(以降、Axonと記述)を使用してKafkaからイベントを受信する最小限の実装を試した過程を残しておきます。

Kotlinを使用し、Spring Bootアプリケーションとして構築しています。

なお、Kafkaにイベントを送信するProducerアプリケーションについては、以下の記事に記載しており、本記事での動作確認においても、そのProducerアプリケーションを使用してイベントを送信しています。

プロジェクトのセットアップ

本記事では、上記の記事に記載したProducerアプリケーションと同様にSpringBootとAxonの最低限のセットアップを行った状態を、初期状態とします。

動作確認をおこなった環境についても、上記の記事と同じなので、記載を割愛します。

consumer
├── build.gradle.kts
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle.kts
└── src
    └── main
        ├── kotlin
        │   └── org
        │       └── example
        │           └── axonkafkatrial
        │               └── consumer
        │                   └── ConsumerApplication.kt
        └── resources
            └── application.yml
consumer/setting.gradle.kts
rootProject.name = "axon_kafka_trial_consumer"
consumer/build.gradle.kts
plugins {
    val kotlinVersion = "2.0.0"
    kotlin("jvm") version kotlinVersion
    kotlin("plugin.spring") version kotlinVersion
    id("org.springframework.boot") version "3.3.5"
    id("io.spring.dependency-management") version "1.1.5"
}

group = "org.example"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
	  implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1")
    implementation("org.axonframework.extensions.kafka:axon-kafka-spring-boot-starter:4.10.0")
}

kotlin {
    jvmToolchain(21)
}

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/ConsumerApplication.kt
package org.example.axonkafkatrial.consumer

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class ConsumerApplication

fun main(args: Array<String>) {
    runApplication<ConsumerApplication>(*args)
}
consumer/src/main/resources/application.yml
axon:
  axonserver:
    enabled: false
  serializer:
    events: jackson
  kafka:
    bootstrap-servers: localhost:9094

なお、Producerアプリケーションでは、イベント送信の動作確認のためにAPIエンドポイントを用意していましたが、今回のConsumerアプリケーションでは、Kafkaからのイベント受信をトリガとして動作する処理を実装するので、APIエンドポイントは作成していません。

Kafkaからのイベント受信

イベント受信時の処理の記述

イベント受信時に動作する処理を作成していきます。
SpringBootによりDIされるように@Component等のアノテーションを付与したクラス内で、 @EventHandlerアノテーションを付与したメソッドを作成します。
メソッド名は任意で、メソッドの第一引数に処理したいイベントの型(以下だとDocCreated)を指定します。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/listener/ConsumerProcess.kt
package org.example.axonkafkatrial.consumer.listener

import org.axonframework.eventhandling.EventHandler
import org.example.axonkafkatrial.shared.event.DocCreated
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

@Component
class ConsumerProcess {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    @EventHandler
    fun on(event: DocCreated) {
        logger.info("event received: $event")
    }
}

このあたりの記述は、Kafkaからのイベント受信に固有のものではなくAxonの機能なので、例えば、Kafkaを使用せず、同一アプリケーション内で発行されたイベントをトリガーとして非同期に動作するメソッドを作成する場合でも、同じ書き方になります。

つまり、イベントの受信方法はAxonにより抽象化されているので、アプリケーションとしてはイベント受信時の処理内容を記述するだけでよいことになります。

イベント型をProducerと共有する別プロジェクトの用意

上述のとおり、ConsumerのEventHandlerメソッドは、処理したいイベントの型を指定して記述する必要があります。

Producerアプリケーション側で定義したイベント型を指定することも可能ですが、それではConsumerがProducerに直接依存することになり、Pub/Subにより実現したい疎結合を阻害してしまうとも考えられます。

今回は、ProducerとConsumerの間でイベント型を共有するための簡易スキーマレジストリとして、新たにSharedプロジェクト(モジュール)を用意したうえで、ProducerとConsumerのそれぞれからSharedを参照することにします。

shared/
├── build.gradle.kts
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── settings.gradle.kts
└── src
    └── main
        └── kotlin
            └── org
                └── example
                    └── axonkafkatrial
                        └── shared
                            └── event
                                └── DocCreated.kt
shared/setting.gradle.kts
rootProject.name = "axon_kafka_trial_shared"
shared/build.gradle.kts
plugins {
    val kotlinVersion = "2.0.0"
    kotlin("jvm") version kotlinVersion
}

group = "org.example"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
}

kotlin {
    jvmToolchain(21)
}
shared/src/main/kotlin/org/example/axonkafkatrial/shared/event/DocCreated.kt
package org.example.axonkafkatrial.shared.event

import java.util.*

data class DocCreated(
    val docId: UUID,
    val body: String,
)

ConsumerプロジェクトにSharedプロジェクトへの参照を追加します。

consumer/setting.gradle.kts
rootProject.name = "axon_kafka_trial_consumer"

includeBuild("../shared") // 追加
consumer/build.gradle.kts
dependencies {
    implementation("org.example:axon_kafka_trial_shared") // 追加

    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
}

Producerプロジェクトも、Consumerと同様にSharedプロジェクトを参照したうえで、
SharedのDocCreatedイベントを使用するように変更します。

producer/src/main/kotlin/org/example/axonkafkatrial/producer/service/TrialService.kt
package org.example.axonkafkatrial.producer.service

import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventhandling.gateway.EventGateway
import org.example.axonkafkatrial.producer.command.DocCreate // 追加
import org.example.axonkafkatrial.producer.command.OtherCreate
import org.example.axonkafkatrial.shared.event.DocCreated
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.util.UUID

@Service
class TrialService(private val eventGateway: EventGateway) {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    @CommandHandler
    fun run(command: DocCreate) {
        logger.info("command accepted: $command")
        val event = DocCreated(command.docId, command.body)
        eventGateway.publish(event)
    }

    @CommandHandler
    fun run(command: OtherCreate) {
        logger.info("command accepted: $command")
        val event = OtherCreated(command.id, command.body)
        eventGateway.publish(event)
    }
}

// 削除
//data class DocCreated(
//    val docId: UUID,
//    val body: String,
//)

data class OtherCreated(
    val id: UUID,
    val body: String,
)

なお、ここでは、ProducerがSharedで定義された型のイベントを直接生成して送信していますが、イベントの生成時はProducerアプリケーションの境界づけられたコンテキスト内で定義されたイベントを使用し、Kafka経由で別アプリケーション(別コンテキスト)に送信する際だけSharedのイベントに変換するようにすれば、より疎結合になると思います。

ただし、イベントの型が内部用と共有用の2つ必要になり相互変換も必要になる分の手間は増えるので、状況に応じて検討が必要な部分かと思います。

動作確認

ProducerApplicationとConsumerApplicationを同時に立ち上げて動作確認するため、ポート競合しないように起動ポートを変更します。

producer/src/main/resources/application.yml
server:
  port: 8080
consumer/src/main/resources/application.yml
server:
  port: 8081

ProducerApplicationとConsumerApplicationを両方起動した状態で、ProducerのAPIにPOSTします。

$ curl -X POST localhost:8080 \
>  -H 'Content-Type: application/json' \
>  -d '{"body":"test"}'
{"id":"da10b02e-7529-482a-a82f-f632d5b0e02f"}

これでKafkaへのイベント送信までは成功しますが、Consumer側はうんともすんとも言いません。
(起動時の下記ログ以降、特にログが出力されていません)

INFO 67071 --- [           main] o.e.a.consumer.ConsumerApplicationKt     : Started ConsumerApplicationKt in 2.021 seconds (process running for 2.356)

Kafka連携のための設定が不足しているようなので、ドキュメントの以下あたりを参考にapplication.ymlに設定を追加します。

consumer/src/main/resources/application.yml
axon:
  axonserver:
    enabled: false
  serializer:
    events: jackson
  kafka:
    bootstrap-servers: localhost:9094
    # 以下を追加
    consumer:
      event-processor-mode: tracking

また、ドキュメントの以下あたりを参考に、EventProcessorとして上記で作成した ConsumerProcessクラスのあるpackageを登録します。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaTrackingConfig.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.config.EventProcessingConfigurer
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Configuration

@Configuration
class AxonKafkaTrackingConfig {
    @Autowired
    fun configure(
        config: EventProcessingConfigurer,
        streamableKafkaMessageSource: StreamableKafkaMessageSource<String, ByteArray>,
    ) {
        config.registerTrackingEventProcessor("org.example.axonkafkatrial.consumer.listener") {
            streamableKafkaMessageSource
        }
    }
}

この状態でConsumerアプリケーションを再起動すると、Kafka接続が成功しているように見えます。

INFO 74724 --- [           main] o.a.s.a.XStreamAutoConfiguration         : Initializing an XStream instance since none was found. The auto configuration base packages will be used as wildcards for the XStream security settings.
INFO 74724 --- [           main] o.a.s.util.XStreamSecurityTypeUtility    : Constructing wildcard type for base package [org.example.axonkafkatrial.consumer].
INFO 74724 --- [           main] o.a.serialization.ChainingConverter      : ContentTypeConverter of type [class org.axonframework.serialization.xml.Dom4JToByteArrayConverter] is ignored. It seems to rely on a class that is not available in the class loader: org/dom4j/Document
INFO 74724 --- [           main] o.a.serialization.ChainingConverter      : ContentTypeConverter of type [class org.axonframework.serialization.xml.InputStreamToDom4jConverter] is ignored. It seems to rely on a class that is not available in the class loader: org/dom4j/Document
INFO 74724 --- [           main] o.a.serialization.ChainingConverter      : ContentTypeConverter of type [class org.axonframework.serialization.xml.XomToStringConverter] is ignored. It seems to rely on a class that is not available in the class loader: nu/xom/Document
INFO 74724 --- [           main] o.a.serialization.ChainingConverter      : ContentTypeConverter of type [class org.axonframework.serialization.xml.InputStreamToXomConverter] is ignored. It seems to rely on a class that is not available in the class loader: nu/xom/ParsingException
INFO 74724 --- [           main] o.a.config.AxonIQConsoleModule           : 
################################################################################################
## You have not configured AxonIQ Console. AxonIQ Console provides out-of-the box monitoring  ##
## and management capabilities for your Axon Application, starting with it is free.           ##
## Visit https://console.axoniq.io for more information!                                      ##
## Suppress this message by setting system property disable-axoniq-console-message to true.   ##
################################################################################################

WARN 74724 --- [           main] o.a.e.t.inmemory.InMemoryTokenStore      : An in memory token store is being created.
This means the event processor using this token store might process the same events again when the application is restarted.
If the use of an in memory token store is intentional, this warning can be ignored.
If the tokens should be persisted, use the JPA, JDBC or MongoDB token store instead.
INFO 74724 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port 8081 (http) with context path '/'
INFO 74724 --- [           main] o.e.a.consumer.ConsumerApplicationKt     : Started ConsumerApplicationKt in 2.642 seconds (process running for 3.265)
INFO 74724 --- [mer.listener]-0] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.include.jmx.reporter = true
	auto.offset.reset = latest
	bootstrap.servers = [localhost:9094]
  〜省略〜

INFO 74724 --- [mer.listener]-0] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
INFO 74724 --- [mer.listener]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.7.1
INFO 74724 --- [mer.listener]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: e2494e6ffb89f828
INFO 74724 --- [mer.listener]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1731208745918
INFO 74724 --- [mer.listener]-0] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-null-1, groupId=null] Cluster ID: 6ju05AkfTN-V06364F0G9g
INFO 74724 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Worker assigned to segment Segment[0/0] for processing
INFO 74724 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Using current Thread for last segment worker: TrackingSegmentWorker{processor=org.example.axonkafkatrial.consumer.listener, segment=Segment[0/0]}
INFO 74724 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Fetched token: ReplayToken{currentToken=null, tokenAtReset=KafkaTrackingToken{positions={Axon.Events-0=6}}, context=null} for segment: Segment[0/0]
INFO 74724 --- [mer.listener]-0] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.include.jmx.reporter = true
	auto.offset.reset = latest
	bootstrap.servers = [localhost:9094]
  〜省略〜〜

INFO 74724 --- [mer.listener]-0] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
INFO 74724 --- [mer.listener]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.7.1
INFO 74724 --- [mer.listener]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: e2494e6ffb89f828
INFO 74724 --- [mer.listener]-0] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1731208746278
INFO 74724 --- [mer.listener]-0] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-null-2, groupId=null] Cluster ID: 6ju05AkfTN-V06364F0G9g
INFO 74724 --- [mer.listener]-0] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-null-2, groupId=null] Assigned to partition(s): Axon.Events-0
INFO 74724 --- [mer.listener]-0] o.a.e.k.e.consumer.ConsumerSeekUtil      : Seeking topic-partition [Axon.Events-0] with offset [0]
INFO 74724 --- [mer.listener]-0] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-null-2, groupId=null] Seeking to offset 0 for partition Axon.Events-0

以下のログが出ているので、Axon.Eventsトピックからイベントを受信しようとしているように見えます。

INFO 74724 --- [mer.listener]-0] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-null-2, groupId=null] Assigned to partition(s): Axon.Events-0
INFO 74724 --- [mer.listener]-0] o.a.e.k.e.consumer.ConsumerSeekUtil      : Seeking topic-partition [Axon.Events-0] with offset [0]
INFO 74724 --- [mer.listener]-0] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-null-2, groupId=null] Seeking to offset 0 for partition Axon.Events-0

試しに、ProducerからAxon.Events トピックに DocCreated イベントを送信してみたところ、下記ログが出力され、Consumerがイベントを受信して処理していることが確認できました。

INFO 74724 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=255e534e-31b2-4570-b8c7-8c4e1b45072f, body=test)

受信するKafkaトピックの指定

doc-topic からイベントを受信できるように、application.ymlに default-topic の設定を追加します

consumer/src/main/resources/application.yml
axon:
  axonserver:
    enabled: false
  serializer:
    events: jackson
  kafka:
    bootstrap-servers: localhost:9094
    consumer:
      event-processor-mode: tracking
    default-topic: doc-topic # 追加

これで、doc-topicに溜まっていたイベント、および、Consumerアプリケーション起動後に新たに送信したイベント、を受信して処理することができました。

INFO 75971 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=cf1e8377-5e4a-4d54-8c67-fe24d1538b9e, body=test)
INFO 75971 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=c7eb810d-fdf3-456f-99cb-811407e5ff96, body=test)
INFO 75971 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=adb95e9e-7e16-4ce0-8b79-56ca1cb2a573, body=test)
INFO 75971 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=6a5525af-7d9f-4625-98ad-1ebbafacde57, body=test)

イベント処理状況の永続化

ここまでの状態でConsumerアプリケーションを再起動すると、既に処理済のイベントも含め、トピックに溜まっているイベントを全て再度処理してしまいます。

以下のログに出ているとおり、InMemoryTokenStoreが使用されているため、処理状況が永続化されていないようです。

WARN 76304 --- [           main] o.a.e.t.inmemory.InMemoryTokenStore      : An in memory token store is being created.
This means the event processor using this token store might process the same events again when the application is restarted.
If the use of an in memory token store is intentional, this warning can be ignored.
If the tokens should be persisted, use the JPA, JDBC or MongoDB token store instead.

そこで、Producerと同様に、Consumer用のMySQLを新たに起動して、JPAで接続するようにします。

compose.yml
services:
  producer-db:
    image: mysql:latest
    container_name: producer-db
    environment:
      MYSQL_ROOT_PASSWORD: mysql
      MYSQL_DATABASE: trial
      MYSQL_USER: docker
      MYSQL_PASSWORD: docker
      TZ: 'Asia/Tokyo'
    ports:
      - "3306:3306"

  # 追加
  consumer-db:
    image: mysql:latest
    container_name: consumer-db
    environment:
      MYSQL_ROOT_PASSWORD: mysql
      MYSQL_DATABASE: trial
      MYSQL_USER: docker
      MYSQL_PASSWORD: docker
      TZ: 'Asia/Tokyo'
    ports:
      - "3316:3306" # ポート重複しないように3316に変えている
$ docker compose up -d consumer-db
consumer/build.gradle.kts
dependencies {
    implementation("org.example:axon_kafka_trial_shared")

    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.axonframework:axon-spring-boot-starter:4.10.2")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.17.1")
    implementation("org.axonframework.extensions.kafka:axon-kafka-spring-boot-starter:4.10.0")
    // 以下を追加
    implementation("org.springframework.boot:spring-boot-starter-data-jpa")
    runtimeOnly ("com.mysql:mysql-connector-j")
}

consumer/src/main/resources/application.yml
# 以下を追加
spring:
  datasource:
    url: jdbc:mysql://localhost:3316/trial
    username: docker
    password: docker
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    database: mysql
    hibernate:
      ddl-auto: update

ここでアプリ起動すると、以下のエラーが発生しました。

2024-11-10T12:46:13.713+09:00  WARN 77525 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Fetch Segments for Processor 'org.example.axonkafkatrial.consumer.listener' failed: No converter available
---- Debugging information ----
message             : No converter available
type                : java.util.Collections$UnmodifiableMap
converter           : com.thoughtworks.xstream.converters.reflection.ReflectionConverter
message[1]          : Unable to make field private static final long java.util.Collections$UnmodifiableMap.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @40f08448
-------------------------------. Preparing for retry in 1s

com.thoughtworks.xstream.converters.ConversionException: No converter available
---- Debugging information ----
message             : No converter available
type                : java.util.Collections$UnmodifiableMap
converter           : com.thoughtworks.xstream.converters.reflection.ReflectionConverter
message[1]          : Unable to make field private static final long java.util.Collections$UnmodifiableMap.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @40f08448
-------------------------------
	at com.thoughtworks.xstream.core.DefaultConverterLookup.lookupConverterForType(DefaultConverterLookup.java:88) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.XStream$1.lookupConverterForType(XStream.java:478) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.core.TreeMarshaller.convertAnother(TreeMarshaller.java:49) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.core.AbstractReferenceMarshaller$1.convertAnother(AbstractReferenceMarshaller.java:83) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.marshallField(AbstractReflectionConverter.java:270) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter$2.writeField(AbstractReflectionConverter.java:174) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.doMarshal(AbstractReflectionConverter.java:262) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.marshal(AbstractReflectionConverter.java:90) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.core.AbstractReferenceMarshaller.convert(AbstractReferenceMarshaller.java:68) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.core.TreeMarshaller.convertAnother(TreeMarshaller.java:59) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.core.AbstractReferenceMarshaller$1.convertAnother(AbstractReferenceMarshaller.java:83) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.marshallField(AbstractReflectionConverter.java:270) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter$2.writeField(AbstractReflectionConverter.java:174) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.doMarshal(AbstractReflectionConverter.java:262) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.converters.reflection.AbstractReflectionConverter.marshal(AbstractReflectionConverter.java:90) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.core.AbstractReferenceMarshaller.convert(AbstractReferenceMarshaller.java:68) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.core.TreeMarshaller.convertAnother(TreeMarshaller.java:59) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.core.TreeMarshaller.convertAnother(TreeMarshaller.java:44) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.core.TreeMarshaller.start(TreeMarshaller.java:83) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.core.AbstractTreeMarshallingStrategy.marshal(AbstractTreeMarshallingStrategy.java:37) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.XStream.marshal(XStream.java:1303) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.XStream.marshal(XStream.java:1292) ~[xstream-1.4.20.jar:1.4.20]
	at com.thoughtworks.xstream.XStream.toXML(XStream.java:1265) ~[xstream-1.4.20.jar:1.4.20]
	at org.axonframework.serialization.xml.XStreamSerializer.doSerialize(XStreamSerializer.java:130) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.serialization.AbstractXStreamSerializer.serialize(AbstractXStreamSerializer.java:110) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.tokenstore.jpa.TokenEntry.<init>(TokenEntry.java:85) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.tokenstore.jpa.JpaTokenStore.initializeTokenSegments(JpaTokenStore.java:124) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.lambda$run$1(TrackingEventProcessor.java:1278) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.common.transaction.TransactionManager.executeInTransaction(TransactionManager.java:47) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1275) ~[axon-messaging-4.10.2.jar:4.10.2]
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

TokenEntryXStreamのConverterでserializeしようとしてエラーになっているように見えます。

application.ymlでは、Serializerはevents: jacksonを指定していますが、tokensのserializeについてはdefaultのSerializerが使われるので、そちらもjacksonにするよう設定が必要そうです。

consumer/src/main/resources/application.yml
axon:
  axonserver:
    enabled: false
  serializer:
    general: jackson # 追加
    events: jackson
  kafka:
    bootstrap-servers: localhost:9094
    consumer:
      event-processor-mode: tracking
    default-topic: doc-topic

Serializerについては、ドキュメントの以下あたりをご参照ください。

また、Producer側でSerializerを設定する際にドキュメントから確認した内容については、以下の記事にも記載しています。

この状態でアプリ起動すると、またイベントの受信ができるようになりました。

そして、DBのtoken_entryテーブルにレコードができており、tokenカラムに、トピックごとにどのオフセットまで処理したかが記録されています。

token-entry-created.png

token
{"positions":{"doc-topic-0":2}}

これで、アプリ再起動時に処理済のイベントが再度処理されることがなくなりました。

まとめ

Axonを使用してKafkaからイベントを受信する最小限の実装を試しました。

引き続き、各種設定・カスタマイズの方法についても確認していく予定です。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?