はじめに
Axon Framework(以降、Axonと記述)を使用してKafkaからイベントを受信する実装を試しています。
これまでに、以下の記事に記載したとおり、基本的な実装を行なってきました。
Axonでは、Kafkaからイベントを受信すると、@EventHandler
アノテーションを付与したメソッドが実行されます。
本記事では、@EventHandler
アノテーションを付与したメソッド内で例外が発生した場合のエラーハンドリングとして、Axonの機能でどのようなことができるかを確認しました。
なお、エラーハンドリングにおける挙動は、Axonのconsumer.event-processor-mode
の設定によって異なると思われます。本記事ではconsumer.event-processor-mode
がtracking
の場合の挙動について確認しています。
consumer.event-processor-mode
の設定による正常時の挙動の違いについては、以下の記事で確認しているのでご参照ください。
また、作成したコードの全体は以下のリポジトリをご参照ください。
デフォルトの挙動
以下のように、EventHandler処理内で1回目だけエラーを発生させるようにしてみます。
package org.example.axonkafkatrial.consumer.listener
import org.axonframework.eventhandling.EventHandler
import org.example.axonkafkatrial.shared.event.DocCreated
import org.example.axonkafkatrial.shared.event.OtherCreated
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
@Component
class ConsumerProcess {
private val logger = LoggerFactory.getLogger(this.javaClass)
@EventHandler
fun on(event: DocCreated) {
sometimesThrowException() // これを追加
logger.info("event received: $event")
}
@EventHandler
fun on(event: OtherCreated) {
logger.info("event received: $event")
}
private var count = 0
private fun sometimesThrowException() {
count++
logger.info("count: $count")
if ( count == 1) {
throw RuntimeException("test exception")
}
}
}
この状態で、Producerアプリケーションを利用して、複数回イベントを送信します。
$ curl -X POST localhost:8080 -H 'Content-Type: application/json' -d '{"body":"test"}'
{"id":"2e09e07b-0adb-49df-a6aa-6083cbc1288f"}
$ curl -X POST localhost:8080 -H 'Content-Type: application/json' -d '{"body":"test"}'
{"id":"a4c35087-ee4a-4888-b200-1b8a1862088e"}
$ curl -X POST localhost:8080 -H 'Content-Type: application/json' -d '{"body":"test"}'
{"id":"2dc381a2-a71a-4a2b-b893-100ec162250d"}
すると、期待通り、1つ目のイベント受信時の処理で例外が発生しました。
2つ目と3つ目のイベントは正常に処理されて、event received: ~
というログが出力されています。
INFO 92920 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : count: 1
ERROR 92920 --- [mer.listener]-0] o.a.eventhandling.LoggingErrorHandler : EventListener [ConsumerProcess] failed to handle event [3af44507-ac22-4f80-925e-1c21dff96ef1] (org.example.axonkafkatrial.shared.event.DocCreated). Continuing processing with next listener
java.lang.RuntimeException: test exception
at org.example.axonkafkatrial.consumer.listener.ConsumerProcess.sometimesThrowException(ConsumerProcess.kt:29) ~[main/:na]
at org.example.axonkafkatrial.consumer.listener.ConsumerProcess.on(ConsumerProcess.kt:15) ~[main/:na]
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
at org.axonframework.messaging.annotation.AnnotatedMessageHandlingMember.handle(AnnotatedMessageHandlingMember.java:153) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.messaging.annotation.WrappedMessageHandlingMember.handle(WrappedMessageHandlingMember.java:64) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.tracing.TracingHandlerEnhancerDefinition$1.lambda$handle$1(TracingHandlerEnhancerDefinition.java:84) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.tracing.TracingHandlerEnhancerDefinition$1.handle(TracingHandlerEnhancerDefinition.java:84) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.messaging.annotation.NoMoreInterceptors.handle(NoMoreInterceptors.java:46) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.AnnotationEventHandlerAdapter.handle(AnnotationEventHandlerAdapter.java:100) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.SimpleEventHandlerInvoker.invokeHandlers(SimpleEventHandlerInvoker.java:128) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.SimpleEventHandlerInvoker.handle(SimpleEventHandlerInvoker.java:114) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.MultiEventHandlerInvoker.handle(MultiEventHandlerInvoker.java:91) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.AbstractEventProcessor.processMessageInUnitOfWork(AbstractEventProcessor.java:195) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$1(AbstractEventProcessor.java:173) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:57) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.messaging.interceptors.CorrelationDataInterceptor.handle(CorrelationDataInterceptor.java:67) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.TrackingEventProcessor.lambda$new$1(TrackingEventProcessor.java:181) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.messaging.DefaultInterceptorChain.proceed(DefaultInterceptorChain.java:55) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$2(AbstractEventProcessor.java:174) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.AbstractEventProcessor.lambda$null$3(AbstractEventProcessor.java:170) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.AbstractEventProcessor.lambda$processInUnitOfWork$4(AbstractEventProcessor.java:166) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.tracing.Span.runCallable(Span.java:132) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.AbstractEventProcessor.processInUnitOfWork(AbstractEventProcessor.java:165) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:491) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:316) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.TrackingEventProcessor$TrackingSegmentWorker.run(TrackingEventProcessor.java:1197) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.cleanUp(TrackingEventProcessor.java:1396) ~[axon-messaging-4.10.2.jar:4.10.2]
at org.axonframework.eventhandling.TrackingEventProcessor$WorkerLauncher.run(TrackingEventProcessor.java:1373) ~[axon-messaging-4.10.2.jar:4.10.2]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
INFO 92920 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : count: 2
INFO 92920 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=a4c35087-ee4a-4888-b200-1b8a1862088e, body=test)
INFO 92920 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : count: 3
INFO 92920 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=2dc381a2-a71a-4a2b-b893-100ec162250d, body=test)
ここで気になるのは、1つ目のイベント(docId=2e09e07b-0adb-49df-a6aa-6083cbc1288f
)が例外発生してエラーになった後、特にリトライされていない(event received: 〜
というログが出力されていない)ように見えることです。
そこで、ドキュメントを見てみます。
In the case of a Streaming Event Processor, this means the processor will go into error mode, releasing any tokens and retrying at an incremental interval (starting at 1 second, up to max 60 seconds).
この記述だけ見たときは、デフォルトで自動リトライされるのかと思ったのですが、上記のようにログを見る限りはリトライされている様子がありませんでした。
ただ、改めてよく読むと、以下の記述もありました。
The component dealing with exceptions thrown from an event handling method is called the
ListenerInvocationErrorHandler
. By default, these exceptions are logged (with theLoggingErrorHandler
implementation), and processing continues with the next handler or message.
デフォルトではログだけ出して次のイベント(message)を処理するようです。
エラーハンドリングのカスタマイズ
ドキュメントには以下の記述があり、エラーハンドリングはカスタマイズできるようです。
The default
ListenerInvocationErrorHandler
used by each processing group can be customized. Furthermore, we can configure the error handling behavior per processing group:
ListenerInvocationErrorHandler
のinterfaceを実装したクラスを作成して登録すればよさそうです。
そこで、動作確認用に、ログだけ吐いてthrowしなおすHandlerクラスを作成して登録してみます。
package org.example.axonkafkatrial.consumer.config
import org.axonframework.eventhandling.EventMessage
import org.axonframework.eventhandling.EventMessageHandler
import org.axonframework.eventhandling.ListenerInvocationErrorHandler
import org.slf4j.LoggerFactory
import java.lang.Exception
class RethrowErrorHandler: ListenerInvocationErrorHandler {
private val logger = LoggerFactory.getLogger(this.javaClass)
override fun onError(exception: Exception, event: EventMessage<*>, eventHandler: EventMessageHandler) {
logger.error("Rethrow event handling error: $event", exception)
throw exception
}
}
このRethrowErrorHandler
を以下のようにConfigで登録します。
@Configuration
class AxonKafkaConsumerConfig {
// 〜省略〜
// 追加
@Bean
fun processingGroupErrorHandlingConfigurerModule(): ConfigurerModule =
ConfigurerModule { configurer: Configurer ->
configurer.eventProcessing { processingConfigurer: EventProcessingConfigurer ->
processingConfigurer.registerDefaultListenerInvocationErrorHandler {
RethrowErrorHandler()
}
}
}
}
この状態で、アプリケーションを再起動したうえで、再度イベントを送信して、例外を発生させます。
$ curl -X POST localhost:8080 -H 'Content-Type: application/json' -d '{"body":"test"}'
{"id":"ccc320b9-d199-4b5b-b1aa-d3e75138127f"}
すると、例外発生してエラーになったイベントがリトライで成功するようになりました。
INFO 95538 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : count: 1
ERROR 95538 --- [mer.listener]-0] o.e.a.c.config.RethrowErrorHandler : Rethrow event handling error: GenericTrackedEventMessage{payload={DocCreated(docId=ccc320b9-d199-4b5b-b1aa-d3e75138127f, body=test)}, metadata={'traceId'->'02eb9d61-0931-4bc1-afb5-9b0e149e23e5', 'correlationId'->'02eb9d61-0931-4bc1-afb5-9b0e149e23e5'}, messageIdentifier='706cf31e-f098-45d9-8389-70029df1aece', timestamp='2024-12-14T01:47:01.662Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=6}}}}
java.lang.RuntimeException: test exception
at org.example.axonkafkatrial.consumer.listener.ConsumerProcess.sometimesThrowException(ConsumerProcess.kt:29) ~[main/:na]
〜省略〜
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
WARN 95538 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Error occurred. Starting retry mode.
java.lang.RuntimeException: test exception
at org.example.axonkafkatrial.consumer.listener.ConsumerProcess.sometimesThrowException(ConsumerProcess.kt:29) ~[main/:na]
〜省略〜
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
WARN 95538 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Releasing claim on token and preparing for retry in 1s
INFO 95538 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Released claim
INFO 95538 --- [mer.listener]-0] o.a.e.k.e.consumer.FetchEventsTask : Closing down FetchEventsTask using Consumer [org.apache.kafka.clients.consumer.KafkaConsumer@1d21c89c]
〜省略〜
INFO 95538 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : count: 2
INFO 95538 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=ccc320b9-d199-4b5b-b1aa-d3e75138127f, body=test)
ログを見ると、1回目(count: 1
)の処理で例外発生してエラーとなった後、retry modeが始まり、最初のリトライ時(count: 2
)で処理が成功したことがわかります。
次に、どこまでリトライされるのかを確認するため、1回目だけでなく常に例外発生するようにしてみます。
private var count = 0
private fun sometimesThrowException() {
count++
logger.info("count: $count")
// if ( count == 1) {
throw RuntimeException("test exception")
// }
}
すると、Exponential Backoffでリトライし続けました。
ログを見ると、リトライ間隔が1s、2s、4sのように増えていっていることがわかります。
INFO 96419 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : count: 1
ERROR 96419 --- [mer.listener]-0] o.e.a.c.config.RethrowErrorHandler : Rethrow event handling error: GenericTrackedEventMessage{payload={DocCreated(docId=e34e936e-10fd-45ca-af3e-b17875df8f84, body=test)}, metadata={'traceId'->'f2eb3f73-4c5c-47b4-95c5-ba9a128b799b', 'correlationId'->'f2eb3f73-4c5c-47b4-95c5-ba9a128b799b'}, messageIdentifier='7d5f0865-f4e4-469e-beac-26483c90309b', timestamp='2024-12-14T01:55:51.887Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=7}}}}
java.lang.RuntimeException: test exception
〜省略〜
WARN 96419 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Error occurred. Starting retry mode.
java.lang.RuntimeException: test exception
〜省略〜
WARN 96419 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Releasing claim on token and preparing for retry in 1s
〜省略〜
INFO 96419 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : count: 2
ERROR 96419 --- [mer.listener]-0] o.e.a.c.config.RethrowErrorHandler : Rethrow event handling error: GenericTrackedEventMessage{payload={DocCreated(docId=e34e936e-10fd-45ca-af3e-b17875df8f84, body=test)}, metadata={'traceId'->'f2eb3f73-4c5c-47b4-95c5-ba9a128b799b', 'correlationId'->'f2eb3f73-4c5c-47b4-95c5-ba9a128b799b'}, messageIdentifier='7d5f0865-f4e4-469e-beac-26483c90309b', timestamp='2024-12-14T01:55:51.887Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=7}}}}
java.lang.RuntimeException: test exception
〜省略〜
WARN 96419 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Releasing claim on token and preparing for retry in 2s
〜省略〜
INFO 96419 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : count: 3
ERROR 96419 --- [mer.listener]-0] o.e.a.c.config.RethrowErrorHandler : Rethrow event handling error: GenericTrackedEventMessage{payload={DocCreated(docId=e34e936e-10fd-45ca-af3e-b17875df8f84, body=test)}, metadata={'traceId'->'f2eb3f73-4c5c-47b4-95c5-ba9a128b799b', 'correlationId'->'f2eb3f73-4c5c-47b4-95c5-ba9a128b799b'}, messageIdentifier='7d5f0865-f4e4-469e-beac-26483c90309b', timestamp='2024-12-14T01:55:51.887Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=7}}}}
java.lang.RuntimeException: test exception
〜省略〜
WARN 96419 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Releasing claim on token and preparing for retry in 4s
デッドレターキュー(DLQ)の利用
上記のとおりリトライされるようにしたうえで、今度は1つ目のイベントおよびそのリトライでは例外発生するが、2つ目以降のイベントでは成功するようにします。
@Component
class ConsumerProcess {
private val logger = LoggerFactory.getLogger(this.javaClass)
@EventHandler
fun on(event: DocCreated) {
sometimesThrowException(event)
logger.info("event received: $event")
}
@EventHandler
fun on(event: OtherCreated) {
logger.info("event received: $event")
}
// 1つ目のイベントだけ例外発生させる
private var count = 0
private var eventToFail: DocCreated? = null
private fun sometimesThrowException(event: DocCreated) {
count++
logger.info("count: $count")
if ( count == 1 || eventToFail == event) {
eventToFail = event
throw RuntimeException("test exception")
}
}
}
この状態で、これまでと同様に動作確認したところ、1つ目のイベントがエラーとなった後は、そのリトライを繰り返し続け、正常に処理できるはずの2つ目以降のイベントはいっこうに処理されませんでした。
一時的なNW瞬断やDB接続失敗など、時間経過で自動復旧する可能性のあるエラーの場合は、この挙動でも問題ないかもしれません。
しかし、特定のイベントデータでのみ発生するような例外の場合は、そのイベントに対する処理をスキップしない限り、Consumerの処理が完全に止まってしまいます。
100%何としても順序保証したいようなユースケースでは、そのような挙動にするのも仕方ないかもしれません。
ただ、完全停止するぐらいなら、該当イベントだけスキップして処理継続し、影響を局所化した方が望ましいケースも多いはずです。
そのようなケースでは、例外が発生したイベントはいったんデッドレターキュー(DLQ)にためておいて、次のイベントを処理するという方法があります。
DLQについては、ドキュメントの以下に記載されているので、これにならって実装してみます。
DLQへの登録
@Configuration
class AxonKafkaConsumerConfig {
// 〜省略〜
// 以降を追加
private val processingGroup = "org.example.axonkafkatrial.consumer.listener"
@Bean
fun deadLetterQueueConfigurerModule(): ConfigurerModule =
ConfigurerModule { configurer: Configurer ->
configurer.eventProcessing().registerDeadLetterQueue(
processingGroup
) { config: org.axonframework.config.Configuration ->
JpaSequencedDeadLetterQueue.builder<EventMessage<*>>()
.processingGroup(processingGroup)
.entityManagerProvider(config.getComponent(EntityManagerProvider::class.java))
.transactionManager(config.getComponent(TransactionManager::class.java))
.serializer(config.serializer())
.build()
}
}
}
これで動作確認すると、先ほどはリトライを繰り返していたイベントがDLQに追加され、後続のイベントが処理されました。
INFO 17779 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : count: 1
INFO 17779 --- [mer.listener]-0] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Adding dead letter with message id [7d5f0865-f4e4-469e-beac-26483c90309b] because [java.lang.RuntimeException].
INFO 17779 --- [mer.listener]-0] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Storing DeadLetter (id: [52ae8169-204b-45d8-a8a3-ce4b37698aff]) for sequence [7d5f0865-f4e4-469e-beac-26483c90309b] with index [0] in processing group [org.example.axonkafkatrial.consumer.listener].
INFO 17779 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : count: 2
INFO 17779 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=4e91a5fb-150f-4bd5-b01d-87de7c88e867, body=test)
なお、MySQLをデータストアとして使用し、JPAでアクセスするJpaSequencedDeadLetterQueue
を使用したので、dead_letter_entry
テーブルにレコードが追加されています。
DLQに入った最古のイベントを1件のみ再処理
DLQに入ったイベントは、そのまま放っておいても自動では再処理されません。
仮に、例外の原因となったアプリケーションの不具合を修正後に、DLQに入ったイベントを再処理したい場合、SequencedDeadLetterProcessor
により、DLQに入っているイベントを処理する必要があります。
SequencedDeadLetterProcessor
を実行するメソッドを用意します。
package org.example.axonkafkatrial.consumer.service
import org.axonframework.config.EventProcessingConfiguration
import org.axonframework.messaging.deadletter.SequencedDeadLetterProcessor
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
@Component
class StreamingProcessorService(private val config: EventProcessingConfiguration) {
private val processingGroup = "org.example.axonkafkatrial.consumer.listener"
@Scheduled(fixedDelayString = "60000") // 1分間隔でこのメソッドを実行する
fun retryAnySequence() {
config.sequencedDeadLetterProcessor(processingGroup)
.ifPresent { processor: SequencedDeadLetterProcessor<*> -> processor.processAny() }
}
}
用意したメソッドを定期的に自動実行するため、SpringBootの@Scheduled
アノテーションをメソッドに付与しています。
また、スケジュール実行を有効にするため、SpringBootApplicationに@EnableScheduling
アノテーションをつけておきます。
package org.example.axonkafkatrial.consumer
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.scheduling.annotation.EnableScheduling
@SpringBootApplication
@EnableScheduling // 追加
class ConsumerApplication
fun main(args: Array<String>) {
runApplication<ConsumerApplication>(*args)
}
上記で用意したretryAnySequence
メソッドだと、メソッドが1回実行されるごとに、DLQに入っている最も古いイベントが1件だけ処理されるようです。
ちなみに、DLQに入ったイベントを再処理した結果、また例外が発生した場合は、以下のようなログが出力されました。
INFO 20668 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Claimed dead letter with id [52ae8169-204b-45d8-a8a3-ce4b37698aff] to process.
INFO 20668 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Processing dead letter with id [52ae8169-204b-45d8-a8a3-ce4b37698aff] at index [0]
INFO 20668 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Using current Thread for last segment worker: TrackingSegmentWorker{processor=org.example.axonkafkatrial.consumer.listener, segment=Segment[0/0]}
INFO 20668 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Fetched token: KafkaTrackingToken{positions={doc-topic-0=10}} for segment: Segment[0/0]
INFO 20668 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : count: 1
WARN 20668 --- [ scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask : Processing dead letter with message id [7d5f0865-f4e4-469e-beac-26483c90309b] failed.
java.lang.RuntimeException: test exception
〜省略〜
INFO 20668 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Requeueing dead letter with id [52ae8169-204b-45d8-a8a3-ce4b37698aff] with cause [Optional[Cause{type=[java.lang.RuntimeException]-message=[test exception]}]]
このとき、dead_letter_entry
テーブルのレコードは残ったままとなり、last_touched
カラム値が再処理した日時に更新されました。
なお、DLQに複数のイベントがたまっている状態で、retryAnySequence
メソッドが次に実行されたときは、上記で再度エラーとなったイベントの次に古いイベントが処理されました。
このことから、「最も古いイベント」という判定はdead_letter_entry
テーブルのレコードのtime_stamp
カラムやenqueued_at
カラムの値でなく、last_touched
カラムの値で判定されていると思われます。
つまり、DLQからの再処理で再度エラーとなったイベントは、DLQの先頭に戻されるのではなく、末尾にリキューされるイメージのようです。
DLQの再処理が成功した場合は、以下のようなログが出力され、dead_letter_entry
テーブルから該当レコードが削除されました。
INFO 22279 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Claimed dead letter with id [576f3dbc-ac67-4fc9-a59b-8d28f2120b65] to process.
INFO 22279 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Processing dead letter with id [576f3dbc-ac67-4fc9-a59b-8d28f2120b65] at index [0]
INFO 22279 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Using current Thread for last segment worker: TrackingSegmentWorker{processor=org.example.axonkafkatrial.consumer.listener, segment=Segment[0/0]}
INFO 22279 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : count: 1
INFO 22279 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=debe3364-5576-41da-b398-815f607b305d, body=test)
INFO 22279 --- [mer.listener]-0] o.a.e.TrackingEventProcessor : Fetched token: KafkaTrackingToken{positions={doc-topic-0=10}} for segment: Segment[0/0]
INFO 22279 --- [ scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask : Processing dead letter with message id [d27847cb-23d3-48bd-8ea8-f1bbffd608fd] was successful.
INFO 22279 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Evicting JpaDeadLetter with id 576f3dbc-ac67-4fc9-a59b-8d28f2120b65 for processing group org.example.axonkafkatrial.consumer.listener and sequence d27847cb-23d3-48bd-8ea8-f1bbffd608fd
DLQに入った全てのイベントを再処理
DLQにたまった複数のイベントを、1回のメソッド実行で、1件ずつでなく全て再処理したい場合は、以下のretryAllSequences
メソッドような書き方をするようです。
package org.example.axonkafkatrial.consumer.service
import org.axonframework.config.EventProcessingConfiguration
import org.axonframework.eventhandling.EventMessage
import org.axonframework.messaging.deadletter.DeadLetter
import org.axonframework.messaging.deadletter.SequencedDeadLetterProcessor
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
@Component
class StreamingProcessorService(private val config: EventProcessingConfiguration) {
private val processingGroup = "org.example.axonkafkatrial.consumer.listener"
// @Scheduled(fixedDelayString = "60000")
fun retryAnySequence() {
config.sequencedDeadLetterProcessor(processingGroup)
.ifPresent { processor: SequencedDeadLetterProcessor<*> -> processor.processAny() }
}
// 以下を追加
@Scheduled(fixedDelayString = "60000")
fun retryAllSequences() {
val optionalLetterProcessor = config.sequencedDeadLetterProcessor(processingGroup)
if (!optionalLetterProcessor.isPresent) {
return
}
val letterProcessor = optionalLetterProcessor.get()
// Retrieve all the dead lettered event sequences:
val deadLetterSequences = config.deadLetterQueue(processingGroup)
.map { it.deadLetters() }
.orElseThrow { IllegalArgumentException("No such Processing Group") }
// Iterate over all sequences:
for (sequence in deadLetterSequences) {
val sequenceIterator = sequence.iterator()
val firstLetterId = sequenceIterator.next()
.message()
.identifier
// SequencedDeadLetterProcessor#process automatically retries an entire sequence.
// Hence, we only need to filter on the first entry of the sequence:
letterProcessor.process { deadLetter ->
deadLetter.message().identifier == firstLetterId
}
}
}
}
この状態で動作確認すると、DLQにたまっている複数のイベントを1回のメソッド実行で全て再処理できました。
INFO 25073 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Claimed dead letter with id [bb20b722-5706-44b1-b371-9e6d059f7bc3] to process.
INFO 25073 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Processing dead letter with id [bb20b722-5706-44b1-b371-9e6d059f7bc3] at index [0]
INFO 25073 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : count: 1
INFO 25073 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=ef5bfcbe-7899-442f-83f8-593d81cb32b9, body=test)
INFO 25073 --- [ scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask : Processing dead letter with message id [4b12de52-fc82-4d31-a565-93c0d332b6fb] was successful.
INFO 25073 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Evicting JpaDeadLetter with id bb20b722-5706-44b1-b371-9e6d059f7bc3 for processing group org.example.axonkafkatrial.consumer.listener and sequence 4b12de52-fc82-4d31-a565-93c0d332b6fb
INFO 25073 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Claimed dead letter with id [498c7d6a-5a76-4837-944e-055efb425b96] to process.
INFO 25073 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Processing dead letter with id [498c7d6a-5a76-4837-944e-055efb425b96] at index [0]
INFO 25073 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : count: 2
INFO 25073 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=6f4cc492-37b1-49ca-b93f-341b97309369, body=test)
INFO 25073 --- [ scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask : Processing dead letter with message id [f2a8d29f-4c71-4ac1-bf9e-fd2bbbc7b2b5] was successful.
INFO 25073 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Evicting JpaDeadLetter with id 498c7d6a-5a76-4837-944e-055efb425b96 for processing group org.example.axonkafkatrial.consumer.listener and sequence f2a8d29f-4c71-4ac1-bf9e-fd2bbbc7b2b5
INFO 25073 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Claimed dead letter with id [27794c78-e44d-471d-95a7-971128fa58d7] to process.
INFO 25073 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Processing dead letter with id [27794c78-e44d-471d-95a7-971128fa58d7] at index [0]
INFO 25073 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : count: 3
INFO 25073 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=74cab36a-dd55-4348-b18a-76b8d48fefe0, body=test)
INFO 25073 --- [ scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask : Processing dead letter with message id [601f35e0-62b6-42cf-947a-e43735f12935] was successful.
INFO 25073 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Evicting JpaDeadLetter with id 27794c78-e44d-471d-95a7-971128fa58d7 for processing group org.example.axonkafkatrial.consumer.listener and sequence 601f35e0-62b6-42cf-947a-e43735f12935
DLQ再処理の順序保証
DLQを利用することにより、一部のイベントで例外が発生した場合に、該当イベントをスキップして処理継続し、後から再処理できるようになりました。
とはいえ、いったんスキップして後から再処理するということは、その間に後続イベントと順序が逆転するということでもあります。
DDDの場合、少なくとも集約の識別IDの単位では、作成→更新A→更新B→削除のようなイベントの順序は保証したいところです。
本記事のサンプルコードでは対応・確認できていませんが、ドキュメントに以下の記載があるので、集約の識別IDを明示したイベントの場合は、Axonが集約の識別ID単位で順序保証してくれるのかもしれません。
Axon Framework’s event processors maintain the ordering of events within the same sequence, even when you configure parallel processing. A perfect example when this is a requirement is the need to handle events of the same aggregate in their publishing order. Simply dead lettering one failed event would cause later events in the sequence to be applied to inconsistent state.
So it’s important that a dead-letter queue for events enqueues an event and any following events in the sequence. To that end, the supported dead-letter queue is a so-called
SequencedDeadLetterQueue
.
このあたりは、今後、AxonでEventSourcingの実装を試す際に確認したいと思います。
通常イベントとDLQ再処理イベントを見分ける
Kafkaから受信したイベントを初めて処理する場合と、一度DLQに入ったイベントを再処理する場合で、処理を分岐したい場合があるかもしれません。
その場合は、EventHandler処理を以下のように記述することもできるようです。
@EventHandler
fun on(event: DocCreated, deadLetter: DeadLetter<EventMessage<DocCreated>>?) {
if(deadLetter != null) {
logger.info("handling dead letter: $deadLetter")
}
sometimesThrowException(event)
logger.info("event received: $event")
}
この例では、DLQからの再処理の場合のみ、追加のログを出力しています。
INFO 28103 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : handling dead letter: JpaDeadLetter{id='3939380e-80a0-4c96-b58d-1afff7241253', index=0, sequenceIdentifier='cf001fb6-c220-4802-bd01-58a449aaae85', enqueuedAt=2024-12-14T07:14:15.402267Z, lastTouched=2024-12-14T07:14:15.402852Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics=, message=GenericTrackedEventMessage{payload={DocCreated(docId=d99056bc-fc9e-4e66-a0ea-7e5c924821a9, body=test)}, metadata={'traceId'->'adb30efe-c0fa-46e5-8c40-4ecfefba2ad1', 'correlationId'->'adb30efe-c0fa-46e5-8c40-4ecfefba2ad1'}, messageIdentifier='cf001fb6-c220-4802-bd01-58a449aaae85', timestamp='2024-12-14T07:14:15.268Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=17}}}}}
INFO 28103 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : count: 1
INFO 28103 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : event received: DocCreated(docId=d99056bc-fc9e-4e66-a0ea-7e5c924821a9, body=test)
INFO 28103 --- [ scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask : Processing dead letter with message id [cf001fb6-c220-4802-bd01-58a449aaae85] was successful.
INFO 28103 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Evicting JpaDeadLetter with id 3939380e-80a0-4c96-b58d-1afff7241253 for processing group org.example.axonkafkatrial.consumer.listener and sequence cf001fb6-c220-4802-bd01-58a449aaae85
DLQ再処理の対象イベント判定や上限回数のカスタマイズ
エラー発生時に、イベントの種類や状態ごとに、DLQに入れるかどうかを分岐させたり、DLQから再処理する上限回数などをカスタマイズすることもできるようです。
ドキュメントのサンプルコードを参考に、EnqueuePolicy
インターフェースを実装したクラスを作成してみます。
package org.example.axonkafkatrial.consumer.config
import org.axonframework.eventhandling.EventMessage
import org.axonframework.messaging.deadletter.DeadLetter
import org.axonframework.messaging.deadletter.Decisions
import org.axonframework.messaging.deadletter.EnqueueDecision
import org.axonframework.messaging.deadletter.EnqueuePolicy
import org.example.axonkafkatrial.shared.event.OtherCreated
import org.slf4j.LoggerFactory
class CustomEnqueuePolicy: EnqueuePolicy<EventMessage<*>> {
private val logger = LoggerFactory.getLogger(this.javaClass)
override fun decide(letter: DeadLetter<out EventMessage<*>>, cause: Throwable?): EnqueueDecision<EventMessage<*>> {
logger.info("deciding on dead letter: $letter")
if (letter.message().payload is OtherCreated) {
return Decisions.doNotEnqueue()
}
val retries = letter.diagnostics().getOrDefault("retries", -1) as Int
logger.info("retries: $retries")
if (retries < 2) {
return Decisions.requeue(
cause
) { l: DeadLetter<out EventMessage<*>> ->
l.diagnostics().and("retries", retries + 1)
}
}
return Decisions.evict()
}
}
このCustomEnqueuePolicy
を以下のようにConfigで登録します。
@Configuration
class AxonKafkaConsumerConfig {
// 〜省略〜
// 追加
@Bean
fun enqueuePolicyConfigurerModule(): ConfigurerModule =
ConfigurerModule { configurer ->
configurer.eventProcessing()
.registerDeadLetterPolicy(processingGroup) {
CustomEnqueuePolicy()
}
}
}
これで、以下のように、DLQに入れるイベントの型や再処理の上限回数をプログラムで指定することができました。
# OtherCreatedはエラーになってもDLQに入れない
INFO 33843 --- [mer.listener]-0] o.e.a.c.config.CustomEnqueuePolicy : deciding on dead letter: GenericDeadLetter{sequenceIdentifier=9462ae64-001a-4b48-b3cd-b769cf9c164c, message=GenericTrackedEventMessage{payload={OtherCreated(id=b4582e8f-b076-4040-869d-408b394e5240, body=test)}, metadata={'traceId'->'45fa55b2-be9b-403e-9f90-7bd19c680780', 'correlationId'->'45fa55b2-be9b-403e-9f90-7bd19c680780'}, messageIdentifier='9462ae64-001a-4b48-b3cd-b769cf9c164c', timestamp='2024-12-14T08:50:08.749Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=28, default-topic-0=7}}}}, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, enqueuedAt=2024-12-14T08:50:08.936920Z, lastTouched=2024-12-14T08:50:08.936921Z, diagnostics=}
INFO 33843 --- [mer.listener]-0] o.a.e.d.DeadLetteringEventHandlerInvoker : The enqueue policy decided not to dead letter event [9462ae64-001a-4b48-b3cd-b769cf9c164c].
# DocCreatedはエラーになったらDLQに入れる
INFO 33843 --- [mer.listener]-0] o.e.a.c.config.CustomEnqueuePolicy : deciding on dead letter: GenericDeadLetter{sequenceIdentifier=ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2, message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:50:11.960933Z, diagnostics=}
INFO 33843 --- [mer.listener]-0] o.e.a.c.config.CustomEnqueuePolicy : retries: -1
INFO 33843 --- [mer.listener]-0] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Adding dead letter with message id [ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2] because [java.lang.RuntimeException].
INFO 33843 --- [mer.listener]-0] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Storing DeadLetter (id: [92c12dd9-bfae-4dd7-9f95-2112207e0b1b]) for sequence [ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2] with index [0] in processing group [org.example.axonkafkatrial.consumer.listener].
# 再処理1回目 エラーになったらrequeueする
INFO 33843 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Claimed dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] to process.
INFO 33843 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Processing dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] at index [0]
INFO 33843 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : handling dead letter: JpaDeadLetter{id='92c12dd9-bfae-4dd7-9f95-2112207e0b1b', index=0, sequenceIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:50:11.962090Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics='retries'->'0', message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}}
WARN 33843 --- [ scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask : Processing dead letter with message id [ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2] failed.
java.lang.RuntimeException: test exception
〜省略〜
INFO 33843 --- [ scheduling-1] o.e.a.c.config.CustomEnqueuePolicy : deciding on dead letter: JpaDeadLetter{id='92c12dd9-bfae-4dd7-9f95-2112207e0b1b', index=0, sequenceIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:50:11.962090Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics='retries'->'0', message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}}
INFO 33843 --- [ scheduling-1] o.e.a.c.config.CustomEnqueuePolicy : retries: 0
INFO 33843 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Requeueing dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] with cause [Optional[Cause{type=[java.lang.RuntimeException]-message=[test exception]}]]
# 再処理2回目 エラーになったらrequeueする
INFO 33843 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Claimed dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] to process.
INFO 33843 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Processing dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] at index [0]
INFO 33843 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : handling dead letter: JpaDeadLetter{id='92c12dd9-bfae-4dd7-9f95-2112207e0b1b', index=0, sequenceIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:50:54.209017Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics='retries'->'1', message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}}
WARN 33843 --- [ scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask : Processing dead letter with message id [ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2] failed.
java.lang.RuntimeException: test exception
〜省略〜
INFO 33843 --- [ scheduling-1] o.e.a.c.config.CustomEnqueuePolicy : deciding on dead letter: JpaDeadLetter{id='92c12dd9-bfae-4dd7-9f95-2112207e0b1b', index=0, sequenceIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:50:54.209017Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics='retries'->'1', message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}}
INFO 33843 --- [ scheduling-1] o.e.a.c.config.CustomEnqueuePolicy : retries: 1
INFO 33843 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Requeueing dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] with cause [Optional[Cause{type=[java.lang.RuntimeException]-message=[test exception]}]]
# 再処理3回目 エラーになったらevictする
INFO 33843 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Claimed dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] to process.
INFO 33843 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Processing dead letter with id [92c12dd9-bfae-4dd7-9f95-2112207e0b1b] at index [0]
INFO 33843 --- [ scheduling-1] o.e.a.consumer.listener.ConsumerProcess : handling dead letter: JpaDeadLetter{id='92c12dd9-bfae-4dd7-9f95-2112207e0b1b', index=0, sequenceIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:51:54.254791Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics='retries'->'2', message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}}
WARN 33843 --- [ scheduling-1] o.a.e.d.DeadLetteredEventProcessingTask : Processing dead letter with message id [ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2] failed.
java.lang.RuntimeException: test exception
〜省略〜
INFO 33843 --- [ scheduling-1] o.e.a.c.config.CustomEnqueuePolicy : deciding on dead letter: JpaDeadLetter{id='92c12dd9-bfae-4dd7-9f95-2112207e0b1b', index=0, sequenceIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', enqueuedAt=2024-12-14T08:50:11.960932Z, lastTouched=2024-12-14T08:51:54.254791Z, cause=Cause{type=[java.lang.RuntimeException]-message=[test exception]}, diagnostics='retries'->'2', message=GenericTrackedEventMessage{payload={DocCreated(docId=023f57b8-f022-4ed4-a5cc-85085d326e6d, body=test)}, metadata={'traceId'->'27391757-302b-47e4-9a47-de73030c17fa', 'correlationId'->'27391757-302b-47e4-9a47-de73030c17fa'}, messageIdentifier='ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2', timestamp='2024-12-14T08:50:11.934Z, trackingToken={KafkaTrackingToken{positions={doc-topic-0=29, default-topic-0=7}}}}}
INFO 33843 --- [ scheduling-1] o.e.a.c.config.CustomEnqueuePolicy : retries: 2
INFO 33843 --- [ scheduling-1] o.a.e.d.jpa.JpaSequencedDeadLetterQueue : Evicting JpaDeadLetter with id 92c12dd9-bfae-4dd7-9f95-2112207e0b1b for processing group org.example.axonkafkatrial.consumer.listener and sequence ad5fe4de-5ab5-4b08-bedb-e08c28c2e4f2
このとき、dead_letter_entry
テーブルのレコードのdiagnostics
カラムに、{"retries":0}
のような値が保存され、再処理するごとにインクリメントされていました。
ただ、このretries
の値の意味がちょっとわかりにくくて混乱しました。
上記のコードでは、retries
の上限が2
になっていますが、イベントの処理は初回と再処理含めて合計4回実行されています。これは以下のようになっていると思われます。
- 1回目:初回処理
- 2回目:DLQからの再処理
- 3回目:DLQからの再処理のリトライ1回目
- 4回目:DLQからの再処理のリトライ2回目
つまり、このretries
は、DLQからの再処理に対するリトライ回数のようです。
まとめ
EventHandler処理内で例外が発生した場合の、エラーハンドリングとDLQの利用方法について、ドキュメントを参考に、実装と動作確認を行いました。
Axonの提供するインターフェースを実装したクラスをConfig登録することで、エラーハンドリング方法やDLQへの登録および再処理の方法を、プログラムで自由にカスタマイズできることがわかりました。
リトライやスキップの要否や順序保証の要否などに応じて、EventHandler処理内での例外キャッチ、Axonによるエラーハンドリング・リトライ、DLQの利用などを柔軟に使い分けることで、さまざまなユースケースに対応することができそうです。