0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Axon FrameworkでKafkaからイベント受信するConsumerを複数ノードで負荷分散させる [Kotlin,SpringBoot]

Last updated at Posted at 2025-01-18

はじめに

Axon Framework(以降、Axonと記述)を使用してKafkaからイベントを受信する実装を試しています。
これまでに、以下の記事などに記載したとおり、基本的な実装を行なってきました。

本記事では、Consumerアプリケーションを複数ノードで実行して並列分散処理させる方法を試した過程を記載します。

複数ノードで並列分散させたいと考えた背景として、もともとSpringBootアプリケーションでバックエンドAPIサーバをたてており、それと同じプロセス内でKafkaからのイベント受信をトリガーにした処理も実行したいと考えました。

APIリクエストはロードバランサーにより複数ノードで負荷分散してるので、リソース利用状況がノード間で偏らないように、Consumerとしての処理も同様に複数ノードで負荷分散したいところです。

なお、作成したコードの全体は以下のリポジトリをご参照ください。

複数ノードでの動作確認の準備

docker composeを使用し、ローカル環境でConsumerアプリケーションを複数プロセス起動します。

これまでの記事では、Consumerアプリケーションはdockerは使わずにIDEから直接起動していましたが、複数プロセス起動するために、dockerイメージを作成してdocker composeで複数のserviceを起動したいと思います。

dockerイメージはSpringBootのgradleタスクを使用して作成しました。
なお、コードを変更した際は、都度このコマンドでイメージを再作成しています。

$ cd consumer
$ ./gradlew bootBuildImage

アプリケーションからDBやKafkaへの接続情報は、これまではlocalhostで接続する前提の記述になっていましたが、docker composeのservice間で通信できるように環境変数で設定できるようにします。

consumer/src/main/resources/application.yml
server:
  port: 8081
spring:
  datasource:
    url: jdbc:mysql://${DB_HOST:localhost}:${DB_PORT:3316}/${DB_NAME:trial}
    username: ${DB_USER:docker}
    password: ${DB_USER:docker}
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    database: mysql
    hibernate:
      ddl-auto: update
axon:
  axonserver:
    enabled: false
  serializer:
    general: jackson
    events: jackson
  kafka:
    client-id: axon-kafka-trial-consumer-${random.uuid}
    bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9094}
    consumer:
      event-processor-mode: tracking
#      event-processor-mode: subscribing
#    properties:
#      auto.offset.reset: earliest
    default-topic: doc-topic
application:
  kafka:
    topics: doc-topic, default-topic

compose.ymlにConsumerアプリケーションのserviceを2つ追加します。

なお、最初は—scaleオプションを使用することも考えましたが、2つのserviceのうちいずれかを指定して起動・停止することがやりやすいように、記述は少し冗長になるものの、consumer-1consumer-2の2つを別serviceとして定義することにしました。

compose.yml
services:
	# 〜省略〜

  # 追加
  consumer-1:
    image: axon_kafka_trial_consumer:1.0-SNAPSHOT
    depends_on:
      - kafka-0
      - consumer-db
    environment:
      - BOOTSTRAP_SERVERS=kafka-0:9092
      - DB_HOST=consumer-db
      - DB_PORT=3306
      - DB_NAME=trial
      - DB_USER=docker
      - DB_PASS=docker
    networks:
      - network0
    profiles:
      - extra

  # 追加
  consumer-2:
    image: axon_kafka_trial_consumer:1.0-SNAPSHOT
    depends_on:
      - kafka-0
      - consumer-db
    environment:
      - BOOTSTRAP_SERVERS=kafka-0:9092
      - DB_HOST=consumer-db
      - DB_PORT=3306
      - DB_NAME=trial
      - DB_USER=docker
      - DB_PASS=docker
    networks:
      - network0
    profiles:
      - extra

networks:
  network0:

ちなみにこのとき、もともとcompose.yml内で定義していたconsumer-dbについても、consumer-1およびconsumer-2のコンテナから通信できるようにnetwork0のネットワークに所属するように、合わせて修正してます。

trackingモードでの動作確認

trackingモードとsubscribingモードについての補足

AxonでConsumerを実装するときの設定として、2種類のMessageSourceのどちらかを選択する必要がある点については、以下の記事で記載しました。

この記事では、event-processor-modeの設定値のうち、trackingモード(StreamingMessageSource)とsubscribingモード(SubscribingMessageSource)について記載していました。

複数ノード分散においても、それぞれの設定ごとに必要な実装や挙動が異なったので、まずはtrackingモードについて記載していきます。

trackingモードの場合、application.ymlの設定は前述のものから変更不要です。

consumer/src/main/resources/application.yml(抜粋)
axon:
  kafka:
    consumer:
      event-processor-mode: tracking
#      event-processor-mode: subscribing
#    properties:
#      auto.offset.reset: earliest

初期状態では負荷分散されない

