0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Axon FrameworkでKafkaからイベントを受信するConsumerのエラーハンドリング(trackingモードの場合)[Kotlin,SpringBoot]

Posted at

はじめに

Axon Framework(以降、Axonと記述)を使用してKafkaからイベントを受信する実装を試しています。
これまでに、以下の記事に記載したとおり、基本的な実装を行なってきました。

Axonでは、Kafkaからイベントを受信すると、@EventHandlerアノテーションを付与したメソッドが実行されます。
本記事では、@EventHandlerアノテーションを付与したメソッド内で例外が発生した場合のエラーハンドリングとして、Axonの機能でどのようなことができるかを確認しました。

なお、エラーハンドリングにおける挙動は、Axonのconsumer.event-processor-modeの設定によって異なると思われます。本記事ではconsumer.event-processor-modetrackingの場合の挙動について確認しています。
consumer.event-processor-modeの設定による正常時の挙動の違いについては、以下の記事で確認しているのでご参照ください。

また、作成したコードの全体は以下のリポジトリをご参照ください。

デフォルトの挙動

以下のように、EventHandler処理内で1回目だけエラーを発生させるようにしてみます。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/listener/ConsumerProcess.kt
package org.example.axonkafkatrial.consumer.listener

import org.axonframework.eventhandling.EventHandler
import org.example.axonkafkatrial.shared.event.DocCreated
import org.example.axonkafkatrial.shared.event.OtherCreated
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

@Component
class ConsumerProcess {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    @EventHandler
    fun on(event: DocCreated) {
        sometimesThrowException() // これを追加
        logger.info("event received: $event")
    }

    @EventHandler
    fun on(event: OtherCreated) {
        logger.info("event received: $event")
    }

    private var count = 0
    private fun sometimesThrowException() {
        count++
        logger.info("count: $count")
        if ( count == 1) {
            throw RuntimeException("test exception")
        }
    }
}

この状態で、Producerアプリケーションを利用して、複数回イベントを送信します。

$ curl -X POST localhost:8080 -H 'Content-Type: application/json'  -d '{"body":"test"}'
{"id":"2e09e07b-0adb-49df-a6aa-6083cbc1288f"}

$ curl -X POST localhost:8080 -H 'Content-Type: application/json'  -d '{"body":"test"}'
{"id":"a4c35087-ee4a-4888-b200-1b8a1862088e"}

$ curl -X POST localhost:8080 -H 'Content-Type: application/json'  -d '{"body":"test"}'
{"id":"2dc381a2-a71a-4a2b-b893-100ec162250d"}

すると、期待通り、1つ目のイベント受信時の処理で例外が発生しました。
2つ目と3つ目のイベントは正常に処理されて、event received: ~というログが出力されています。

INFO  92920 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : count: 1
ERROR 92920 --- [mer.listener]-0] o.a.eventhandling.LoggingErrorHandler    : EventListener [ConsumerProcess] failed to handle event [3af44507-ac22-4f80-925e-1c21dff96ef1] (org.example.axonkafkatrial.shared.event.DocCreated). Continuing processing with next listener

java.lang.RuntimeException: test exception
	at org.example.axonkafkatrial.consumer.listener.ConsumerProcess.sometimesThrowException(ConsumerProcess.kt:29) ~[main/:na]
	at org.example.axonkafkatrial.consumer.listener.ConsumerProcess.on(ConsumerProcess.kt:15) ~[main/:na]
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
	at org.axonframework.messaging.annotation.AnnotatedMessageHandlingMember.handle(AnnotatedMessageHandlingMember.java:153) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.messaging.annotation.WrappedMessageHandlingMember.handle(WrappedMessageHandlingMember.java:64) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.tracing.TracingHandlerEnhancerDefinition$1.lambda$handle$1(TracingHandlerEnhancerDefinition.java:84) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.tracing.TracingHandlerEnhancerDefinition$1.handle(TracingHandlerEnhancerDefinition.java:84) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.messaging.annotation.NoMoreInterceptors.handle(NoMoreInterceptors.java:46) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.AnnotationEventHandlerAdapter.handle(AnnotationEventHandlerAdapter.java:100) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.SimpleEventHandlerInvoker.invokeHandlers(SimpleEventHandlerInvoker.java:128) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.SimpleEventHandlerInvoker.handle(SimpleEventHandlerInvoker.java:114) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.MultiEventHandlerInvoker.handle(MultiEventHandlerInvoker.java:91) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.AbstractEventProcessor.processMessageInUnitOfWork(AbstractEventProcessor.java:195) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$1(AbstractEventProcessor.java:173) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:57) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.messaging.interceptors.CorrelationDataInterceptor.handle(CorrelationDataInterceptor.java:67) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.TrackingEventProcessor.lambda$new$1(TrackingEventProcessor.java:181) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$2(AbstractEventProcessor.java:174) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$3(AbstractEventProcessor.java:170) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.AbstractEventProcessor.lambda$processInUnitOfWork$4(AbstractEventProcessor.java:166) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:165) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:491) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:316) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:1197) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.cleanUp(TrackingEventProcessor.java:1396) ~[axon-messaging-4.10.2.jar:4.10.2]
	at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1373) ~[axon-messaging-4.10.2.jar:4.10.2]
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

