0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Axon FrameworkでKafkaのConsumerを実装するときの2種類のMessageSourceの違い

Posted at

はじめに

Axon Framework(以降、Axonと記述)を使用してKafkaからイベントを受信する実装を試しています。
これまでに、以下の記事に記載したとおり、基本的な実装を行なってきました。

その中で気になった点として、以下のドキュメントを見た際に、実装方法として、SubscribableMessageSourceを使用する方法と、StreamableKafkaMessageSourceを使用する方法の2種類が記載されていました。

本記事では、この2種類の方法をそれぞれ実装してみたうえで、挙動がどう異なるのかを確認した過程を記載します。

StreamableMessageSource

StreamableKafkaMessageSourceを使用する実装

これまでの記事では、特に言及はしていませんでしたが、とりあえずStreamableMessageSourceの方を使用していました。
改めて、そのときの設定内容を確認します。

consumer/src/main/resources/application.yml(抜粋)
axon:
  axonserver:
    enabled: false
  serializer:
    general: jackson
    events: jackson
  kafka:
    bootstrap-servers: localhost:9094
    consumer:
      event-processor-mode: tracking

ここでは、event-processor-modeとしてtrackingを指定している部分がポイントです。

ちなみに、以下のドキュメントやサンプルコードを見たところ、event-processor-modeに設定できる値としては、trackingと、後述するsubscribingのほかに、pooled_streamingもあるようです。

trackingpooled_streamingの2つがStreamableMessageSourceを使用するパターンで、subscribingSubscribableMessageSource を使用するパターンのようですが、pooled_streamingの詳細については別途調べようと思います。

また、以下のConfigクラスも用意していました。
見ての通り、StreamableKafkaMessageSourceを使用する記述になっています。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.axonframework.serialization.Serializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class AxonKafkaConsumerConfig {
    @Value("\${application.kafka.topics}")
    private val topics: List<String>? = null

    @Bean
    fun streamableKafkaMessageSource(
        consumerFactory: ConsumerFactory<String, ByteArray>,
        fetcher: Fetcher<String, ByteArray, KafkaEventMessage>,
        serializer: Serializer,
    ): StreamableKafkaMessageSource<String, ByteArray> =
        StreamableKafkaMessageSource.builder<String, ByteArray>()
            .topics(topics)
            .consumerFactory(consumerFactory)
            .fetcher(fetcher)
            .serializer(serializer)
            .build()
}
consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaTrackingConfig.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.config.EventProcessingConfigurer
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Configuration

@Configuration
class AxonKafkaTrackingConfig {
    @Autowired
    fun configure(
        config: EventProcessingConfigurer,
        streamableKafkaMessageSource: StreamableKafkaMessageSource<String, ByteArray>,
    ) {
        config.registerTrackingEventProcessor("org.example.axonkafkatrial.consumer.listener") {
            streamableKafkaMessageSource
        }
    }
}

この状態で改めて動作確認します。

KafkaやMySQLをdockerで起動し、ProducerとConsumerのアプリケーションを起動した状態で、Producer経由でKafkaにメッセージを送信します。

# KafkaおよびProducerとConsumerそれぞれが使用するMySQLを起動
$ docker compose up -d

# IDE側でProducerとConsumerのアプリケーションを起動

# ProducerのAPIからKafkaにメッセージを送信
$ curl -X POST localhost:8080 \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"id":"c6f1865c-7db2-4c27-bda5-702b1ec87e99"}

kafka-uiで、Kafkaにメッセージが送信されていることを確認します。

message-source_message_sent.png

このときは、Consumersタブは空の状態になっており、Consumerがメッセージを受信できているかどうかは、kafka-ui上では確認できません。

message-source_consumers_empty.png

Consumerアプリケーションのログを見ると、以下のようなログが出力されており、メッセージを受信して処理できていることがわかります。

event received: DocCreated(docId=c6f1865c-7db2-4c27-bda5-702b1ec87e99, body=test)

また、Consumer用のMySQLのtoken_entryテーブルを見ると、以下のようになっています。