それでは、さっそく動作確認していきます。

# Consumer以外のDB、Kafka等を起動 ProducerアプリケーションもIDEから起動済の状態
$ docker compose up -d

# consumer-1を起動
$ docker compose up -d consumer-1

# Producerアプリケーション経由でイベント送信
$ curl -X POST localhost:8080 -H 'Content-Type: application/json'  -d '{"body":"test"}'
{"id":"18029c94-06dd-440d-b91b-d4e6a46d07ee"}

# consumer-1でイベント受信してログ出力できている
$ docker compose logs consumer-1
consumer-1-1  | INFO 1 --- [mer.listener]-0] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=axon-kafka-trial-consumer-e0f4079e-08bd-4826-93df-58265803be26, groupId=null] Seeking to offset 0 for partition doc-topic-0
consumer-1-1  | INFO 1 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=18029c94-06dd-440d-b91b-d4e6a46d07ee, body=test)

# consumer-2を起動
$ docker compose up -d consumer-2

# イベントを複数連続送信
$ curl -X POST localhost:8080 -H 'Content-Type: application/json'  -d '{"body":"test"}'
{"id":"a1e99b64-dbac-4b6f-9843-623fbdbdeda9"}
$ curl -X POST localhost:8080 -H 'Content-Type: application/json'  -d '{"body":"test"}'
{"id":"29e09ab2-5a22-4847-a3a1-6e947945475f"}
$ curl -X POST localhost:8080 -H 'Content-Type: application/json'  -d '{"body":"test"}'
{"id":"fbb27aaf-0d1a-49a7-a929-71c67c967b8f"}

# consumer-1が全てのイベントを受信して処理している
$ docker compose logs consumer-1
consumer-1-1  | INFO 1 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=a1e99b64-dbac-4b6f-9843-623fbdbdeda9, body=test)
consumer-1-1  | INFO 1 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=29e09ab2-5a22-4847-a3a1-6e947945475f, body=test)
consumer-1-1  | INFO 1 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=fbb27aaf-0d1a-49a7-a929-71c67c967b8f, body=test)

# consumer-2は何もしていない(アプリケーション起動時のログ以降、Kafka接続しようとすらしていない)
$ docker compose logs consumer-2
consumer-2-1  | INFO 1 --- [           main] o.e.a.consumer.ConsumerApplicationKt     : Started ConsumerApplicationKt in 6.322 seconds (process running for 6.785)

# consumer-1を停止する
$ docker compose down consumer-1

# consumer-2がKafka接続を開始する
$ docker compose logs consumer-2
consumer-2-1  | INFO 1 --- [mer.listener]-0] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=axon-kafka-trial-consumer-d8263fc8-cf73-481c-96b5-cd2a2030b72b, groupId=null] Seeking to offset 5 for partition doc-topic-0

# 再度イベント送信する
$ curl -X POST localhost:8080 -H 'Content-Type: application/json'  -d '{"body":"test"}'
{"id":"a548ec48-72ea-4640-a54b-d69317e509ae"}

# consumer-2が受信して処理するようになった
$ docker compose logs consumer-2
consumer-2-1  | INFO 1 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=a548ec48-72ea-4640-a54b-d69317e509ae, body=test)

# consumer-1を再起動する
$ docker compose up -d consumer-1
$ docker compose logs consumer-1
consumer-1-1  | INFO 1 --- [           main] o.e.a.consumer.ConsumerApplicationKt     : Started ConsumerApplicationKt in 6.506 seconds (process running for 6.993)

# 再度イベント送信する
$ curl -X POST localhost:8080 -H 'Content-Type: application/json'  -d '{"body":"test"}'
{"id":"eb3fc695-4a02-4691-a4a9-b58a49e8be6d"}

# 今度はconsumer-2が全てのイベントを受信して処理しており、consumer-1は何もしていない
$ docker compose logs consumer-2
consumer-2-1  | INFO 1 --- [mer.listener]-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=eb3fc695-4a02-4691-a4a9-b58a49e8be6d, body=test)

上記のとおりで、Consumerアプリケーションを2プロセス起動しても、最初に起動した方のプロセスが全てのイベントを処理してしまい、自動で負荷分散はされませんでした。

このとき、DBのtoken_entryテーブルを見ると、1レコードのみが存在しており、segmentカラムの値は0となっていました。

また、token_entryテーブルにはownerというカラムもあり、このときは1@4ade2915b80eのような値になっていました。

Consumerが2プロセス起動した状態から片方を停止した場合、このownerカラムの値が以下のように変化していくことが確認できました。

  1. 2プロセス起動している状態: 1@4ade2915b80e
  2. 1プロセスを停止した直後: null
  3. 1プロセスを停止した数秒後: 1@5324a319784b(1.の時点とは別の値)