INFO 92920 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : count: 2
INFO 92920 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=a4c35087-ee4a-4888-b200-1b8a1862088e, body=test)
INFO 92920 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : count: 3
INFO 92920 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=2dc381a2-a71a-4a2b-b893-100ec162250d, body=test)

ここで気になるのは、1つ目のイベント(docId=2e09e07b-0adb-49df-a6aa-6083cbc1288f)が例外発生してエラーになった後、特にリトライされていない(event received: 〜というログが出力されていない)ように見えることです。

そこで、ドキュメントを見てみます。

In the case of a Streaming Event Processor, this means the processor will go into error mode, releasing any tokens and retrying at an incremental interval (starting at 1 second, up to max 60 seconds).

この記述だけ見たときは、デフォルトで自動リトライされるのかと思ったのですが、上記のようにログを見る限りはリトライされている様子がありませんでした。
ただ、改めてよく読むと、以下の記述もありました。

The component dealing with exceptions thrown from an event handling method is called the ListenerInvocationErrorHandler. By default, these exceptions are logged (with the LoggingErrorHandler implementation), and processing continues with the next handler or message.

デフォルトではログだけ出して次のイベント(message)を処理するようです。

エラーハンドリングのカスタマイズ

ドキュメントには以下の記述があり、エラーハンドリングはカスタマイズできるようです。

The default ListenerInvocationErrorHandler used by each processing group can be customized. Furthermore, we can configure the error handling behavior per processing group:

ListenerInvocationErrorHandlerのinterfaceを実装したクラスを作成して登録すればよさそうです。
そこで、動作確認用に、ログだけ吐いてthrowしなおすHandlerクラスを作成して登録してみます。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/RethrowErrorHandler.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.eventhandling.EventMessage
import org.axonframework.eventhandling.EventMessageHandler
import org.axonframework.eventhandling.ListenerInvocationErrorHandler
import org.slf4j.LoggerFactory
import java.lang.Exception

class RethrowErrorHandler: ListenerInvocationErrorHandler {

    private val logger = LoggerFactory.getLogger(this.javaClass)

    override fun onError(exception: Exception, event: EventMessage<*>, eventHandler: EventMessageHandler) {
        logger.error("Rethrow event handling error: $event", exception)
        throw exception
    }
}

このRethrowErrorHandlerを以下のようにConfigで登録します。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.kt
@Configuration
class AxonKafkaConsumerConfig {
    // 〜省略〜

    // 追加
    @Bean
    fun processingGroupErrorHandlingConfigurerModule(): ConfigurerModule =
        ConfigurerModule { configurer: Configurer ->
            configurer.eventProcessing { processingConfigurer: EventProcessingConfigurer ->
                processingConfigurer.registerDefaultListenerInvocationErrorHandler {
                    RethrowErrorHandler()
                }
            }
        }
}

この状態で、アプリケーションを再起動したうえで、再度イベントを送信して、例外を発生させます。

$ curl -X POST localhost:8080 -H 'Content-Type: application/json'  -d '{"body":"test"}'
{"id":"ccc320b9-d199-4b5b-b1aa-d3e75138127f"}

すると、例外発生してエラーになったイベントがリトライで成功するようになりました。

INFO  95538 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : count: 1
ERROR 95538 --- [mer.listener]-0] o.e.a.c.config.RethrowErrorHandler       : Rethrow event handling error: GenericTrackedEventMessage{payload={DocCreated(docId=ccc320b9-d199-4b5b-b1aa-d3e75138127f, body=test)}, metadata={'traceId'->'02eb9d61-0931-4bc1-afb5-9b0e149e23e5', 'correlationId'->'02eb9d61-0931-4bc1-afb5-9b0e149e23e5'}, messageIdentifier='706cf31e-f098-45d9-8389-70029df1aece', timestamp='2024-12-14T01:47:01.662Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=6}}}}

java.lang.RuntimeException: test exception
	at org.example.axonkafkatrial.consumer.listener.ConsumerProcess.sometimesThrowException(ConsumerProcess.kt:29) ~[main/:na]
	〜省略〜
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

WARN 95538 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Error occurred. Starting retry mode.

java.lang.RuntimeException: test exception
	at org.example.axonkafkatrial.consumer.listener.ConsumerProcess.sometimesThrowException(ConsumerProcess.kt:29) ~[main/:na]
	〜省略〜
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

WARN 95538 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 1s
INFO 95538 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Released claim
INFO 95538 --- [mer.listener]-0] o.a.e.k.e.consumer.FetchEventsTask       : Closing down FetchEventsTask using Consumer [org.apache.kafka.clients.consumer.KafkaConsumer@1d21c89c]
〜省略〜
INFO 95538 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : count: 2
INFO 95538 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=ccc320b9-d199-4b5b-b1aa-d3e75138127f, body=test)