| processor_name                               | segment | owner             | timestamp                | token                           | token_type                                                                              |
|----------------------------------------------|---------|-------------------|--------------------------|---------------------------------|-----------------------------------------------------------------------------------------|
| org.example.axonkafkatrial.consumer.listener |       0 | 52107@my-pc.local | 2024-11-30T01:36:08.491Z | {"positions":{"doc-topic-0":0}} | org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaTrackingToken |

tokenカラムの値が{"positions":{"doc-topic-0":0}}となっており、doc-topicのパーティション0のオフセット0のメッセージまでが処理済であることがわかります。

SubscribableMessageSource

SubscribableKafkaMessageSourceを使用する実装

次に、SubscribableMessageSourceを使用する方法を試してみます。

最初は、特に詳しく調べもせずに雰囲気で設定を変えてみました。

consumer/src/main/resources/application.yml(抜粋)
axon:
  axonserver:
    enabled: false
  serializer:
    general: jackson
    events: jackson
  kafka:
    bootstrap-servers: localhost:9094
    consumer:
#      event-processor-mode: tracking
      event-processor-mode: subscribing

event-processor-modesubscribingに変えてみました。

また、trackingのときの記述を真似て、Configクラスも変更してみます。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.eventhandling.EventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource
import org.axonframework.serialization.Serializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class AxonKafkaConsumerConfig {
    @Value("\${application.kafka.topics}")
    private val topics: List<String>? = null

    @Bean
    fun streamableKafkaMessageSource(
        consumerFactory: ConsumerFactory<String, ByteArray>,
        fetcher: Fetcher<String, ByteArray, KafkaEventMessage>,
        serializer: Serializer,
    ): StreamableKafkaMessageSource<String, ByteArray> =
        StreamableKafkaMessageSource.builder<String, ByteArray>()
            .topics(topics)
            .consumerFactory(consumerFactory)
            .fetcher(fetcher)
            .serializer(serializer)
            .build()

    // 追加
    @Bean
    fun subscribableKafkaMessageSource(
        consumerFactory: ConsumerFactory<String, ByteArray>,
        fetcher: Fetcher<String, ByteArray, EventMessage<*>>,
        serializer: Serializer,
    ): SubscribableKafkaMessageSource<String, ByteArray> =
        SubscribableKafkaMessageSource.builder<String, ByteArray>()
            .topics(topics)
            .groupId("axon-kafka-trial-consumer-group") // Hard Requirement
            .consumerFactory(consumerFactory)
            .fetcher(fetcher)
            .serializer(serializer)
            .build()
}

SubscribableKafkaMessageSourceを生成するメソッドを追加し、ドキュメントに「Hard Requirement」と書かれていたgroupIdの設定を追加しています。

また、AxonKafkaTrackingConfigと同等のAxonKafkaSubscribingConfigを作成します。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaSubscribingConfig.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.config.EventProcessingConfigurer
import org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.context.annotation.Configuration

@Configuration
@ConditionalOnExpression("'\${axon.kafka.consumer.event-processor-mode}' == 'subscribing'")
class AxonKafkaSubscribingConfig {
    @Autowired
    fun configure(
        config: EventProcessingConfigurer,
        subscribableKafkaMessageSource: SubscribableKafkaMessageSource<String, ByteArray>,
    ) {
        config.registerSubscribingEventProcessor("org.example.axonkafkatrial.consumer.listener") {
            subscribableKafkaMessageSource
        }
    }
}

単純にstreamableだったところをsubscribableに変えてみました。

また、application.yml側でのevent-processor-modeの設定値に応じて、使用するConfigクラスを切り替えられるように、@ConditionalOnExpressionアノテーションを追加しています。
これは、元々あるtracking側にも同様に追加しました。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaTrackingConfig.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.config.EventProcessingConfigurer
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.context.annotation.Configuration