この動きから、このownerカラムの値について、以下のことが読み取れます。

  • segment番号が0のsegmentをどのプロセスが処理しているかを示している
  • 1つのsegmentを処理できるプロセスは1つだけ
  • segmentを処理していたプロセスが停止すると、ownerカラムはnullになる
  • ownerカラムがnullになると、起動中の別プロセスがsegment番号が0のsegmentを処理しようとする
  • segment番号が0のsegmentを新しく処理するようになったプロセスの値がownerカラムに入る

そのため、どのプロセスも処理していないsegment(ownerカラムがnull であるsegment)が残っていない状態では、Consumerアプリケーションのプロセスを増やしたところで、増えたプロセスはやることがない状態になるようです。

このあたりの仕様についてはドキュメントの以下あたりに記載されています。
https://docs.axoniq.io/axon-framework-reference/development/events/event-processors/streaming/#tracking-tokens

token_entryテーブルに存在するレコードがTrackingTokenであり、複数のConsumerプロセスが存在する場合は、ownerとしてこのTrackingTokenを処理する権利(ドキュメント内だとclaimと表現されていそう)を奪い合う(スキあらば獲る)ような動きをするものと理解しました。

ちなみに、ownerカラムの値は<プロセスID>@<プロセスのホスト>のような構成になっているようです。上記の場合は、consumer-1またはconsumer-2のコンテナの、プロセスIDが1であるプロセスを指していると思われます。

複数のsegmentが存在する状態にする

上記の理解で合っているとすると、例えば2プロセスで並列処理するには、segmentが2つ以上存在する(token_entryテーブルに2レコードが存在する)必要があると思われます。

そこで、ドキュメントの以下あたりの記載を参考にして、segment数が2つになるようにしてみます。
https://docs.axoniq.io/axon-framework-reference/development/events/event-processors/streaming/#_thread_configuration

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/config/AxonKafkaConsumerConfig.kt
@Configuration
class AxonKafkaConsumerConfig {
    private val processingGroup = "org.example.axonkafkatrial.consumer.listener"
    // 〜省略〜

    // 追加
    @Bean
    fun threadCountConfigurerModule(): ConfigurerModule {
        val tepConfig =
            TrackingEventProcessorConfiguration
                .forParallelProcessing(2)
                .andInitialSegmentsCount(2)

        return ConfigurerModule { configurer: Configurer ->
            configurer.eventProcessing { processingConfigurer: EventProcessingConfigurer ->
                processingConfigurer.registerTrackingEventProcessorConfiguration(processingGroup) {
                    tepConfig
                }
            }
        }
    }
    // 〜省略〜
}

しかし、この状態でアプリケーションを再起動しても、token_entryテーブルは1レコードのままでした。
そこで、ドキュメントを読み進めると、以下の記載がありました。

When there is a high event load, ideally, we increase the number of segments. In turn, we can reduce the number of segments again if the load on the streaming processor decreases. To change the number of segments at runtime, the split and merge operations should be used. Splitting and merging allow you to control the number of segments dynamically. There are roughly three approaches to do this.

1 . AxonIQ Console

Through AxonIQ Console's processor detail page, where you can scale the segments manually, or configure your segments to scale automatically with the number of your application’s replicas. It’s the easiest to set up and use.

2 . Axon Server

The Axon Server Dashboard contains split and merge buttons to adjust the number of segments. While it’s straightforward to use as well, it does not support automatic scaling based on the number of replicas.

3 . Manual programming

If none of the other two options are available, you can adjust the number of segments through the Axon Framework API. The StreamingEventProcessor exposes the splitSegment(int segmentId) and mergeSegment(int segmentId) methods. To obtain the StreamingEventProcessor, you can use the EventProcessingConfiguration to retrieve the processor by name.

どうもsegment数を後から増減させるためには、明示的にsplitSegmentmergeSegmentをする必要があるようです。

segment単位でイベントの処理順序や処理済否を管理しているので、下手に数だけ増やしたらその管理が破綻するリスクがあるのだと思います。そのため、複数のsegmentがある中で「このsegmentを2つに分ける」「このsegmentとこのsegmentを統合する」というのを意識する必要があるのだと思われます。
AWSのKinesisのシャードの管理と似ているのかもしれません。

上記のドキュメント記載のとおり、AxonServerを使っていれば比較的容易にsplit/mergeができるようなのですが、今回はAxonServerでなくKafkaなので、残念ながら3. Manual Programmingで対応が必要そうです。

ドキュメント内のサンプルコードを参考にsplitSegmentを行うメソッドを用意します。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/service/StreamingProcessorService.kt
@Component
class StreamingProcessorService(private val config: EventProcessingConfiguration) {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    private val processingGroup = "org.example.axonkafkatrial.consumer.listener"
    
    // 〜省略〜

    // 追加
    fun splitSegmentFor(segmentId: Int) {
        config.eventProcessor(processingGroup, StreamingEventProcessor::class.java)
            .ifPresent { streamingProcessor ->
               streamingProcessor.splitSegment(segmentId)
            }
    }
}