ログを見ると、1回目(count: 1)の処理で例外発生してエラーとなった後、retry modeが始まり、最初のリトライ時(count: 2)で処理が成功したことがわかります。

次に、どこまでリトライされるのかを確認するため、1回目だけでなく常に例外発生するようにしてみます。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/listener/ConsumerProcess.kt
    private var count = 0
    private fun sometimesThrowException() {
        count++
        logger.info("count: $count")
//        if ( count == 1) {
            throw RuntimeException("test exception")
//        }
    }

すると、Exponential Backoffでリトライし続けました。
ログを見ると、リトライ間隔が1s、2s、4sのように増えていっていることがわかります。

INFO  96419 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : count: 1
ERROR 96419 --- [mer.listener]-0] o.e.a.c.config.RethrowErrorHandler       : Rethrow event handling error: GenericTrackedEventMessage{payload={DocCreated(docId=e34e936e-10fd-45ca-af3e-b17875df8f84, body=test)}, metadata={'traceId'->'f2eb3f73-4c5c-47b4-95c5-ba9a128b799b', 'correlationId'->'f2eb3f73-4c5c-47b4-95c5-ba9a128b799b'}, messageIdentifier='7d5f0865-f4e4-469e-beac-26483c90309b', timestamp='2024-12-14T01:55:51.887Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=7}}}}

java.lang.RuntimeException: test exception
	〜省略〜

WARN  96419 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Error occurred. Starting retry mode.

java.lang.RuntimeException: test exception
	〜省略〜

WARN  96419 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 1s
〜省略〜
INFO  96419 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : count: 2
ERROR 96419 --- [mer.listener]-0] o.e.a.c.config.RethrowErrorHandler       : Rethrow event handling error: GenericTrackedEventMessage{payload={DocCreated(docId=e34e936e-10fd-45ca-af3e-b17875df8f84, body=test)}, metadata={'traceId'->'f2eb3f73-4c5c-47b4-95c5-ba9a128b799b', 'correlationId'->'f2eb3f73-4c5c-47b4-95c5-ba9a128b799b'}, messageIdentifier='7d5f0865-f4e4-469e-beac-26483c90309b', timestamp='2024-12-14T01:55:51.887Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=7}}}}

java.lang.RuntimeException: test exception
	〜省略〜

WARN  96419 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 2s
〜省略〜
INFO  96419 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : count: 3
ERROR 96419 --- [mer.listener]-0] o.e.a.c.config.RethrowErrorHandler       : Rethrow event handling error: GenericTrackedEventMessage{payload={DocCreated(docId=e34e936e-10fd-45ca-af3e-b17875df8f84, body=test)}, metadata={'traceId'->'f2eb3f73-4c5c-47b4-95c5-ba9a128b799b', 'correlationId'->'f2eb3f73-4c5c-47b4-95c5-ba9a128b799b'}, messageIdentifier='7d5f0865-f4e4-469e-beac-26483c90309b', timestamp='2024-12-14T01:55:51.887Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=7}}}}

java.lang.RuntimeException: test exception
	〜省略〜

WARN  96419 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Releasing claim on token and preparing for retry in 4s

デッドレターキュー(DLQ)の利用

上記のとおりリトライされるようにしたうえで、今度は1つ目のイベントおよびそのリトライでは例外発生するが、2つ目以降のイベントでは成功するようにします。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/listener/ConsumerProcess.kt
@Component
class ConsumerProcess {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    @EventHandler
    fun on(event: DocCreated) {
        sometimesThrowException(event)
        logger.info("event received: $event")
    }

    @EventHandler
    fun on(event: OtherCreated) {
        logger.info("event received: $event")
    }

    // 1つ目のイベントだけ例外発生させる
    private var count = 0
    private var eventToFail: DocCreated? = null
    private fun sometimesThrowException(event: DocCreated) {
        count++
        logger.info("count: $count")
        if ( count == 1 || eventToFail == event) {
            eventToFail = event
            throw RuntimeException("test exception")
        }
    }
}

この状態で、これまでと同様に動作確認したところ、1つ目のイベントがエラーとなった後は、そのリトライを繰り返し続け、正常に処理できるはずの2つ目以降のイベントはいっこうに処理されませんでした。

一時的なNW瞬断やDB接続失敗など、時間経過で自動復旧する可能性のあるエラーの場合は、この挙動でも問題ないかもしれません。

しかし、特定のイベントデータでのみ発生するような例外の場合は、そのイベントに対する処理をスキップしない限り、Consumerの処理が完全に止まってしまいます。
100%何としても順序保証したいようなユースケースでは、そのような挙動にするのも仕方ないかもしれません。

ただ、完全停止するぐらいなら、該当イベントだけスキップして処理継続し、影響を局所化した方が望ましいケースも多いはずです。
そのようなケースでは、例外が発生したイベントはいったんデッドレターキュー(DLQ)にためておいて、次のイベントを処理するという方法があります。

DLQについては、ドキュメントの以下に記載されているので、これにならって実装してみます。

DLQへの登録

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.kt
@Configuration
class AxonKafkaConsumerConfig {
		// 〜省略〜

    // 以降を追加
    private val processingGroup = "org.example.axonkafkatrial.consumer.listener"

    @Bean
    fun deadLetterQueueConfigurerModule(): ConfigurerModule =
        ConfigurerModule { configurer: Configurer ->
            configurer.eventProcessing().registerDeadLetterQueue(
                processingGroup
            ) { config: org.axonframework.config.Configuration ->
                    JpaSequencedDeadLetterQueue.builder<EventMessage<*>>()
                        .processingGroup(processingGroup)
                        .entityManagerProvider(config.getComponent(EntityManagerProvider::class.java))
                        .transactionManager(config.getComponent(TransactionManager::class.java))
                        .serializer(config.serializer())
                        .build()
            }
        }
}

これで動作確認すると、先ほどはリトライを繰り返していたイベントがDLQに追加され、後続のイベントが処理されました。

INFO 17779 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : count: 1
INFO 17779 --- [mer.listener]-0] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Adding dead letter with message id [7d5f0865-f4e4-469e-beac-26483c90309b] because [java.lang.RuntimeException].
INFO 17779 --- [mer.listener]-0] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Storing DeadLetter (id: [52ae8169-204b-45d8-a8a3-ce4b37698aff]) for sequence [7d5f0865-f4e4-469e-beac-26483c90309b] with index [0] in processing group [org.example.axonkafkatrial.consumer.listener].
INFO 17779 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : count: 2
INFO 17779 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=4e91a5fb-150f-4bd5-b01d-87de7c88e867, body=test)

