はじめに
Axon Framework(以降、Axonと記述)を使用してKafkaからイベントを受信する実装を試しています。
これまでに、以下の記事に記載したとおり、基本的な実装を行なってきました。
その中で気になった点として、以下のドキュメントを見た際に、実装方法として、SubscribableMessageSource
を使用する方法と、StreamableKafkaMessageSource
を使用する方法の2種類が記載されていました。
本記事では、この2種類の方法をそれぞれ実装してみたうえで、挙動がどう異なるのかを確認した過程を記載します。
StreamableMessageSource
StreamableKafkaMessageSourceを使用する実装
これまでの記事では、特に言及はしていませんでしたが、とりあえずStreamableMessageSource
の方を使用していました。
改めて、そのときの設定内容を確認します。
axon:
axonserver:
enabled: false
serializer:
general: jackson
events: jackson
kafka:
bootstrap-servers: localhost:9094
consumer:
event-processor-mode: tracking
ここでは、event-processor-mode
としてtracking
を指定している部分がポイントです。
ちなみに、以下のドキュメントやサンプルコードを見たところ、event-processor-mode
に設定できる値としては、tracking
と、後述するsubscribing
のほかに、pooled_streaming
もあるようです。
tracking
とpooled_streaming
の2つがStreamableMessageSource
を使用するパターンで、subscribing
がSubscribableMessageSource
を使用するパターンのようですが、pooled_streaming
の詳細については別途調べようと思います。
また、以下のConfigクラスも用意していました。
見ての通り、StreamableKafkaMessageSource
を使用する記述になっています。
package org.example.axonkafkatrial.consumer.config
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.axonframework.serialization.Serializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class AxonKafkaConsumerConfig {
@Value("\${application.kafka.topics}")
private val topics: List<String>? = null
@Bean
fun streamableKafkaMessageSource(
consumerFactory: ConsumerFactory<String, ByteArray>,
fetcher: Fetcher<String, ByteArray, KafkaEventMessage>,
serializer: Serializer,
): StreamableKafkaMessageSource<String, ByteArray> =
StreamableKafkaMessageSource.builder<String, ByteArray>()
.topics(topics)
.consumerFactory(consumerFactory)
.fetcher(fetcher)
.serializer(serializer)
.build()
}
package org.example.axonkafkatrial.consumer.config
import org.axonframework.config.EventProcessingConfigurer
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Configuration
@Configuration
class AxonKafkaTrackingConfig {
@Autowired
fun configure(
config: EventProcessingConfigurer,
streamableKafkaMessageSource: StreamableKafkaMessageSource<String, ByteArray>,
) {
config.registerTrackingEventProcessor("org.example.axonkafkatrial.consumer.listener") {
streamableKafkaMessageSource
}
}
}
この状態で改めて動作確認します。
KafkaやMySQLをdockerで起動し、ProducerとConsumerのアプリケーションを起動した状態で、Producer経由でKafkaにメッセージを送信します。
# KafkaおよびProducerとConsumerそれぞれが使用するMySQLを起動
$ docker compose up -d
# IDE側でProducerとConsumerのアプリケーションを起動
# ProducerのAPIからKafkaにメッセージを送信
$ curl -X POST localhost:8080 \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"id":"c6f1865c-7db2-4c27-bda5-702b1ec87e99"}
kafka-uiで、Kafkaにメッセージが送信されていることを確認します。
このときは、Consumersタブは空の状態になっており、Consumerがメッセージを受信できているかどうかは、kafka-ui上では確認できません。
Consumerアプリケーションのログを見ると、以下のようなログが出力されており、メッセージを受信して処理できていることがわかります。
event received: DocCreated(docId=c6f1865c-7db2-4c27-bda5-702b1ec87e99, body=test)
また、Consumer用のMySQLのtoken_entry
テーブルを見ると、以下のようになっています。
| processor_name | segment | owner | timestamp | token | token_type |
|----------------------------------------------|---------|-------------------|--------------------------|---------------------------------|-----------------------------------------------------------------------------------------|
| org.example.axonkafkatrial.consumer.listener | 0 | 52107@my-pc.local | 2024-11-30T01:36:08.491Z | {"positions":{"doc-topic-0":0}} | org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaTrackingToken |
token
カラムの値が{"positions":{"doc-topic-0":0}}
となっており、doc-topic
のパーティション0のオフセット0のメッセージまでが処理済であることがわかります。
SubscribableMessageSource
SubscribableKafkaMessageSourceを使用する実装
次に、SubscribableMessageSource
を使用する方法を試してみます。
最初は、特に詳しく調べもせずに雰囲気で設定を変えてみました。
axon:
axonserver:
enabled: false
serializer:
general: jackson
events: jackson
kafka:
bootstrap-servers: localhost:9094
consumer:
# event-processor-mode: tracking
event-processor-mode: subscribing
event-processor-mode
をsubscribing
に変えてみました。
また、tracking
のときの記述を真似て、Configクラスも変更してみます。
package org.example.axonkafkatrial.consumer.config
import org.axonframework.eventhandling.EventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource
import org.axonframework.serialization.Serializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class AxonKafkaConsumerConfig {
@Value("\${application.kafka.topics}")
private val topics: List<String>? = null
@Bean
fun streamableKafkaMessageSource(
consumerFactory: ConsumerFactory<String, ByteArray>,
fetcher: Fetcher<String, ByteArray, KafkaEventMessage>,
serializer: Serializer,
): StreamableKafkaMessageSource<String, ByteArray> =
StreamableKafkaMessageSource.builder<String, ByteArray>()
.topics(topics)
.consumerFactory(consumerFactory)
.fetcher(fetcher)
.serializer(serializer)
.build()
// 追加
@Bean
fun subscribableKafkaMessageSource(
consumerFactory: ConsumerFactory<String, ByteArray>,
fetcher: Fetcher<String, ByteArray, EventMessage<*>>,
serializer: Serializer,
): SubscribableKafkaMessageSource<String, ByteArray> =
SubscribableKafkaMessageSource.builder<String, ByteArray>()
.topics(topics)
.groupId("axon-kafka-trial-consumer-group") // Hard Requirement
.consumerFactory(consumerFactory)
.fetcher(fetcher)
.serializer(serializer)
.build()
}
SubscribableKafkaMessageSource
を生成するメソッドを追加し、ドキュメントに「Hard Requirement」と書かれていたgroupId
の設定を追加しています。
また、AxonKafkaTrackingConfig
と同等のAxonKafkaSubscribingConfig
を作成します。
package org.example.axonkafkatrial.consumer.config
import org.axonframework.config.EventProcessingConfigurer
import org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.context.annotation.Configuration
@Configuration
@ConditionalOnExpression("'\${axon.kafka.consumer.event-processor-mode}' == 'subscribing'")
class AxonKafkaSubscribingConfig {
@Autowired
fun configure(
config: EventProcessingConfigurer,
subscribableKafkaMessageSource: SubscribableKafkaMessageSource<String, ByteArray>,
) {
config.registerSubscribingEventProcessor("org.example.axonkafkatrial.consumer.listener") {
subscribableKafkaMessageSource
}
}
}
単純にstreamable
だったところをsubscribable
に変えてみました。
また、application.yml側でのevent-processor-mode
の設定値に応じて、使用するConfigクラスを切り替えられるように、@ConditionalOnExpression
アノテーションを追加しています。
これは、元々あるtracking側にも同様に追加しました。
package org.example.axonkafkatrial.consumer.config
import org.axonframework.config.EventProcessingConfigurer
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.context.annotation.Configuration
@Configuration
@ConditionalOnExpression("'\${axon.kafka.consumer.event-processor-mode}' == 'tracking'") // 追加
class AxonKafkaTrackingConfig {
@Autowired
fun configure(
config: EventProcessingConfigurer,
streamableKafkaMessageSource: StreamableKafkaMessageSource<String, ByteArray>,
) {
config.registerTrackingEventProcessor("org.example.axonkafkatrial.consumer.listener") {
streamableKafkaMessageSource
}
}
}
この状態でConsumerアプリケーションを起動してみたところ、全くKafkaからイベント受信しようとしているようなログが出ませんでした。
設定が不足しているか間違っていると思われます。
そこで、改めて以下のドキュメント内のサンプルコードをよく見て、ドキュメントに合わせて修正してみます。
(ドキュメントのサンプルコードはJavaで記述されているので、Kotlinにする必要はあります。)
// ...
public KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer(Configurer configurer) {
KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
configurer.registerModule(kafkaMessageSourceConfigurer);
return kafkaMessageSourceConfigurer;
}
public SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(List<String> topics,
String groupId,
ConsumerFactory<String, byte[]> consumerFactory,
Fetcher<String, byte[], EventMessage<?>> fetcher,
KafkaMessageConverter<String, byte[]> messageConverter,
int consumerCount,
KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource = SubscribableKafkaMessageSource.<String, byte[]>builder()
.topics(topics) // Defaults to a collection of "Axon.Events"
.groupId(groupId) // Hard requirement
.consumerFactory(consumerFactory) // Hard requirement
.fetcher(fetcher) // Hard requirement
.messageConverter(messageConverter) // Defaults to a "DefaultKafkaMessageConverter"
.consumerCount(consumerCount) // Defaults to a single Consumer
.build();
// Registering the source is required to tie into the Configurers lifecycle to start the source at the right stage
kafkaMessageSourceConfigurer.registerSubscribableSource(configuration -> subscribableKafkaMessageSource);
return subscribableKafkaMessageSource;
}
下記のコメントに書かれているように、MessageSourceをConfigurerに登録する必要があるようです。
// Registering the source is required to tie into the Configurers lifecycle to start the source at the right stage
また、ドキュメント本文にも以下の記述がありました。
When it comes to configuring a
SubscribableKafkaMessageSource
as a message source for aSubscribingEventProcessor
, there is one additional requirement beside source creation and registration. The source should only start with polling for events as soon as all interested subscribing event processors have been subscribed to it. To ensure theSubscribableKafkaMessageSource#start()
operation is called at the right point in the configuration lifecycle, theKafkaMessageSourceConfigurer
should be utilized:
なので、そのとおりに修正を試みます。
// 追加
@Bean
fun kafkaMessageSourceConfigurer(configurer: Configurer): KafkaMessageSourceConfigurer {
val kafkaMessageSourceConfigurer = KafkaMessageSourceConfigurer()
configurer.registerModule(kafkaMessageSourceConfigurer)
return kafkaMessageSourceConfigurer
}
@Bean
fun subscribableKafkaMessageSource(
consumerFactory: ConsumerFactory<String, ByteArray>,
fetcher: Fetcher<String, ByteArray, EventMessage<*>>,
serializer: Serializer,
kafkaMessageSourceConfigurer: KafkaMessageSourceConfigurer,
): SubscribableKafkaMessageSource<String, ByteArray> {
val subscribableKafkaMessageSource = SubscribableKafkaMessageSource.builder<String, ByteArray>()
.topics(topics)
.groupId("axon-kafka-trial-consumer-group")
.consumerFactory(consumerFactory)
.fetcher(fetcher)
.serializer(serializer)
.build()
// 追加
// registerSubscribableSourceメソッドがないので代わりにそれらしいメソッドで登録
kafkaMessageSourceConfigurer.configureSubscribableSource{
subscribableKafkaMessageSource
}
return subscribableKafkaMessageSource
}
しかし、この状態だと、以下のエラーが発生してアプリケーションが起動できませんでした。
ERROR 63983 --- [ main] o.s.b.d.LoggingFailureAnalysisReporter :
***************************
APPLICATION FAILED TO START
***************************
Description:
The dependencies of some of the beans in the application context form a cycle:
axonKafkaSubscribingConfig
┌─────┐
| springAxonConfigurer defined in class path resource [org/axonframework/springboot/autoconfig/InfraConfiguration.class]
↑ ↓
| kafkaMessageSourceConfigurer defined in class path resource [org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.class]
└─────┘
Action:
Relying upon circular references is discouraged and they are prohibited by default. Update your application to remove the dependency cycle between beans. As a last resort, it may be possible to break the cycle automatically by setting spring.main.allow-circular-references to true.
Process finished with exit code 1
そのため、循環依存を解消できるように調整します。
package org.example.axonkafkatrial.consumer.config
import org.axonframework.eventhandling.EventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource
import org.axonframework.serialization.Serializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class AxonKafkaConsumerConfig {
@Value("\${application.kafka.topics}")
private val topics: List<String>? = null
@Bean
fun streamableKafkaMessageSource(
consumerFactory: ConsumerFactory<String, ByteArray>,
fetcher: Fetcher<String, ByteArray, KafkaEventMessage>,
serializer: Serializer,
): StreamableKafkaMessageSource<String, ByteArray> =
StreamableKafkaMessageSource.builder<String, ByteArray>()
.topics(topics)
.consumerFactory(consumerFactory)
.fetcher(fetcher)
.serializer(serializer)
.build()
@Bean
fun subscribableKafkaMessageSource(
consumerFactory: ConsumerFactory<String, ByteArray>,
fetcher: Fetcher<String, ByteArray, EventMessage<*>>,
serializer: Serializer,
): SubscribableKafkaMessageSource<String, ByteArray> =
// KafkaMessageSourceConfigurerの設定はAxonKafkaSubscribingConfigに移した
SubscribableKafkaMessageSource.builder<String, ByteArray>()
.topics(topics)
.groupId("axon-kafka-trial-consumer-group")
.consumerFactory(consumerFactory)
.fetcher(fetcher)
.serializer(serializer)
.build()
}
package org.example.axonkafkatrial.consumer.config
import org.axonframework.config.Configurer
import org.axonframework.extensions.kafka.configuration.KafkaMessageSourceConfigurer
import org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.context.annotation.Configuration
@Configuration
@ConditionalOnExpression("'\${axon.kafka.consumer.event-processor-mode}' == 'subscribing'")
class AxonKafkaSubscribingConfig {
@Autowired
fun configure(
configurer: Configurer, // EventProcessingConfigurerからConfigurerに変える
subscribableKafkaMessageSource: SubscribableKafkaMessageSource<String, ByteArray>,
) {
// ここでMessageSourceをConfigurerに登録する
val kafkaMessageSourceConfigurer = KafkaMessageSourceConfigurer()
kafkaMessageSourceConfigurer.configureSubscribableSource {
subscribableKafkaMessageSource
}
configurer
.registerModule(kafkaMessageSourceConfigurer)
.eventProcessing()
.registerSubscribingEventProcessor("org.example.axonkafkatrial.consumer.listener") {
subscribableKafkaMessageSource
}
}
}
これでアプリ起動が成功するようになりました。
ここでkafak-uiを見ると、StreamingMessageSource
のときは空だったConsumersのエリアに、axon-kafka-trial-consumer-group
が表示されるようになっています。
一方で、MySQLのtoken_entry
テーブルにはレコードがない状態でした。
この状態でまたProducerからメッセージを送信すれば、Consumer側でメッセージが受信・処理されることを確認できました。
その後にもう一度kafka-uiを見ると、axon-kafka-trial-consumer-group
のConsumer Groupで、doc-topic
のcurrent offsetに、次のオフセットの値(以下のキャプチャでは2
)が表示されていました。
Consumerの初回起動前からKafkaに溜まっていたメッセージが処理されない
実は、上述のとおりアプリケーション起動が成功した際、Kafkaには、Consumerアプリケーションの起動前に既にメッセージを1件送信済の状態でした。
しかし、アプリケーション起動時には、そのメッセージは処理されず、起動後に新たにKafkaに送信されたメッセージだけが処理されたのです。
なお、StreamableMessageSource
のときは、起動前から溜まっていたメッセージも起動時に処理されていました。
そこで、ログをよく見たところ、ConsumerのKafka接続における設定の中で、auto.offset.reset
という設定がlatest
となっていました。デフォルトがlatest
のようです。
INFO 82337 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = latest # ここ
bootstrap.servers = [localhost:9094]
〜省略〜
auto.offset.reset
は、今回のようにアプリケーションの初回起動前のときなど、Kafka側にそのConsumer Groupのオフセットの情報がまだ存在しないときに、初期値をどうするかの設定のようです。
latest
の場合は、最新オフセットが初期値となり、過去のメッセージは処理済扱いで無視されるようです。
earliest
にすると、過去分も含め最初のメッセージから処理するようです。
そこで、以下のように設定したら、初回起動前からあったメッセージも処理されるようになりました。
axon:
axonserver:
enabled: false
serializer:
general: jackson
events: jackson
kafka:
bootstrap-servers: localhost:9094
consumer:
# event-processor-mode: tracking
event-processor-mode: subscribing
# 以下を追加
properties:
auto.offset.reset: earliest
まとめ
SubscribableMessageSource
とStreamableKafkaMessageSource
をそれぞれ使用したときの挙動の大きな違いは、Kafkaトピックの処理済オフセット、つまり、どのメッセージまで処理したかという状態を、管理する方法の違いでした。
StreamableKafkaMessageSource
の場合は、Axon側でTokenStoreによって管理されます。(JpaTokenStore
を使用してDBに永続化している場合はtoken_entry
テーブルによって管理されます。)
SubscribableMessageSource
の場合は、Kafka側でKafkaのConsumer Groupの仕組みによって管理されます。
また、ドキュメントでは、SubscribableMessageSource
について以下のデメリットが記載されています。
Although the
SubscribableKafkaMessageSource
thus provides the niceties the tracking event processor normally provides, it does come with two catches:
- Axon’s approach of the
SequencingPolicy
to deduce which thread receives which events is entirely lost. It is thus dependent on which topic-partition pairs are given to aConsumer
for the events your handlers receives. From a usage perspective this means event message ordering is no longer guaranteed by Axon. It is thus the user’s job to ensure events are published in the right topic-partition pair.- The API Axon provides for resets is entirely lost, since this API can only be correctly triggered through the
TrackingEventProcessor#resetTokens
operation
SubscribableKafkaMessageSource
では、オフセットをAxon側で管理していないがゆえに、複数スレッドでの並行処理をAxon側でコントロールすることができないため、並行処理はKafkaのConsumer Groupの仕組みの範疇で行うことになるのかと思います。
また、オフセットをAxon側で管理していれば、Axonがそれをリセットすることによって処理済メッセージの再生を容易に行うことができるのに対して、Kafka側で管理している場合はそれが不可能になるようです。
Axonの機能を最大限享受するためには、基本的にはStreamableKafkaMessageSource
を選択した方がよいのかもしれません。
今回調べたSubscribableMessageSource
とStreamableKafkaMessageSource
の違いは、Consumerの動作における根本的な部分に見えるので、他の多くの設定や動作に対しても影響が大きいものと思われます。
今後、他の設定等を確認していくうえでも、これらの違いを意識しながら比較調査していこうと思います。