また、Consumerアプリケーションにsplitの指示を出すためのAPIエンドポイントを用意します。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/controller/SegmentController.kt
package org.example.axonkafkatrial.consumer.controller

import org.example.axonkafkatrial.consumer.service.StreamingProcessorService
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.*

@RestController
class SegmentController(private val streamingProcessorService: StreamingProcessorService) {
    @PutMapping("/segments/{id}")
    @ResponseStatus(HttpStatus.OK)
    fun splitSegment(@PathVariable id: Int): Response {
        streamingProcessorService.splitSegmentFor(id)
        return Response("success")
    }
}

data class Response(val body: String)

このAPIエンドポイントを叩いて、segment番号が0であるsegmentを分割します。

 $ curl -X PUT localhost:8081/segments/0 

これで、tokent_entryテーブルが2レコードに増えることが確認できました。

また、ドキュメントには以下の記載もありました。

Note that if you are moving towards a solution using the StreamingProcessorController, there are a couple of points to consider. When invoking the split/merge operation on a StreamingEventProcessor, that processor should be in charge of the segment you want to split or merge. Thus, either the streaming processor already has a claim on the segments or can claim the segments. Without the claims, the processor will simply fail the split or merge operation.

It is advised to check which segments a streaming processor has a claim on. For that, status of the processor is used. The status information shows which segments a processor instance owns. This guides which processor to invoke the split or merge on.

つまり、自プロセスが既にownerとして処理しているsegmentしかsplit/mergeはできないようです。

今回は1プロセスで処理している状態からの分割だったので考慮不要でしたが、既に複数segmentを複数プロセスで処理している状態から更に分割する場合には、splitSegmentを実行しようとしているsegmentを自プロセスが処理しているかどうかを事前にチェックする必要がありそうです。(本記事ではいったん実装は割愛します。)

ちなみに、segment数を2にするConfigを追加した状態であれば、docker compose downなどでDBを初期化したりtokent_entryテーブルをtruncateした状態でConsumerアプリケーションを起動すると、tokent_entryテーブルは最初から2レコードとなっていました。

複数segmentを複数プロセスで分担する

前述のとおり、segment数を2つにすることはできました。
しかし、この状態で再度動作確認したところ、期待どおりに負荷分散はされませんでした。

最初に確認したときと同じで、最初に起動した方のプロセスがsegmentを2つとも処理してしまい、手放さないので、後から起動したプロセスはやはりやることがない状態になってしまったのです。

せっかくsegment数を2つにしたので、ちゃんと2プロセスで分担したいところです。
複数ノード(プロセス)での処理については、ドキュメントに以下の記載がありました。
https://docs.axoniq.io/axon-framework-reference/development/events/event-processors/streaming/#_multi_node_processing

In a multi-node scenario, a fair distribution of the segments is often desired. Otherwise, the event processing load could be distributed unequally over the active instances. There are roughly three approaches to balancing the number of segments claimed per node:

  1. Through the Axon Server Dashboard’s load balancing feature.
  2. For Axon Server and Spring Boot users, you can use the axon.axonserver.eventhandling.processors.[processor-name].load-balancing-strategy application property.
  3. Directly on a StreamingEventProcessor, with the releaseSegment(int segmentId) or releaseSegment(int segmentId, long releaseDuration, TimeUnit unit)method.

AxonServerであればよしなに負荷分散してくれそうな記述ですが、やはりKafkaの場合は自分でreleaseSegmentしろ、ということのようです。

そこで、ドキュメント内のサンプルコードを参考にreleaseSegmentを試してみます。
まずはsplitSegmentのときと同様にAPIエンドポイントでreleaseSegmentを指示するようにします。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/service/StreamingProcessorService.kt
package org.example.axonkafkatrial.consumer.service

@Component
class StreamingProcessorService(private val config: EventProcessingConfiguration) {
    private val processingGroup = "org.example.axonkafkatrial.consumer.listener"
    // 〜省略〜

    // 追加
    fun releaseSegmentFor(segmentId: Int) {
        config.eventProcessor(processingGroup, StreamingEventProcessor::class.java)
            .ifPresent { streamingProcessor -> streamingProcessor.releaseSegment(segmentId) }
    }
}
consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/controller/SegmentController.kt
@RestController
class SegmentController(private val streamingProcessorService: StreamingProcessorService) {
    @PutMapping("/segments/{id}")
    @ResponseStatus(HttpStatus.OK)
    fun splitSegment(@PathVariable id: Int): Response {
        streamingProcessorService.splitSegmentFor(id)
        return Response("success")
    }

    // 追加
    @DeleteMapping("/segments/{id}")
    @ResponseStatus(HttpStatus.OK)
    fun releaseSegment(@PathVariable id: Int): Response {
        streamingProcessorService.releaseSegmentFor(id)
        return Response("success")
    }
}