なお、MySQLをデータストアとして使用し、JPAでアクセスするJpaSequencedDeadLetterQueueを使用したので、dead_letter_entryテーブルにレコードが追加されています。

DLQに入った最古のイベントを1件のみ再処理

DLQに入ったイベントは、そのまま放っておいても自動では再処理されません。

仮に、例外の原因となったアプリケーションの不具合を修正後に、DLQに入ったイベントを再処理したい場合、SequencedDeadLetterProcessorにより、DLQに入っているイベントを処理する必要があります。

SequencedDeadLetterProcessorを実行するメソッドを用意します。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/service/StreamingProcessorService.kt
package org.example.axonkafkatrial.consumer.service

import org.axonframework.config.EventProcessingConfiguration
import org.axonframework.messaging.deadletter.SequencedDeadLetterProcessor
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component

@Component
class StreamingProcessorService(private val config: EventProcessingConfiguration) {
    private val processingGroup = "org.example.axonkafkatrial.consumer.listener"

    @Scheduled(fixedDelayString = "60000") // 1分間隔でこのメソッドを実行する
    fun retryAnySequence() {
        config.sequencedDeadLetterProcessor(processingGroup)
            .ifPresent { processor: SequencedDeadLetterProcessor<*> -> processor.processAny() }
    }
}

用意したメソッドを定期的に自動実行するため、SpringBootの@Scheduledアノテーションをメソッドに付与しています。

また、スケジュール実行を有効にするため、SpringBootApplicationに@EnableSchedulingアノテーションをつけておきます。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/ConsumerApplication.kt
package org.example.axonkafkatrial.consumer

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.scheduling.annotation.EnableScheduling

@SpringBootApplication
@EnableScheduling // 追加
class ConsumerApplication

fun main(args: Array<String>) {
    runApplication<ConsumerApplication>(*args)
}

上記で用意したretryAnySequenceメソッドだと、メソッドが1回実行されるごとに、DLQに入っている最も古いイベントが1件だけ処理されるようです。

ちなみに、DLQに入ったイベントを再処理した結果、また例外が発生した場合は、以下のようなログが出力されました。

