13
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Axon Frameworkを使ってみる

Posted at

たまたま社内の業務フローを実装する機会を得ました。申請→関係者が承認→最終責任者が承認するみたいなやつです。

普通にSpringのMVCでCRUDアーキテクチャでも開発できるのですが、せっかくなので前々から気になっていたCQRS+Event Sourcingなアーキテクチャにしたく、それを実現するAxon Frameworkを評価してみようと思います。

Axon Framework

  • CQRSフレームワーク。
  • Event Sourcing機能も提供。
    • Event Sourcingは必須ではない。
  • Spring Bootとの連携機能も提供。
    • でもspring-boot-devtoolsは相性が悪くて使えず、ちょっと開発効率が悪い。
  • この記事を書いている時点の最新バージョンは4.3。
  • Apache License v2.0

なお、バージョン4からAxon ServerというMicroservices環境向けのサーバが追加され、Axon ServerとAxon Frameworkで連携動作することが推奨されているようですが、今回はAxon Frameworkを理解したいので扱いません。

アーキテクチャ

以下はバージョン3のドキュメントの図で、最新の図より判りやすい(と思う)。

image.png

  • 開発者はCommand HandlerとEvent Handler、Thin Data Layerを実装する。UIはAxonと関係ないので割愛。
  • Command Handlerでビジネスロジックに応じたイベントを送信すると、AxonがイベントをEvent Storeに記録し、イベントに対応するEvent Handlerを呼び出す。
  • 図下のEvent Handlerは問合せ用のデータを生成する。どんなデータストアを利用するかは任意。

処理シーケンス

Axonのドキュメントやサンプルから理解したイメージです。

コマンド

コマンド・シーケンス

  1. Axonが提供するCommand Busを介して、Aggregateにコマンドを送信。
  2. コマンドに該当するAggregateのインスタンスが生成される。もし該当するAggregateのイベントが既に記録さていれば、インスタンスにイベントが適用(再生)される。
  3. Aggregateがコマンドを受信し、Aggregateの状態に応じたイベントをEvent Busに送信。
  4. Event Busがイベントを記録し、Aggregateにイベントを送信。
  5. Aggregateはイベントに応じた内容に自身の状態を変更。

クエリ

クエリ・シーケンス

  1. イベントがEvent Storeに記録されたのをEvent Processorが検知して、イベントをThin Data Layerに送信する。
  2. Thin Data Layerはクエリ処理がしやすい形でデータを保存する。
  3. クエリはQuery Busを介してThin Data Layerに送信される。
  4. Thin Data LayerはData Storeからデータを取得して返却する。

サンプル

まずは評価用に申請をCRUDするだけの処理を実装してみます。

1. 依存関係

Spring Bootアプリケーションにaxon-spring-boot-starter(①)を追加します。今回はAxon Serverを使わないので、axon-server-connectorは対象外にしておきます。そうしないとアプリケーション実行中にAxon Serverへの接続失敗のエラーや警告が出力されます。

Axon FrameworkはEvent StoreにアクセスするエンジンとしてJPAとJDBCが提供されています。Event Storeへの処理はAxon Frameworkが隠蔽するので、どちらを選ぶかはThin Data Layerの実装方法との兼ね合いになると思います。
今回は、設定とかが最小限の手間で済むJPAエンジンにし、spring-boot-starter-data-jpa(②)を追加しています。

  • 実開発では個人的に気に入っているJooqを使いたいので、その場合はJDBCエンジンを使うつもりです。
pom.xml(抜粋)
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.6.RELEASE</version>
    <relativePath/>
  </parent>

  <properties>
    <java.version>11</java.version>
    <kotlin.version>1.3.71</kotlin.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.axonframework</groupId>
      <artifactId>axon-spring-boot-starter</artifactId> <!-- ① -->
      <version>4.3.2</version>
      <exclusions>
        <exclusion>
          <groupId>org.axonframework</groupId>
          <artifactId>axon-server-connector</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Web -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>

    <!-- DB -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId> <!-- ② -->
    </dependency>
    <dependency>
      <groupId>com.h2database</groupId>
      <artifactId>h2</artifactId>
      <scope>runtime</scope>
    </dependency>

2. Aggregate

Axon Frameworkで業務ロジックの中心となる部分です。
DDDでAggregateはトランザクション境界になるとあるので、Aggregate単位でイベントを管理するのでしょう。