最初に起動したconsumer-1がsegment番号01の2つとも処理している状態で、consumer-1のAPIエンドポイントを叩いて、segment番号が1であるsegmentを手放すように指示します。

$ curl -X DELETE localhost:8081/segments/1                                             
{"body":"success"}                    

すると、segment 1 が、consumer-1から手放されて、consumer-2にアサインされたことが、ログから確認できました。

consumer-1-1  | INFO 1 --- [mer.listener]-0] o.a.e.k.e.consumer.FetchEventsTask       : Closing down FetchEventsTask using Consumer [org.apache.kafka.clients.consumer.KafkaConsumer@34440d44]
consumer-1-1  | INFO 1 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Released claim
consumer-1-1  | INFO 1 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Worker for segment Segment[1/1] stopped.
consumer-1-1  | INFO 1 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : No Worker Launcher active. Using current thread to assign segments.

consumer-2-1  | INFO 1 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Worker assigned to segment Segment[1/1] for processing
consumer-2-1  | INFO 1 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Dispatching new tracking segment worker: TrackingSegmentWorker{processor=org.example.axonkafkatrial.consumer.listener, segment=Segment[1/1]}
consumer-2-1  | INFO 1 --- [mer.listener]-1] o.a.e.TrackingEventProcessor             : Fetched token: KafkaTrackingToken{positions={doc-topic-0=3}} for segment: Segment[1/1]
consumer-2-1  | INFO 1 --- [mer.listener]-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
consumer-2-1  |         allow.auto.create.topics = true
consumer-2-1  |         〜省略〜
consumer-2-1  | 
consumer-2-1  | INFO 1 --- [mer.listener]-1] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
consumer-2-1  | INFO 1 --- [mer.listener]-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.7.1
consumer-2-1  | INFO 1 --- [mer.listener]-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: e2494e6ffb89f828
consumer-2-1  | INFO 1 --- [mer.listener]-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1736476913561
consumer-2-1  | INFO 1 --- [mer.listener]-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=axon-kafka-trial-consumer-43b26b95-e756-4fbf-a4ed-3b5323046981, groupId=null] Cluster ID: R6P-yxA8S56kRUGAKM8mRQ
consumer-2-1  | INFO 1 --- [mer.listener]-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=axon-kafka-trial-consumer-43b26b95-e756-4fbf-a4ed-3b5323046981, groupId=null] Assigned to partition(s): doc-topic-0
consumer-2-1  | INFO 1 --- [mer.listener]-1] o.a.e.k.e.consumer.ConsumerSeekUtil      : Seeking topic-partition [doc-topic-0] with offset [4]
consumer-2-1  | INFO 1 --- [mer.listener]-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=axon-kafka-trial-consumer-43b26b95-e756-4fbf-a4ed-3b5323046981, groupId=null] Seeking to offset 4 for partition doc-topic-0

token_entryテーブルを見ると、segment番号が1であるレコードのownerカラムが別の値(consumer-2の値)に変わっていました。

この状態でイベントを複数送信すると、consumer-1consumer-2で分散処理されるようになりました。

consumer-1-1  | INFO 1 --- [mer.listener]-1] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=d1f2a193-dc7a-4ae1-9033-2fb73aa85b20, body=test)
consumer-2-1  | INFO 1 --- [mer.listener]-1] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=a1078e93-9c99-456b-9ad0-1c70f22415e3, body=test)
consumer-1-1  | INFO 1 --- [mer.listener]-1] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=1a339fcb-1bf5-4f80-8ec8-6ee2866c53fe, body=test)
consumer-2-1  | INFO 1 --- [mer.listener]-1] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=e2abc2c9-a842-4b72-9cf3-1c4868256725, body=test)
consumer-1-1  | INFO 1 --- [mer.listener]-1] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=a09eedcf-7797-4509-a767-1641927d3e4b, body=test)

自動で負荷分散されるようにする

releaseSegmentを実行すれば、segmentの複数プロセスでの分担をコントロールできることはわかりました。
ただ、実運用として、Consumerアプリケーションの起動・停止や台数変更のたびに都度手動でreleaseSegmentを実行してリバランスさせるのは辛いので、自動で負荷分散されるようにしたいです。

そこで、あまり良い方法とは思えないものの、releaseSegmentを定期的に自動実行するようにしました。

今回は、SpringBootの@Scheduledアノテーションを利用して1分間隔で定期実行しています。

consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/service/StreamingProcessorService.kt
@Component
class StreamingProcessorService(private val config: EventProcessingConfiguration) {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    private val processingGroup = "org.example.axonkafkatrial.consumer.listener"
    
    // 〜省略〜

    fun releaseSegmentFor(segmentId: Int) {
        config.eventProcessor(processingGroup, StreamingEventProcessor::class.java)
            .ifPresent { streamingProcessor -> streamingProcessor.releaseSegment(segmentId) }
    }