INFO 20668 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Claimed dead letter with id [52ae8169-204b-45d8-a8a3-ce4b37698aff] to process.
INFO 20668 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Processing dead letter with id [52ae8169-204b-45d8-a8a3-ce4b37698aff] at index [0]
INFO 20668 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Using current Thread for last segment worker: TrackingSegmentWorker{processor=org.example.axonkafkatrial.consumer.listener, segment=Segment[0/0]}
INFO 20668 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Fetched token: KafkaTrackingToken{positions={doc-topic-0=10}} for segment: Segment[0/0]
INFO 20668 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : count: 1
WARN 20668 --- [   scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask  : Processing dead letter with message id [7d5f0865-f4e4-469e-beac-26483c90309b] failed.

java.lang.RuntimeException: test exception
	〜省略〜

INFO 20668 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Requeueing dead letter with id [52ae8169-204b-45d8-a8a3-ce4b37698aff] with cause [Optional[Cause{type=[java.lang.RuntimeException]-message=[test exception]}]]

このとき、dead_letter_entryテーブルのレコードは残ったままとなり、last_touchedカラム値が再処理した日時に更新されました。

なお、DLQに複数のイベントがたまっている状態で、retryAnySequenceメソッドが次に実行されたときは、上記で再度エラーとなったイベントの次に古いイベントが処理されました。
このことから、「最も古いイベント」という判定はdead_letter_entryテーブルのレコードのtime_stampカラムやenqueued_atカラムの値でなく、last_touchedカラムの値で判定されていると思われます。
つまり、DLQからの再処理で再度エラーとなったイベントは、DLQの先頭に戻されるのではなく、末尾にリキューされるイメージのようです。

DLQの再処理が成功した場合は、以下のようなログが出力され、dead_letter_entryテーブルから該当レコードが削除されました。

INFO 22279 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Claimed dead letter with id [576f3dbc-ac67-4fc9-a59b-8d28f2120b65] to process.
INFO 22279 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Processing dead letter with id [576f3dbc-ac67-4fc9-a59b-8d28f2120b65] at index [0]
INFO 22279 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Using current Thread for last segment worker: TrackingSegmentWorker{processor=org.example.axonkafkatrial.consumer.listener, segment=Segment[0/0]}
INFO 22279 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : count: 1
INFO 22279 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=debe3364-5576-41da-b398-815f607b305d, body=test)
INFO 22279 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Fetched token: KafkaTrackingToken{positions={doc-topic-0=10}} for segment: Segment[0/0]
INFO 22279 --- [   scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask  : Processing dead letter with message id [d27847cb-23d3-48bd-8ea8-f1bbffd608fd] was successful.
INFO 22279 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Evicting JpaDeadLetter with id 576f3dbc-ac67-4fc9-a59b-8d28f2120b65 for processing group org.example.axonkafkatrial.consumer.listener and sequence d27847cb-23d3-48bd-8ea8-f1bbffd608fd

DLQに入った全てのイベントを再処理

DLQにたまった複数のイベントを、1回のメソッド実行で、1件ずつでなく全て再処理したい場合は、以下のretryAllSequencesメソッドような書き方をするようです。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/service/StreamingProcessorService.kt
package org.example.axonkafkatrial.consumer.service

import org.axonframework.config.EventProcessingConfiguration
import org.axonframework.eventhandling.EventMessage
import org.axonframework.messaging.deadletter.DeadLetter
import org.axonframework.messaging.deadletter.SequencedDeadLetterProcessor
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component

@Component
class StreamingProcessorService(private val config: EventProcessingConfiguration) {
    private val processingGroup = "org.example.axonkafkatrial.consumer.listener"

//    @Scheduled(fixedDelayString = "60000")
    fun retryAnySequence() {
        config.sequencedDeadLetterProcessor(processingGroup)
            .ifPresent { processor: SequencedDeadLetterProcessor<*> -> processor.processAny() }
    }

    // 以下を追加
    @Scheduled(fixedDelayString = "60000")
    fun retryAllSequences() {
        val optionalLetterProcessor = config.sequencedDeadLetterProcessor(processingGroup)
        if (!optionalLetterProcessor.isPresent) {
            return
        }
        val letterProcessor = optionalLetterProcessor.get()

        // Retrieve all the dead lettered event sequences:
        val deadLetterSequences = config.deadLetterQueue(processingGroup)
            .map { it.deadLetters() }
            .orElseThrow { IllegalArgumentException("No such Processing Group") }

        // Iterate over all sequences:
        for (sequence in deadLetterSequences) {
            val sequenceIterator = sequence.iterator()
            val firstLetterId = sequenceIterator.next()
                .message()
                .identifier

            // SequencedDeadLetterProcessor#process automatically retries an entire sequence.
            // Hence, we only need to filter on the first entry of the sequence:
            letterProcessor.process { deadLetter ->
                deadLetter.message().identifier == firstLetterId
            }
        }
    }
}

この状態で動作確認すると、DLQにたまっている複数のイベントを1回のメソッド実行で全て再処理できました。

INFO 25073 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Claimed dead letter with id [bb20b722-5706-44b1-b371-9e6d059f7bc3] to process.
INFO 25073 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Processing dead letter with id [bb20b722-5706-44b1-b371-9e6d059f7bc3] at index [0]
INFO 25073 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : count: 1
INFO 25073 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=ef5bfcbe-7899-442f-83f8-593d81cb32b9, body=test)
INFO 25073 --- [   scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask  : Processing dead letter with message id [4b12de52-fc82-4d31-a565-93c0d332b6fb] was successful.
INFO 25073 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Evicting JpaDeadLetter with id bb20b722-5706-44b1-b371-9e6d059f7bc3 for processing group org.example.axonkafkatrial.consumer.listener and sequence 4b12de52-fc82-4d31-a565-93c0d332b6fb

INFO 25073 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Claimed dead letter with id [498c7d6a-5a76-4837-944e-055efb425b96] to process.
INFO 25073 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Processing dead letter with id [498c7d6a-5a76-4837-944e-055efb425b96] at index [0]
INFO 25073 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : count: 2
INFO 25073 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=6f4cc492-37b1-49ca-b93f-341b97309369, body=test)
INFO 25073 --- [   scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask  : Processing dead letter with message id [f2a8d29f-4c71-4ac1-bf9e-fd2bbbc7b2b5] was successful.
INFO 25073 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Evicting JpaDeadLetter with id 498c7d6a-5a76-4837-944e-055efb425b96 for processing group org.example.axonkafkatrial.consumer.listener and sequence f2a8d29f-4c71-4ac1-bf9e-fd2bbbc7b2b5

INFO 25073 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Claimed dead letter with id [27794c78-e44d-471d-95a7-971128fa58d7] to process.
INFO 25073 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Processing dead letter with id [27794c78-e44d-471d-95a7-971128fa58d7] at index [0]
INFO 25073 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : count: 3
INFO 25073 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=74cab36a-dd55-4348-b18a-76b8d48fefe0, body=test)
INFO 25073 --- [   scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask  : Processing dead letter with message id [601f35e0-62b6-42cf-947a-e43735f12935] was successful.
INFO 25073 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Evicting JpaDeadLetter with id 27794c78-e44d-471d-95a7-971128fa58d7 for processing group org.example.axonkafkatrial.consumer.listener and sequence 601f35e0-62b6-42cf-947a-e43735f12935

DLQ再処理の順序保証

DLQを利用することにより、一部のイベントで例外が発生した場合に、該当イベントをスキップして処理継続し、後から再処理できるようになりました。
とはいえ、いったんスキップして後から再処理するということは、その間に後続イベントと順序が逆転するということでもあります。

DDDの場合、少なくとも集約の識別IDの単位では、作成→更新A→更新B→削除のようなイベントの順序は保証したいところです。

本記事のサンプルコードでは対応・確認できていませんが、ドキュメントに以下の記載があるので、集約の識別IDを明示したイベントの場合は、Axonが集約の識別ID単位で順序保証してくれるのかもしれません。

Axon Framework’s event processors maintain the ordering of events within the same sequence, even when you configure parallel processing. A perfect example when this is a requirement is the need to handle events of the same aggregate in their publishing order. Simply dead lettering one failed event would cause later events in the sequence to be applied to inconsistent state.

So it’s important that a dead-letter queue for events enqueues an event and any following events in the sequence. To that end, the supported dead-letter queue is a so-called SequencedDeadLetterQueue.

このあたりは、今後、AxonでEventSourcingの実装を試す際に確認したいと思います。

通常イベントとDLQ再処理イベントを見分ける

Kafkaから受信したイベントを初めて処理する場合と、一度DLQに入ったイベントを再処理する場合で、処理を分岐したい場合があるかもしれません。

その場合は、EventHandler処理を以下のように記述することもできるようです。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/listener/ConsumerProcess.kt
    @EventHandler
    fun on(event: DocCreated, deadLetter: DeadLetter<EventMessage<DocCreated>>?) {
        if(deadLetter != null) {
            logger.info("handling dead letter: $deadLetter")
        }
        sometimesThrowException(event)
        logger.info("event received: $event")
    }

この例では、DLQからの再処理の場合のみ、追加のログを出力しています。

INFO 28103 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : handling dead letter: JpaDeadLetter{id='3939380e-80a0-4c96-b58d-1afff7241253', index=0, sequenceIdentifier='cf001fb6-c220-4802-bd01-58a449aaae85', enqueuedAt=2024-12-14T07:14:15.402267Z, lastTouched=2024-12-14T07:14:15.402852Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics=, message=GenericTrackedEventMessage{payload={DocCreated(docId=d99056bc-fc9e-4e66-a0ea-7e5c924821a9, body=test)}, metadata={'traceId'->'adb30efe-c0fa-46e5-8c40-4ecfefba2ad1', 'correlationId'->'adb30efe-c0fa-46e5-8c40-4ecfefba2ad1'}, messageIdentifier='cf001fb6-c220-4802-bd01-58a449aaae85', timestamp='2024-12-14T07:14:15.268Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=17}}}}}
INFO 28103 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : count: 1
INFO 28103 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=d99056bc-fc9e-4e66-a0ea-7e5c924821a9, body=test)
INFO 28103 --- [   scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask  : Processing dead letter with message id [cf001fb6-c220-4802-bd01-58a449aaae85] was successful.
INFO 28103 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Evicting JpaDeadLetter with id 3939380e-80a0-4c96-b58d-1afff7241253 for processing group org.example.axonkafkatrial.consumer.listener and sequence cf001fb6-c220-4802-bd01-58a449aaae85

DLQ再処理の対象イベント判定や上限回数のカスタマイズ

エラー発生時に、イベントの種類や状態ごとに、DLQに入れるかどうかを分岐させたり、DLQから再処理する上限回数などをカスタマイズすることもできるようです。

ドキュメントのサンプルコードを参考に、EnqueuePolicyインターフェースを実装したクラスを作成してみます。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/CustomEnqueuePolicy.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.eventhandling.EventMessage
import org.axonframework.messaging.deadletter.DeadLetter
import org.axonframework.messaging.deadletter.Decisions
import org.axonframework.messaging.deadletter.EnqueueDecision
import org.axonframework.messaging.deadletter.EnqueuePolicy
import org.example.axonkafkatrial.shared.event.OtherCreated
import org.slf4j.LoggerFactory

class CustomEnqueuePolicy: EnqueuePolicy<EventMessage<*>> {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    override fun decide(letter: DeadLetter<out EventMessage<*>>, cause: Throwable?): EnqueueDecision<EventMessage<*>> {
        logger.info("deciding on dead letter: $letter")

        if (letter.message().payload is OtherCreated) {
            return Decisions.doNotEnqueue()
        }

        val retries = letter.diagnostics().getOrDefault("retries", -1) as Int
        logger.info("retries: $retries")

        if (retries < 2) {
            return Decisions.requeue(
                cause
            ) { l: DeadLetter<out EventMessage<*>> ->
                l.diagnostics().and("retries", retries + 1)
            }
        }

        return Decisions.evict()
    }
}

このCustomEnqueuePolicyを以下のようにConfigで登録します。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.kt
@Configuration
class AxonKafkaConsumerConfig {
		// 〜省略〜

    // 追加
    @Bean
    fun enqueuePolicyConfigurerModule(): ConfigurerModule =
        ConfigurerModule { configurer ->
            configurer.eventProcessing()
                .registerDeadLetterPolicy(processingGroup) {
                    CustomEnqueuePolicy()
                }
        }
}

これで、以下のように、DLQに入れるイベントの型や再処理の上限回数をプログラムで指定することができました。

# OtherCreatedはエラーになってもDLQに入れない
INFO 33843 --- [mer.listener]-0] o.e.a.c.config.CustomEnqueuePolicy       : deciding on dead letter: GenericDeadLetter{sequenceIdentifier=9462ae64-001a-4b48-b3cd-b769cf9c164c, message=GenericTrackedEventMessage{payload={OtherCreated(id=b4582e8f-b076-4040-869d-408b394e5240, body=test)}, metadata={'traceId'->'45fa55b2-be9b-403e-9f90-7bd19c680780', 'correlationId'->'45fa55b2-be9b-403e-9f90-7bd19c680780'}, messageIdentifier='9462ae64-001a-4b48-b3cd-b769cf9c164c', timestamp='2024-12-14T08:50:08.749Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=28, default-topic-0=7}}}}, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, enqueuedAt=2024-12-14T08:50:08.936920Z, lastTouched=2024-12-14T08:50:08.936921Z, diagnostics=}
INFO 33843 --- [mer.listener]-0] o.a.e.d.DeadLetteringEventHandlerInvoker : The enqueue policy decided not to dead letter event [9462ae64-001a-4b48-b3cd-b769cf9c164c].

# DocCreatedはエラーになったらDLQに入れる
INFO 33843 --- [mer.listener]-0] o.e.a.c.config.CustomEnqueuePolicy       : deciding on dead letter: GenericDeadLetter{sequenceIdentifier=ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2, message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:50:11.960933Z, diagnostics=}
INFO 33843 --- [mer.listener]-0] o.e.a.c.config.CustomEnqueuePolicy       : retries: -1
INFO 33843 --- [mer.listener]-0] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Adding dead letter with message id [ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2] because [java.lang.RuntimeException].
INFO 33843 --- [mer.listener]-0] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Storing DeadLetter (id: [92c12dd9-bfae-4dd7-9f95-2112207e0b1b]) for sequence [ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2] with index [0] in processing group [org.example.axonkafkatrial.consumer.listener].