2.1. コマンドとイベント

コマンドには、送信対象となるAggregateの識別子を、@TargetAggregateIdentifier(①)が付与されていたプロパティで指定します。
一方、イベントはコマンドを受信したのと同じAggregateインスタンスに送信されるか、インスタンスに依らないThin Data Layerに送信されるので、そのような指定はありません。

message.kt
interface IInvoiceBody {
  val invoiceTitle: String
}

interface IInvoiceHeader {
  val invoiceId: UUID
}

interface IInvoice: IInvoiceHeader, IInvoiceBody

data class CreateInvoiceCommand(
  @field:TargetAggregateIdentifier // ①
  override val invoiceId: UUID,
  val body: IInvoiceBody): IInvoiceBody by body, IInvoice

data class UpdateInvoiceCommand(
  @field:TargetAggregateIdentifier // ①
  override val invoiceId: UUID,
  val body: IInvoiceBody): IInvoiceBody by body, IInvoice

data class RemoveInvoiceCommand(
  @field:TargetAggregateIdentifier // ①
  override val invoiceId: UUID
): IInvoiceHeader

data class InvoiceCreatedEvent(private val invoice: IInvoice): IInvoice by invoice

data class InvoiceUpdatedEvent(private val invoice: IInvoice): IInvoice by invoice

data class InvoiceRemovedEvent(
  override val invoiceId: UUID
): IInvoiceHeader
  • なお、インタフェースとデレゲートを多用しているは、コマンドからイベントへプロパティ値の移し替えを端折りたいためで、こうしないと動かないって訳ではないです。

2.2. Aggregate

コマンドの@TargetAggregateIdentifierに対応するプロパティには@AggregateIdentifier(①)を付与します。
@Aggregate(②)はSpring連携用のアノテーションで、@Componentアノテーションの働きも持っています。

コマンドを処理するメソッドには@CommandHandler(③)を付与します。
なお、Aggregateを新規作成するコマンドはコンストラクタで受信します。しかし、その後のコマンドを処理する場合は、インスタンス生成後にイベントを再生するだけなので、デフォルトコンストラクタも必須です。

イベントは@EventSourcingHandler(④)を付与したメソッドで受信して、Aggregateの状態を更新します。Aggregateの寿命が終わるイベントの時にはAggregateLifecycle.markDeleted()を呼び出します。

InvoiceAggregate.kt
@Aggregate // ②
class InvoiceAggregate(): IInvoice {
  @AggregateIdentifier // ①
  override lateinit var invoiceId: UUID
  override lateinit var invoiceTitle: String

  @CommandHandler // ③
  constructor(command: CreateInvoiceCommand): this() {
    AggregateLifecycle.apply(InvoiceCreatedEvent(command))
  }

  @EventSourcingHandler // ④
  fun on(event: InvoiceCreatedEvent) {
    invoiceId = event.invoiceId
    invoiceTitle = event.invoiceTitle
  }

  @CommandHandler // ③
  fun handle(command: UpdateInvoiceCommand) {
    AggregateLifecycle.apply(InvoiceUpdatedEvent(command))
  }

  @EventSourcingHandler // ④
  fun on(event: InvoiceUpdatedEvent) {
    invoiceTitle = event.invoiceTitle
  }

  @CommandHandler // ③
  fun handle(command: RemoveInvoiceCommand) {
    AggregateLifecycle.apply(InvoiceRemovedEvent(command.invoiceId))
  }

  @EventSourcingHandler // ④
  fun on(event: InvoiceRemovedEvent) {
    AggregateLifecycle.markDeleted() //⑤
  }
}

3. Thin Data Layer

Event Storeに記録されたイベントは、@EventHandler(①)が付与されたメソッドに配信されます。@EventSourcingHandlerとは違うので注意してください。
クエリは@QueryHandler(②)が付与されたメソッドで受信して、条件に該当するデータを取得して、返却します。

InvoiceService.kt
@Service
class InvoiceService(val invoiceRepo: InvoiceRepository) {
  class AllInvoicesQuery