    // 以下を追加
    @Value("\${application.axon.minimumSegmentCount}")
    private val minimumSegmentCount: Int = 1

    @Scheduled(fixedDelayString = "60000")
    fun releaseSegment() {
        config.eventProcessor(processingGroup, StreamingEventProcessor::class.java)
            .ifPresent { streamingProcessor ->
                logger.info("Releasing segment for processor $processingGroup")
                val statusMap = streamingProcessor.processingStatus()
                if(statusMap.size > minimumSegmentCount) {
                    statusMap
                        .entries.drop(minimumSegmentCount).associate { it.toPair() }
                        .forEach { (segmentId, status) ->
                            logger.info("Segment $segmentId status: $status")
                            streamingProcessor.releaseSegment(segmentId)
                        }
                } else {
                    logger.info("No segment to release")
                }
            }
    }
}
consumer/src/main/resources/application.yml
# 〜省略〜
application:
  kafka:
    topics: doc-topic, default-topic
  # 以下を追加
  axon:
    minimumSegmentCount: 1

releaseSegmentまわりの実装内容について説明します。

まず、1つのConsumerプロセスが処理する最小segment数をapplication.ymlで設定できるようにしています。そして、合計segment数と起動する想定のプロセス数に応じて、最小segment数の値を設定します。

例:segment数が2で、プロセス数が2なら、最小segment数を 1 (= 2÷2) にする

この例の場合は、1プロセスしか起動していない状態では、2つのsegmentを1つのプロセスで処理しますが、最小segment数より多いsegmentを処理している場合は、定期的に余剰分のsegmentをreleaseするようにします。

このとき、もう1つのプロセスが起動していれば、releaseされたsegmentをその別プロセスが処理するようになります。

上記のコードで期待通り動作することを確認しました。

# 最初は2台起動しているが、先に起動したconsumer-2がsegmentを2つとも処理している状態

# 約1分経過後

# consumer-1は、まだsegmentを1つも処理していないので、何もreleaseしない
consumer-1-1  | INFO 1 --- [   scheduling-1] o.e.a.c.s.StreamingProcessorService      : Releasing segment for processor org.example.axonkafkatrial.consumer.listener
consumer-1-1  | INFO 1 --- [   scheduling-1] o.e.a.c.s.StreamingProcessorService      : No segment to release

# consumer-2は、mimnimumSegmentCount=1より多い2つのsegmentを処理しているので、そのうち1つをreleaseする
consumer-2-1  | INFO 1 --- [   scheduling-1] o.e.a.c.s.StreamingProcessorService      : Releasing segment for processor org.example.axonkafkatrial.consumer.listener
consumer-2-1  | INFO 1 --- [   scheduling-1] o.e.a.c.s.StreamingProcessorService      : Segment 1 status: TrackerStatus{segment=Segment[1/1], caughtUp=true, replaying=false, merging=false, errorState=false, error=null, trackingToken=KafkaTrackingToken{positions={doc-topic-0=10}}, currentPosition=OptionalLong.empty, resetPosition=OptionalLong.empty, mergeCompletedPosition=OptionalLong.empty}
consumer-2-1  | INFO 1 --- [mer.listener]-0] o.a.e.k.e.consumer.FetchEventsTask       : Closing down FetchEventsTask using Consumer [org.apache.kafka.clients.consumer.KafkaConsumer@6662fc60]
consumer-2-1  | INFO 1 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Released claim
consumer-2-1  | INFO 1 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Worker for segment Segment[1/1] stopped.
consumer-2-1  | INFO 1 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : No Worker Launcher active. Using current thread to assign segments.

# consumer-1が、consumer-2がreleaseしたsegmentを処理する
consumer-1-1  | INFO 1 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Worker assigned to segment Segment[1/1] for processing
consumer-1-1  | INFO 1 --- [mer.listener]-0] o.a.e.TrackingEventProcessor             : Dispatching new tracking segment worker: TrackingSegmentWorker{processor=org.example.axonkafkatrial.consumer.listener, segment=Segment[1/1]}
consumer-1-1  | INFO 1 --- [mer.listener]-1] o.a.e.TrackingEventProcessor             : Fetched token: KafkaTrackingToken{positions={doc-topic-0=10}} for segment: Segment[1/1]
consumer-1-1  | INFO 1 --- [mer.listener]-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
consumer-1-1  |         allow.auto.create.topics = true
consumer-1-1  |         〜省略〜
consumer-1-1  | 
consumer-1-1  | INFO 1 --- [mer.listener]-1] o.a.k.c.t.i.KafkaMetricsCollector        : initializing Kafka metrics collector
consumer-1-1  | INFO 1 --- [mer.listener]-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.7.1
consumer-1-1  | INFO 1 --- [mer.listener]-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: e2494e6ffb89f828
consumer-1-1  | INFO 1 --- [mer.listener]-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1736482634116
consumer-1-1  | INFO 1 --- [mer.listener]-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=axon-kafka-trial-consumer-ba832afb-0190-497e-a899-12c3a2dcb843, groupId=null] Cluster ID: R6P-yxA8S56kRUGAKM8mRQ
consumer-1-1  | INFO 1 --- [mer.listener]-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=axon-kafka-trial-consumer-ba832afb-0190-497e-a899-12c3a2dcb843, groupId=null] Assigned to partition(s): doc-topic-0
consumer-1-1  | INFO 1 --- [mer.listener]-1] o.a.e.k.e.consumer.ConsumerSeekUtil      : Seeking topic-partition [doc-topic-0] with offset [11]
consumer-1-1  | INFO 1 --- [mer.listener]-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=axon-kafka-trial-consumer-ba832afb-0190-497e-a899-12c3a2dcb843, groupId=null] Seeking to offset 11 for partition doc-topic-0