@Configuration
@ConditionalOnExpression("'\${axon.kafka.consumer.event-processor-mode}' == 'tracking'")  // 追加
class AxonKafkaTrackingConfig {
    @Autowired
    fun configure(
        config: EventProcessingConfigurer,
        streamableKafkaMessageSource: StreamableKafkaMessageSource<String, ByteArray>,
    ) {
        config.registerTrackingEventProcessor("org.example.axonkafkatrial.consumer.listener") {
            streamableKafkaMessageSource
        }
    }
}

この状態でConsumerアプリケーションを起動してみたところ、全くKafkaからイベント受信しようとしているようなログが出ませんでした。
設定が不足しているか間違っていると思われます。

そこで、改めて以下のドキュメント内のサンプルコードをよく見て、ドキュメントに合わせて修正してみます。
(ドキュメントのサンプルコードはJavaで記述されているので、Kotlinにする必要はあります。)

    // ...
    public KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer(Configurer configurer) {
        KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
        configurer.registerModule(kafkaMessageSourceConfigurer);
        return kafkaMessageSourceConfigurer;
    }

    public SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(List<String> topics,
                                                                                         String groupId,
                                                                                         ConsumerFactory<String, byte[]> consumerFactory,
                                                                                         Fetcher<String, byte[], EventMessage<?>> fetcher,
                                                                                         KafkaMessageConverter<String, byte[]> messageConverter,
                                                                                         int consumerCount,
                                                                                         KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
        SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource = SubscribableKafkaMessageSource.<String, byte[]>builder()
                .topics(topics)                     // Defaults to a collection of "Axon.Events"
                .groupId(groupId)                   // Hard requirement
                .consumerFactory(consumerFactory)   // Hard requirement
                .fetcher(fetcher)                   // Hard requirement
                .messageConverter(messageConverter) // Defaults to a "DefaultKafkaMessageConverter"
                .consumerCount(consumerCount)       // Defaults to a single Consumer
                .build();
        // Registering the source is required to tie into the Configurers lifecycle to start the source at the right stage
        kafkaMessageSourceConfigurer.registerSubscribableSource(configuration -> subscribableKafkaMessageSource);
        return subscribableKafkaMessageSource;
    }

下記のコメントに書かれているように、MessageSourceをConfigurerに登録する必要があるようです。

// Registering the source is required to tie into the Configurers lifecycle to start the source at the right stage

また、ドキュメント本文にも以下の記述がありました。

When it comes to configuring a SubscribableKafkaMessageSource as a message source for a SubscribingEventProcessor, there is one additional requirement beside source creation and registration. The source should only start with polling for events as soon as all interested subscribing event processors have been subscribed to it. To ensure the SubscribableKafkaMessageSource#start() operation is called at the right point in the configuration lifecycle, the KafkaMessageSourceConfigurer should be utilized:

なので、そのとおりに修正を試みます。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.kt
    // 追加
    @Bean
    fun kafkaMessageSourceConfigurer(configurer: Configurer): KafkaMessageSourceConfigurer {
        val kafkaMessageSourceConfigurer = KafkaMessageSourceConfigurer()
        configurer.registerModule(kafkaMessageSourceConfigurer)
        return kafkaMessageSourceConfigurer
    }
    
    @Bean
    fun subscribableKafkaMessageSource(
        consumerFactory: ConsumerFactory<String, ByteArray>,
        fetcher: Fetcher<String, ByteArray, EventMessage<*>>,
        serializer: Serializer,
        kafkaMessageSourceConfigurer: KafkaMessageSourceConfigurer,
    ): SubscribableKafkaMessageSource<String, ByteArray> {
        val subscribableKafkaMessageSource = SubscribableKafkaMessageSource.builder<String, ByteArray>()
            .topics(topics)
            .groupId("axon-kafka-trial-consumer-group")
            .consumerFactory(consumerFactory)
            .fetcher(fetcher)
            .serializer(serializer)
            .build()
        // 追加
        // registerSubscribableSourceメソッドがないので代わりにそれらしいメソッドで登録
        kafkaMessageSourceConfigurer.configureSubscribableSource{
            subscribableKafkaMessageSource
        }
        return subscribableKafkaMessageSource
    }

