はじめに
Axon Framework(以降、Axonと記述)では、Kafkaにイベントデータを送信する際、デフォルトでは Axon.Events
というKafkaトピックに送信されます。
明示的に送信先トピックを指定するための設定方法を確認しました。
なお、確認する際に使用したアプリケーションについては、以下の記事でセットアップしたものを使用しているので、詳細はそちらをご参照ください。
全てのイベントを1つのトピックに送信する場合
この場合は、application.ymlでdefault-topic
を指定するだけで実現できました。
axon:
axonserver:
enabled: false
kafka:
bootstrap-servers: localhost:9094
default-topic: doc-topic # 追加
serializer:
events: jackson
これで、doc-topic
に送信されました。
Axonのドキュメントでは、以下に設定例が記載されています。
イベントの種類ごとに送信先トピックを変えたい場合
この場合は、TopicResolver
を設定する必要がありました。
Axonのドキュメントの関連箇所は以下です。
以下のようなConfigクラスを作成し、eventMessage.payloadType
で取得したイベントデータ型をもとに、送信先トピックを分岐させています。
package org.example.axonkafkatrial.producer.config
import org.axonframework.extensions.kafka.eventhandling.producer.TopicResolver
import org.example.axonkafkatrial.producer.service.DocCreated
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.*
@Configuration
class AxonKafkaProducerConfig {
@Bean
fun topicResolver(): TopicResolver =
TopicResolver { eventMessage ->
when (eventMessage.payloadType) {
DocCreated::class.java -> Optional.of("doc-topic")
else -> Optional.of("default-topic")
}
}
}
送信先トピックの振り分けロジックは、上記のようにTopicResolver
の実装として自由にプログラム可能なので、イベントデータ型以外を条件にするなど、柔軟にカスタマイズが可能そうです。
動作確認のため、DocCreated
以外のイベントを発生させる/other
APIエンドポイントを用意しました。
package org.example.axonkafkatrial.producer.controller
import org.axonframework.commandhandling.gateway.CommandGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.example.axonkafkatrial.producer.command.OtherCreate
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.*
import java.util.*
@RestController
@RequestMapping("/")
class TrialController(private val commandGateway: CommandGateway) {
@GetMapping
@ResponseStatus(HttpStatus.OK)
fun get(): GetResponse {
return GetResponse("success")
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
fun post(@RequestBody req: PostRequest): PostResponse {
val id = UUID.randomUUID()
val command = DocCreate(id, req.body)
commandGateway.sendAndWait<DocCreate>(command)
return PostResponse(id)
}
// 追加
@PostMapping("/other")
@ResponseStatus(HttpStatus.CREATED)
fun postOther(@RequestBody req: PostRequest): PostResponse {
val id = UUID.randomUUID()
val command = OtherCreate(id, req.body)
commandGateway.sendAndWait<OtherCreate>(command)
return PostResponse(id)
}
}
data class GetResponse(val body: String)
data class PostRequest(val body: String)
data class PostResponse(val id: UUID)
package org.example.axonkafkatrial.producer.service
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.eventhandling.gateway.EventGateway
import org.example.axonkafkatrial.producer.command.DocCreate
import org.example.axonkafkatrial.producer.command.OtherCreate
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.util.UUID
@Service
class TrialService(private val eventGateway: EventGateway) {
private val logger = LoggerFactory.getLogger(this.javaClass)
@CommandHandler
fun run(command: DocCreate) {
logger.info("command accepted: $command")
val event = DocCreated(command.docId, command.body)
eventGateway.publish(event)
}
// 追加
@CommandHandler
fun run(command: OtherCreate) {
logger.info("command accepted: $command")
val event = OtherCreated(command.id, command.body)
eventGateway.publish(event)
}
}
data class DocCreated(
val docId: UUID,
val body: String,
)
// 追加
data class OtherCreated(
val id: UUID,
val body: String,
)
APIを実行し、DocCreated
およびOtherCreated
のイベントを発生させます。
$ curl -X POST localhost:8080 \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"id":"23587c51-17c8-401c-b6df-bcb0936cb138"}
$ curl -X POST localhost:8080/other \
> -H 'Content-Type: application/json' \
> -d '{"body":"test"}'
{"id":"0127d975-99e6-459a-bb66-21a831cac67c"}
DocCreated
のイベントはdoc-topic
に送信されました。
OtherCreated
のイベントはdefault-topic
に送信されました
まとめ
AxonからKafkaにイベントデータを送信する際に送信先トピックを指定する方法を確認しました。
Spring Bootを使用している場合、単一のトピックに送信するのであればapplication.ymlの設定だけでよく、送信先トピックを振り分けたい場合はTopicResolverを実装すればよい、ということがわかりました。