# 再処理1回目 エラーになったらrequeueする
INFO 33843 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Claimed dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] to process.
INFO 33843 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Processing dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] at index [0]
INFO 33843 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : handling dead letter: JpaDeadLetter{id='92c12dd9-bfae-4dd7-9f95-2112207e0b1b', index=0, sequenceIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:50:11.962090Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics='retries'->'0', message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}}
WARN 33843 --- [   scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask  : Processing dead letter with message id [ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2] failed.

java.lang.RuntimeException: test exception
	〜省略〜

INFO 33843 --- [   scheduling-1] o.e.a.c.config.CustomEnqueuePolicy       : deciding on dead letter: JpaDeadLetter{id='92c12dd9-bfae-4dd7-9f95-2112207e0b1b', index=0, sequenceIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:50:11.962090Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics='retries'->'0', message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}}
INFO 33843 --- [   scheduling-1] o.e.a.c.config.CustomEnqueuePolicy       : retries: 0
INFO 33843 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Requeueing dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] with cause [Optional[Cause{type=[java.lang.RuntimeException]-message=[test exception]}]]

# 再処理2回目 エラーになったらrequeueする
INFO 33843 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Claimed dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] to process.
INFO 33843 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Processing dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] at index [0]
INFO 33843 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : handling dead letter: JpaDeadLetter{id='92c12dd9-bfae-4dd7-9f95-2112207e0b1b', index=0, sequenceIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:50:54.209017Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics='retries'->'1', message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}}
WARN 33843 --- [   scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask  : Processing dead letter with message id [ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2] failed.

java.lang.RuntimeException: test exception
	〜省略〜

INFO 33843 --- [   scheduling-1] o.e.a.c.config.CustomEnqueuePolicy       : deciding on dead letter: JpaDeadLetter{id='92c12dd9-bfae-4dd7-9f95-2112207e0b1b', index=0, sequenceIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:50:54.209017Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics='retries'->'1', message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}}
INFO 33843 --- [   scheduling-1] o.e.a.c.config.CustomEnqueuePolicy       : retries: 1
INFO 33843 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Requeueing dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] with cause [Optional[Cause{type=[java.lang.RuntimeException]-message=[test exception]}]]