  @QueryHandler // ②
  fun handle(query: AllInvoicesQuery): List<InvoiceEntity> {
    return invoiceRepo.findAll()
  }
  @EventHandler // ①
  fun on(event: InvoiceCreatedEvent) {
    invoiceRepo.save(InvoiceEntity(event.invoiceId, event.invoiceTitle))
  }
  @EventHandler // ①
  fun on(event: InvoiceUpdatedEvent) {
    invoiceRepo.save(InvoiceEntity(event.invoiceId, event.invoiceTitle))
  }
  @EventHandler // ①
  fun on(event: InvoiceRemovedEvent) {
    invoiceRepo.deleteById(event.invoiceId)
  }
}

4. Controller

これはSpring MVCにおけるControllerで、HTTPリクエストをコマンドに変換し、CommandGatewayやQueryGatewayを使って、Command BusやQuery Busに送信します。
CommandGatewayへのコマンド送信は、非同期のsendメソッドと、同期のsendAndWaitメソッドがあります。ここでは、PRGパターンにしてますが追い越しがあるかもしれないので、sendAndWaitを使用しています。

InvoiceController.kt
@Controller
class InvoiceController(val commandGateway: CommandGateway, val queryGateway: QueryGateway) {

  companion object {
    const val REDIRECT_URL = "${UrlBasedViewResolver.REDIRECT_URL_PREFIX}/"
  }

  data class InvoiceBody(override val invoiceTitle: String): IInvoiceBody
  data class InvoiceRequest(
    override val invoiceId: UUID,
    override val invoiceTitle: String
  ): IInvoiceHeader, IInvoiceBody

  @GetMapping("/")
  fun topPage(model: Model): String {
    val invoices = queryGateway.query(InvoiceService.AllInvoicesQuery(), MultipleInstancesResponseType(InvoiceEntity::class.java)).get()
    model.addAttribute("invoices", invoices)
    return "index"
  }

  @PostMapping("/invoices")
  fun createInvoice(invoice: InvoiceBody): String {
    commandGateway.sendAndWait<Any>(CreateInvoiceCommand(UUID.randomUUID(), invoice))
    return REDIRECT_URL
  }

  @PostMapping("/invoices/update")
  fun updateInvoice(invoice: InvoiceRequest): String {
    commandGateway.sendAndWait<Any>(UpdateInvoiceCommand(invoice.invoiceId, invoice))
    return REDIRECT_URL
  }

  @PostMapping("/invoices/delete")
  fun deleteInvoice(@RequestParam invoiceId: UUID): String {
    commandGateway.sendAndWait<Any>(RemoveInvoiceCommand(invoiceId))
    return REDIRECT_URL
  }
}

トラブル

コマンドの結果がクエリで見れない

例えばサンプルで、申請の作成(/invoices)を行って、リダイレクトしてページの表示(/)を行っても、データが取得できませんでした。
デバッグしてみると、@CommandHandlerのメソッドがイベントを送信して、@EventSourcingHandlerのメソッドは直ぐに呼び出されるのですが、@EventHandlerのメソッドは少し後でした。
どうやらAxonのデフォルトのEvent ProcessorであるTrackingEventProcessorは独立したスレッドで動作しており、デフォルト値の5秒周期でイベントの追加を監視しているようです。

おそらく大規模システム向けには監視周期をチューニングするのが得策でしょうが、小規模ならもう1つのSubscribingEventProcessorに変更した方が良さそうです。

@SpringBootApplication
class SampleAxonApplication {
  @Autowired
  fun configure(configurer: EventProcessingConfigurer) {
    configurer.usingSubscribingEventProcessors()
  }
}

SubscribingEventProcessorだと、Controllerを同じスレッドから@EventHandlerのメソッドが呼び出されるで、クエリが追い越すことが回避できます。

所感

まだ色々と調査が足りていないので、実開発に耐えられるかは、まだ不明なところがあります。
例えば、

  • 仕様変更でAggregateの実装を変更する時、既にイベントは記録されているので、どのように対応すれば良いのか?
  • バグで誤ったイベントが記録されている場合に、イベントを修正したり削除できるのか?
  • 操作履歴が欲しいのだけど、イベントの一覧を取得できる仕組みは用意されているのか?
  • Aggregate間で、例えば申請書名みたいに一意性を確保したい場合は、Controllerからコマンドを送信する前にData Storeでチェックするそうですが、それだけで対応ができるのか、ちょっと不安。

などなどです。
これらは、今後、調べて行きたいと思います。

13
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?