しかし、この状態だと、以下のエラーが発生してアプリケーションが起動できませんでした。

ERROR 63983 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   : 

***************************
APPLICATION FAILED TO START
***************************

Description:

The dependencies of some of the beans in the application context form a cycle:

   axonKafkaSubscribingConfig
┌─────┐
|  springAxonConfigurer defined in class path resource [org/axonframework/springboot/autoconfig/InfraConfiguration.class]
↑     ↓
|  kafkaMessageSourceConfigurer defined in class path resource [org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.class]
└─────┘

Action:

Relying upon circular references is discouraged and they are prohibited by default. Update your application to remove the dependency cycle between beans. As a last resort, it may be possible to break the cycle automatically by setting spring.main.allow-circular-references to true.

Process finished with exit code 1

そのため、循環依存を解消できるように調整します。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.eventhandling.EventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource
import org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource
import org.axonframework.serialization.Serializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class AxonKafkaConsumerConfig {
    @Value("\${application.kafka.topics}")
    private val topics: List<String>? = null

    @Bean
    fun streamableKafkaMessageSource(
        consumerFactory: ConsumerFactory<String, ByteArray>,
        fetcher: Fetcher<String, ByteArray, KafkaEventMessage>,
        serializer: Serializer,
    ): StreamableKafkaMessageSource<String, ByteArray> =
        StreamableKafkaMessageSource.builder<String, ByteArray>()
            .topics(topics)
            .consumerFactory(consumerFactory)
            .fetcher(fetcher)
            .serializer(serializer)
            .build()

    @Bean
    fun subscribableKafkaMessageSource(
        consumerFactory: ConsumerFactory<String, ByteArray>,
        fetcher: Fetcher<String, ByteArray, EventMessage<*>>,
        serializer: Serializer,
    ): SubscribableKafkaMessageSource<String, ByteArray> =
        // KafkaMessageSourceConfigurerの設定はAxonKafkaSubscribingConfigに移した
        SubscribableKafkaMessageSource.builder<String, ByteArray>()
            .topics(topics)
            .groupId("axon-kafka-trial-consumer-group")
            .consumerFactory(consumerFactory)
            .fetcher(fetcher)
            .serializer(serializer)
            .build()
}
consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaSubscribingConfig.kt
package org.example.axonkafkatrial.consumer.config

import org.axonframework.config.Configurer
import org.axonframework.extensions.kafka.configuration.KafkaMessageSourceConfigurer
import org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.context.annotation.Configuration

@Configuration
@ConditionalOnExpression("'\${axon.kafka.consumer.event-processor-mode}' == 'subscribing'")
class AxonKafkaSubscribingConfig {
    @Autowired
    fun configure(
        configurer: Configurer, // EventProcessingConfigurerからConfigurerに変える
        subscribableKafkaMessageSource: SubscribableKafkaMessageSource<String, ByteArray>,
    ) {
        // ここでMessageSourceをConfigurerに登録する
        val kafkaMessageSourceConfigurer = KafkaMessageSourceConfigurer()
        kafkaMessageSourceConfigurer.configureSubscribableSource {
            subscribableKafkaMessageSource
        }
        configurer
		        .registerModule(kafkaMessageSourceConfigurer)
            .eventProcessing()
            .registerSubscribingEventProcessor("org.example.axonkafkatrial.consumer.listener") {
                subscribableKafkaMessageSource
            }
    }
}

これでアプリ起動が成功するようになりました。

ここでkafak-uiを見ると、StreamingMessageSourceのときは空だったConsumersのエリアに、axon-kafka-trial-consumer-groupが表示されるようになっています。

message-source_consumer_registered.png

一方で、MySQLのtoken_entryテーブルにはレコードがない状態でした。

この状態でまたProducerからメッセージを送信すれば、Consumer側でメッセージが受信・処理されることを確認できました。