# consumer-1とconsumer-2で分散処理している
consumer-1-1  | INFO 1 --- [mer.listener]-1] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=5489db2b-7078-4626-8d31-6809044ed350, body=test)
consumer-2-1  | INFO 1 --- [mer.listener]-1] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=c749859e-deae-470b-8269-c915ce809d34, body=test)

# さらに1分経過後

# consumer-1、consumer-2とも、mimnimumSegmentCount=1と同じ1つのsegmentを処理しているので、何もreleaseしない
consumer-2-1  | INFO 1 --- [   scheduling-1] o.e.a.c.s.StreamingProcessorService      : Releasing segment for processor org.example.axonkafkatrial.consumer.listener
consumer-2-1  | INFO 1 --- [   scheduling-1] o.e.a.c.s.StreamingProcessorService      : No segment to release
consumer-1-1  | INFO 1 --- [   scheduling-1] o.e.a.c.s.StreamingProcessorService      : Releasing segment for processor org.example.axonkafkatrial.consumer.listener
consumer-1-1  | INFO 1 --- [   scheduling-1] o.e.a.c.s.StreamingProcessorService      : No segment to release

ただし、この方法には課題もあります。

releaseSegmentしてから、別プロセスが処理しはじめる(別プロセスがいない場合は自プロセスが改めて処理しはじめる)までの数秒間は、そのsegmentの処理が一時停止することになります。

指定したsegment数とプロセス数で期待通りに分散された状態になれば安定しますが、偏った状態(特定のプロセスが最小segment数より多くのsegmentを処理している状態)だと、定期的にreleaseSegmentが実行されて、そのたびに処理が遅延してしまう可能性があります。

また、プロセス数に応じて全体のsegment数および1プロセスあたりの最小segment数の値を調整する必要があるので、AutoScalingに対応させるのは難しくなります。

そのため、可能であれば、Consumerとしての処理は、ロードバランサーで分散処理するAPIサーバのプロセスとは相乗りせずに、独立したプロセスとして動作させた方が、リソースのコントロールはしやすいかもしれません。

subscribingモードでの動作確認

次に、subscribingモードでの挙動について確認していきます。

まずはsubscribingモードになるように設定を変更します。

consumer/src/main/resources/application.yml
# 〜省略〜
axon:
  axonserver:
    enabled: false
  serializer:
    general: jackson
    events: jackson
  kafka:
    client-id: axon-kafka-trial-consumer-${random.uuid}
    bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9094}
    consumer:
#      event-processor-mode: tracking
      event-processor-mode: subscribing # こっちに変更
#    properties:
#      auto.offset.reset: earliest
    default-topic: doc-topic
consumer/src/main/kotlin/org/example/axonkafkatrial/consumer/service/StreamingProcessorService.kt
@Component
class StreamingProcessorService(private val config: EventProcessingConfiguration) {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    private val processingGroup = "org.example.axonkafkatrial.consumer.listener"
    
    // 〜省略〜

    @Value("\${application.axon.minimumSegmentCount}")
    private val minimumSegmentCount: Int = 1

		// 定期的なreleaseSegment実行は無効化しておく
//    @Scheduled(fixedDelayString = "60000")
    fun releaseSegment() {
        config.eventProcessor(processingGroup, StreamingEventProcessor::class.java)
            .ifPresent { streamingProcessor ->
                logger.info("Releasing segment for processor $processingGroup")
                val statusMap = streamingProcessor.processingStatus()
                if(statusMap.size > minimumSegmentCount) {
                    statusMap
                        .entries.drop(minimumSegmentCount).associate { it.toPair() }
                        .forEach { (segmentId, status) ->
                            logger.info("Segment $segmentId status: $status")
                            streamingProcessor.releaseSegment(segmentId)
                        }
                } else {
                    logger.info("No segment to release")
                }
            }
    }
}

この状態で改めて、consumer-1consumer-2を起動します。

