0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Axon FrameworkでKafkaの指定したトピックにイベントを送信する [Kotlin, SpringBoot]

Posted at

はじめに

Axon Framework(以降、Axonと記述)では、Kafkaにイベントデータを送信する際、デフォルトでは Axon.EventsというKafkaトピックに送信されます。

明示的に送信先トピックを指定するための設定方法を確認しました。

なお、確認する際に使用したアプリケーションについては、以下の記事でセットアップしたものを使用しているので、詳細はそちらをご参照ください。

全てのイベントを1つのトピックに送信する場合

この場合は、application.ymlでdefault-topicを指定するだけで実現できました。

src/main/resources/application.yml
axon:
  axonserver:
    enabled: false
  kafka:
    bootstrap-servers: localhost:9094
	  default-topic: doc-topic # 追加
  serializer:
    events: jackson

これで、doc-topicに送信されました。

sent-to-doc-topic.png

Axonのドキュメントでは、以下に設定例が記載されています。

イベントの種類ごとに送信先トピックを変えたい場合

この場合は、TopicResolverを設定する必要がありました。
Axonのドキュメントの関連箇所は以下です。

以下のようなConfigクラスを作成し、eventMessage.payloadTypeで取得したイベントデータ型をもとに、送信先トピックを分岐させています。

src/main/kotlin/org/example/axonkafkatrial/producer/config/AxonKafkaProducerConfig.kt
package org.example.axonkafkatrial.producer.config

import org.axonframework.extensions.kafka.eventhandling.producer.TopicResolver
import org.example.axonkafkatrial.producer.service.DocCreated
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.*

@Configuration
class AxonKafkaProducerConfig {
    @Bean
    fun topicResolver(): TopicResolver =
        TopicResolver { eventMessage ->
            when (eventMessage.payloadType) {
                DocCreated::class.java -> Optional.of("doc-topic")
                else -> Optional.of("default-topic")
            }
        }
}

送信先トピックの振り分けロジックは、上記のようにTopicResolverの実装として自由にプログラム可能なので、イベントデータ型以外を条件にするなど、柔軟にカスタマイズが可能そうです。

動作確認のため、DocCreated以外のイベントを発生させる/otherAPIエンドポイントを用意しました。

src/main/kotlin/org/example/axonkafkatrial/producer/controller/TrialController.kt
package org.example.axonkafkatrial.producer.controller

import org.axonframework.commandhandling.gateway.CommandGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.example.axonkafkatrial.producer.command.OtherCreate
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.*
import java.util.*

@RestController
@RequestMapping("/")
class TrialController(private val commandGateway: CommandGateway) {
    @GetMapping
    @ResponseStatus(HttpStatus.OK)
    fun get(): GetResponse {
        return GetResponse("success")
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    fun post(@RequestBody req: PostRequest): PostResponse {
        val id = UUID.randomUUID()
        val command = DocCreate(id, req.body)
        commandGateway.sendAndWait<DocCreate>(command)
        return PostResponse(id)
    }

    // 追加
    @PostMapping("/other")
    @ResponseStatus(HttpStatus.CREATED)
    fun postOther(@RequestBody req: PostRequest): PostResponse {
        val id = UUID.randomUUID()
        val command = OtherCreate(id, req.body)
        commandGateway.sendAndWait<OtherCreate>(command)
        return PostResponse(id)
    }
}

data class GetResponse(val body: String)
data class PostRequest(val body: String)
data class PostResponse(val id: UUID)
src/main/kotlin/org/example/axonkafkatrial/producer/service/TrialService.kt
package org.example.axonkafkatrial.producer.service

import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventhandling.gateway.EventGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.example.axonkafkatrial.producer.command.OtherCreate
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.util.UUID

@Service
class TrialService(private val eventGateway: EventGateway) {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    @CommandHandler
    fun run(command: DocCreate) {
        logger.info("command accepted: $command")
        val event = DocCreated(command.docId, command.body)
        eventGateway.publish(event)
    }

    // 追加
    @CommandHandler
    fun run(command: OtherCreate) {
        logger.info("command accepted: $command")
        val event = OtherCreated(command.id, command.body)
        eventGateway.publish(event)
    }
}

data class DocCreated(
    val docId: UUID,
    val body: String,
)

// 追加
data class OtherCreated(
    val id: UUID,
    val body: String,
)

APIを実行し、DocCreatedおよびOtherCreatedのイベントを発生させます。

$ curl -X POST localhost:8080 \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"id":"23587c51-17c8-401c-b6df-bcb0936cb138"}

$ curl -X POST localhost:8080/other \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"id":"0127d975-99e6-459a-bb66-21a831cac67c"}

DocCreatedのイベントはdoc-topicに送信されました。

sent-to-doc-topic2.png

OtherCreatedのイベントはdefault-topicに送信されました

sent-to-default-topic.png

まとめ

AxonからKafkaにイベントデータを送信する際に送信先トピックを指定する方法を確認しました。

Spring Bootを使用している場合、単一のトピックに送信するのであればapplication.ymlの設定だけでよく、送信先トピックを振り分けたい場合はTopicResolverを実装すればよい、ということがわかりました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?