その後にもう一度kafka-uiを見ると、axon-kafka-trial-consumer-groupのConsumer Groupで、doc-topicのcurrent offsetに、次のオフセットの値(以下のキャプチャでは2)が表示されていました。

message-source_consumer_offset.png

Consumerの初回起動前からKafkaに溜まっていたメッセージが処理されない

実は、上述のとおりアプリケーション起動が成功した際、Kafkaには、Consumerアプリケーションの起動前に既にメッセージを1件送信済の状態でした。
しかし、アプリケーション起動時には、そのメッセージは処理されず、起動後に新たにKafkaに送信されたメッセージだけが処理されたのです。

なお、StreamableMessageSourceのときは、起動前から溜まっていたメッセージも起動時に処理されていました。

そこで、ログをよく見たところ、ConsumerのKafka接続における設定の中で、auto.offset.resetという設定がlatestとなっていました。デフォルトがlatestのようです。

INFO 82337 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.include.jmx.reporter = true
	auto.offset.reset = latest # ここ
	bootstrap.servers = [localhost:9094]
	〜省略〜

auto.offset.resetは、今回のようにアプリケーションの初回起動前のときなど、Kafka側にそのConsumer Groupのオフセットの情報がまだ存在しないときに、初期値をどうするかの設定のようです。

latestの場合は、最新オフセットが初期値となり、過去のメッセージは処理済扱いで無視されるようです。
earliestにすると、過去分も含め最初のメッセージから処理するようです。

そこで、以下のように設定したら、初回起動前からあったメッセージも処理されるようになりました。

consumer/src/main/resources/application.yml(抜粋)
axon:
  axonserver:
    enabled: false
  serializer:
    general: jackson
    events: jackson
  kafka:
    bootstrap-servers: localhost:9094
    consumer:
#      event-processor-mode: tracking
      event-processor-mode: subscribing
    # 以下を追加
    properties:
      auto.offset.reset: earliest

まとめ

SubscribableMessageSourceStreamableKafkaMessageSourceをそれぞれ使用したときの挙動の大きな違いは、Kafkaトピックの処理済オフセット、つまり、どのメッセージまで処理したかという状態を、管理する方法の違いでした。

StreamableKafkaMessageSourceの場合は、Axon側でTokenStoreによって管理されます。(JpaTokenStoreを使用してDBに永続化している場合はtoken_entryテーブルによって管理されます。)

SubscribableMessageSourceの場合は、Kafka側でKafkaのConsumer Groupの仕組みによって管理されます。

また、ドキュメントでは、SubscribableMessageSourceについて以下のデメリットが記載されています。

Although the SubscribableKafkaMessageSource thus provides the niceties the tracking event processor normally provides, it does come with two catches:

  1. Axon’s approach of the SequencingPolicy to deduce which thread receives which events is entirely lost. It is thus dependent on which topic-partition pairs are given to a Consumer for the events your handlers receives. From a usage perspective this means event message ordering is no longer guaranteed by Axon. It is thus the user’s job to ensure events are published in the right topic-partition pair.
  2. The API Axon provides for resets is entirely lost, since this API can only be correctly triggered through the TrackingEventProcessor#resetTokens operation

SubscribableKafkaMessageSourceでは、オフセットをAxon側で管理していないがゆえに、複数スレッドでの並行処理をAxon側でコントロールすることができないため、並行処理はKafkaのConsumer Groupの仕組みの範疇で行うことになるのかと思います。

また、オフセットをAxon側で管理していれば、Axonがそれをリセットすることによって処理済メッセージの再生を容易に行うことができるのに対して、Kafka側で管理している場合はそれが不可能になるようです。

Axonの機能を最大限享受するためには、基本的にはStreamableKafkaMessageSourceを選択した方がよいのかもしれません。

今回調べたSubscribableMessageSourceStreamableKafkaMessageSourceの違いは、Consumerの動作における根本的な部分に見えるので、他の多くの設定や動作に対しても影響が大きいものと思われます。
今後、他の設定等を確認していくうえでも、これらの違いを意識しながら比較調査していこうと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?