2つのプロセスが、同じConsumerGroup(groupId=axon-kafka-trial-consumer-group)で別のクライアント(clientId=axon-kafka-trial-consumer-<UUID>)としてそれぞれトピックをsubscribeしていることがログから確認できます。

consumer-1-1  | INFO 1 --- [           main] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=axon-kafka-trial-consumer-8bb91829-c7ef-418f-94ed-700612607579, groupId=axon-kafka-trial-consumer-group] Subscribed to topic(s): doc-topic, default-topic

consumer-2-1  | INFO 1 --- [           main] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=axon-kafka-trial-consumer-da34df45-c5e2-4eba-8706-26d17ee16bbb, groupId=axon-kafka-trial-consumer-group] Subscribed to topic(s): doc-topic, default-topic

しかし、kafka-uiでConsumersを確認すると、Membersは2になっているものの、consumer-1のConsumerIdしか表示されていません。

one-consumer-displayed.png

この状態でイベントを送信して動作確認すると、案の定ですが、consumer-1だけでイベントが処理されました

そもそも、subscribingモードではKafkaのConsumerGroupの仕組みを使用して動作しているのでした。そして、Kafkaではトピックのパーティション単位で負荷分散されます。

このときは、doc-topicのパーティション数が1になっていたので、Consumerを増やしても分散処理できないのは当たり前でした。

そこで、パーティション数を2に増やして試してみます。

ProducerアプリケーションとConsumerアプリケーションをいったん停止し、Kafkaのトピックの設定を行います。

トピックの設定を行うために、Kafka Clientのserviceをcompose.ymlに追加します。

compose.yml(抜粋)
  # 追加
  kafka-cli:
    image: confluentinc/cp-kafka
    entrypoint: [ '/bin/sh', '-c' ]
    depends_on:
      - kafka-0
    networks:
      - network0

Kafka Clientを使用してdoc-topicのパーティション数を2にして作り直します。

$ docker compose run --rm kafka-cli bash

$ kafka-topics --bootstrap-server kafka-0:9092 --list
__consumer_offsets
default-topic
doc-topic

$ kafka-topics --bootstrap-server kafka-0:9092 --delete --topic doc-topic

$ kafka-topics --bootstrap-server kafka-0:9092 --list
__consumer_offsets
default-topic

$ kafka-topics --bootstrap-server kafka-0:9092 --create --topic doc-topic --partitions 2
Created topic doc-topic.

$ kafka-topics --bootstrap-server kafka-0:9092 --list
__consumer_offsets
default-topic
doc-topic

$ exit

この状態でconsumer-1consumer-2を起動すると、Consumerが2つ表示されるようになりました。

two-consumer-displayed.png

そして、イベントのパーティション分散状況に合わせて、Consumerの処理も複数プロセスで分散されるようになりました

consumer-2-1  |  INFO 1 --- [ AsyncFetcher-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=251652a9-8787-4296-82d0-42b649ae47fd, body=test0)
consumer-2-1  |  INFO 1 --- [ AsyncFetcher-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=c1a032b4-6763-47ca-8b0a-cbaefb2cbc87, body=test2)
consumer-2-1  |  INFO 1 --- [ AsyncFetcher-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=1ebc9938-e43c-44dc-be39-b38c32d96056, body=test4)
consumer-1-1  |  INFO 1 --- [ AsyncFetcher-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=29657f7d-f7fa-4c2a-bf7e-3072fab7ba5a, body=test1)
consumer-1-1  |  INFO 1 --- [ AsyncFetcher-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=6150c5ea-259f-4529-919f-9171dc09f6a9, body=test3)
consumer-1-1  |  INFO 1 --- [ AsyncFetcher-0] o.e.a.consumer.listener.ConsumerProcess  : event received: DocCreated(docId=5a7f0064-37ad-48f9-9a35-c62bd6acfb2b, body=test5)

subscribingモードでは、Kafkaが提供する負荷分散の仕組みをそのまま使用するため、Axon側で制御が必要となるtrackingモードよりも簡単に負荷分散を実現することができました。

まとめ

Axonを利用して実装したKafkaのConsumerについて、trackingモードおよびsubscribingモードで、複数プロセスで負荷分散する方法を確認しました。プロセスを複数ノードで起動することで、複数ノードでの負荷分散ができると思われます。

trackingモードの場合は、複数のsegmentの処理を複数のプロセスで分担するために、segmentのsplit/mergeやreleaseの制御をプログラムで行う必要があることがわかりました。

subscribingモードの場合は、KafkaのトピックパーティションとConsumerGroupの仕組みによって、負荷分散を行うことができることがわかりました。

単に実装量の観点ではsubscribingモードの方が容易に負荷分散ができますが、subscribingモードではAxonが提供する機能の一部を利用できない制約もあるため、負荷分散以外の要件も合わせてトータルで判断して選択する必要がありそうです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?