# 再処理3回目 エラーになったらevictする
INFO 33843 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Claimed dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] to process.
INFO 33843 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Processing dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] at index [0]
INFO 33843 --- [   scheduling-1] o.e.a.consumer.listener.ConsumerProcess  : handling dead letter: JpaDeadLetter{id='92c12dd9-bfae-4dd7-9f95-2112207e0b1b', index=0, sequenceIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:51:54.254791Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics='retries'->'2', message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}}
WARN 33843 --- [   scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask  : Processing dead letter with message id [ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2] failed.

java.lang.RuntimeException: test exception
	〜省略〜

INFO 33843 --- [   scheduling-1] o.e.a.c.config.CustomEnqueuePolicy       : deciding on dead letter: JpaDeadLetter{id='92c12dd9-bfae-4dd7-9f95-2112207e0b1b', index=0, sequenceIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:51:54.254791Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics='retries'->'2', message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}}
INFO 33843 --- [   scheduling-1] o.e.a.c.config.CustomEnqueuePolicy       : retries: 2
INFO 33843 --- [   scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue  : Evicting JpaDeadLetter with id 92c12dd9-bfae-4dd7-9f95-2112207e0b1b for processing group org.example.axonkafkatrial.consumer.listener and sequence ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2

このとき、dead_letter_entryテーブルのレコードのdiagnosticsカラムに、{"retries":0}のような値が保存され、再処理するごとにインクリメントされていました。

ただ、このretriesの値の意味がちょっとわかりにくくて混乱しました。

上記のコードでは、retriesの上限が2になっていますが、イベントの処理は初回と再処理含めて合計4回実行されています。これは以下のようになっていると思われます。

  • 1回目:初回処理
  • 2回目:DLQからの再処理
  • 3回目:DLQからの再処理のリトライ1回目
  • 4回目:DLQからの再処理のリトライ2回目

つまり、このretriesは、DLQからの再処理に対するリトライ回数のようです。

まとめ

EventHandler処理内で例外が発生した場合の、エラーハンドリングとDLQの利用方法について、ドキュメントを参考に、実装と動作確認を行いました。

Axonの提供するインターフェースを実装したクラスをConfig登録することで、エラーハンドリング方法やDLQへの登録および再処理の方法を、プログラムで自由にカスタマイズできることがわかりました。

リトライやスキップの要否や順序保証の要否などに応じて、EventHandler処理内での例外キャッチ、Axonによるエラーハンドリング・リトライ、DLQの利用などを柔軟に使い分けることで、さまざまなユースケースに対応することができそうです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?