2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ZOZOAdvent Calendar 2024

Day 8

Axon FrameworkでCQRSパターンを構築する:入門編

Last updated at Posted at 2024-12-07

はじめに

近年、マイクロサービスアーキテクチャやイベントドリブンなシステム設計が注目を集める中、CQRS(Command Query Responsibility Segregation)パターンが注目されています。

システムをCQRSパターンで構築する場合、自前で実装する方法とフレームワークを利用する方法がありますが、
フレームワークを利用する利点は、認知コストや開発工数を削減できるという点だと思います。
本ページではJava向けのAxon Frameworkを使用して、CQRSパターンでのシステム構築に入門してみようと思います。

CQRSの基本概念

CQRSの基本概念については、解説記事が数多くあるのでそちらを参照してください。
ここではMicrosoft社の記事を紹介しておきます。

またシステムへの適用事例としてZOZOTOWNで実際にCQRSを適用した事例もご紹介します。

私も実業務としてCQRSパターンを使用したシステム開発・運用に携わっていますが、システムの要件にハマりさえすれば強力な利点があると実感しています。

利点

コマンドとクエリ、それぞれに特化した設計や実装を行うことができる

  • 設計において、ビジネスロジックが詰まったコマンドのモデルと読み取り操作のみのモデルを分離することで、シンプルなモデル設計になる
  • クエリ側とコマンド側のDBをそれぞれの要件に適した選択が可能になる
    (コマンド側は同時書き込みにつよい分散データベース、クエリ側は複雑な検索やデータ整合性が求められるためRDBを使用するなど)

もちろんトレードオフとして以下のような課題も存在します

課題

  • アーキテクチャが複雑になり実装における開発工数/認知コストが増加
  • 読み取りの結果整合性を許容することになる
    など

が、開発工数/認知コストが増加については、冒頭に述べた通りフレームワークを使用することで緩和することができるのではないでしょうか。

Axon Frameworkの概要

Axon FrameworkはCQRS/EventSouring/DDDという3つのコアコンセプトを中心としたフレームワークです。
AxonIQによってホストされており、Apache License v2.0の下オープンソースで提供されています。
(イベントストア/イベントバスとして利用できるAxon Serverについては、AxonIQ特有のライセンスで提供されており、クラスタ構成で利用する場合は商用ライセンスでの提供となりますが、MySQLやKafkaなどを代用することも可能です)

Axon Frameworkの主要コンポーネント

Axon Frameworkは、CQRSと同時にイベントソーシングをコンセプトにしているため、システムの状態変更は全てイベントを通じて行われます。

下図はAxon Frameworkにおけるシステムの状態変更の概略図です。
イベントを中核とし三つのメッセージオブジェクトを通じて通信が行われていることがわかります。
参考:Axon Framework Reference / Messaging Concepts

  1. Command: アプリケーションの状態変更を意図したメッセージ
  2. Event: アプリケーション内で発生した出来事を記録するメッセージ
  3. Query: 情報や状態を要求するメッセージ

image.png

実際に実装してみる

Java関連の有益な記事を多く提供している海外サイトBaeldungのサンプルリポジトリを用いて実装してみます。

リポジトリクローン

上記リポジトリでは他チュートリアルリソースも含めてモノレポで提供しているので、
AxonFrameworkのチュートリアルだけgitのSparse Checkoutを使用してクローンします。

git clone --filter=blob:none --no-checkout https://github.com/eugenp/tutorials.git
cd tutorials
git sparse-checkout init --cone
git sparse-checkout set patterns-modules/axon
git sparse-checkout add parent-boot-3
git checkout master

cd patterns-modules/axon

動作確認

起動処理

そのまま起動して動作確認を行ってみます。
サンプルではイベントストアとしてAxon ServerをDockerで起動して使用しています。
まずはAxonServerの起動スクリプトを実行し、WEBGUIから有効化を行います。

chmod +x ./start_axon_server.sh & ./start_axon_server.sh 

起動後、http://localhost:8024/ にアクセスしてCompleteボタンを押して有効化
image.png

その後にSpringBoot Applicationを起動します

mvn clean install
mvn spring-boot:run
Orderの作成

APIを通じてOrderの取得をしてみます。クエリ側のall-ordersのエンドポイントを実行してみます。

curl -X 'GET' \                   
  'http://localhost:8080/all-orders' \               
  -H 'accept: application/json; charset=utf-8'

// レスポンスは空
[]%  

Orderが登録されていないため、空のレスポンスが帰ってきます。

では次に、コマンド側のエンドポイントorderを実行してOrderを登録します。

curl -X 'POST' \
  'http://localhost:8080/order' \     
  -H 'accept: application/json; charset=utf-8'
a6b9a6fe-939b-4823-ae12-d28adb52b2df%  

その後、先ほどと同じall-ordersのエンドポイントを使用すると、Orderが登録されていることがわかります。

curl -X 'GET' \
  'http://localhost:8080/all-orders' \
  -H 'accept: application/json; charset=utf-8' | jq

// Orderが登録されている
[
  {
    "orderId": "a6b9a6fe-939b-4823-ae12-d28adb52b2df",
    "products": {},
    "orderStatus": "CREATED"
  }
]

上記の流れを図で表現すると以下のような動きになっています。
image.png

コマンド側のエンドポイント/orderを実行されると図中の青枠部分までが同期的に実行されます。

  1. コントローラがコマンド型のメッセージ(CreateOrderCommand)を発火
  2. それを受け取ったCommandHandlerはOrderの作成を意味するイベント型のメッセージ(OrderCreatedEvent)を作成し、イベントストア(今回の例ではAxonServer)へ保存と同時にイベントバス(今回の例ではAxonServer)へ公開する

これはつまり、コマンド側のエンドポイントの責務はイベントをイベントストアとイベントバスへ公開する部分までになるということです。

イベントを元にクエリ側のリードDBを更新する部分は非同期的に行われます。
そのためクエリ側のエンドポイントの実行タイミングによっては、リードDBのアップデートが行われていないため、結果整合性の読み取りを許容する必要が出てきます。

実際にAxonServerのWEBGUIからイベントストアを確認すると、
OrderCreatedEventが保存されていることがわかります。
image.png

Orderの状態を更新する

では次にコマンド側のエンドポイントを利用して作成したOrderの状態を変更してみます。

// 対象のOrderをconfirm状態にする
curl -X 'POST' \
  'http://localhost:8080/order/a6b9a6fe-939b-4823-ae12-d28adb52b2df/confirm' \
  -H 'accept: application/json; charset=utf-8'

// 対象のOrderをshipped状態にする
curl -X 'POST' \
  'http://localhost:8080/order/a6b9a6fe-939b-4823-ae12-d28adb52b2df/ship' \
  -H 'accept: application/json; charset=utf-8'

クエリ側のエンドポイントを利用して状態をクエリすると対象のOrderのステータスが更新されていることが確認できます。

curl -X 'GET' \
  'http://localhost:8080/all-orders' \
  -H 'accept: application/json; charset=utf-8' | jq

// Orderのステータスが更新されている
 {
    "orderId": "a6b9a6fe-939b-4823-ae12-d28adb52b2df",
    "products": {},
    "orderStatus": "SHIPPED"
  }

先程と同様に見えますが、異なる箇所があります。
Order作成のおいては、新規にイベントを発行するだけでしたが、今回はすでに作成されたOrderの状態を変更します。
そのため、まずはイベントストアに保存された対象IDのOrderイベントを全て取得し、最新の状態に復元する必要があります。

イベントソーシングの基本となるこの動作ですが、この辺りの処理は全てAxonFrameworkが内部的に行ってくれます。

コードを確認する

上記の動作確認で行った処理について実際のコードを確認してみます。

まず前提として、Axon FrameworkではAggregateという単位でビジネスロジックを管理・処理します。
(AggregateはDDDの概念で、関連するエンティティや値オブジェクトを一つにまとめたもの)
今回の例においては、OrderはOrderAggregateクラスにて作成や更新処理が一元的に管理されています。

Orderの作成

コマンド側のエンドポイント/orderを実行すると以下のようにCreateOrderCommandが作成され、commandGateway.sendメソッドを通じて発火されています。
commandGatewayはAxonFrameworkのコンポーネントでコマンド型のメッセージを受けとり、適切なOrderAggregateクラスのインスタンスにルーティングしてくれます。
今回は作成コマンドなので、コンストラクタを呼び出してOrderAggregateのインスタンス作成が行われます。

src/main/java/com/baeldung/axon/gui/OrderRestEndpoint.java
  @PostMapping("/order")
    public CompletableFuture<String> createOrder() {
        return createOrder(UUID.randomUUID()
          .toString());
    }

    @PostMapping("/order/{order-id}")
    public CompletableFuture<String> createOrder(@PathVariable("order-id") String orderId) {
        return commandGateway.send(new CreateOrderCommand(orderId));
    }

CreateOrderCommandを元にAxonFrameworkが自動的に以下のコンストラクタを実行します。
今度はコンストラクタの処理としてapplyメソッドが実行されています。
このapplyメソッドによって、イベントストアへOrderCreatedEventが保存され同時にイベントバスへの公開が行われます。
(イベントストアとイベントバスへの書き込みの一貫性の担保をどのように実現しているかは読み取れませんでした。
おそらくどちらかが失敗すればロールバックされるものと思いますが。)

src/main/java/com/baeldung/axon/commandmodel/order/OrderAggregate.java
@Aggregate(snapshotTriggerDefinition = "orderAggregateSnapshotTriggerDefinition")
public class OrderAggregate {

    // 関連部分のみ記載
    
    @AggregateIdentifier
    private String orderId;
    private boolean orderConfirmed;

    @AggregateMember
    private Map<String, OrderLine> orderLines;

    @CommandHandler
    public OrderAggregate(CreateOrderCommand command) {
        apply(new OrderCreatedEvent(command.getOrderId()));
    }

}

ではイベントバスに公開された、OrderCreatedEventはどのようにリードDBに反映されるのでしょうか。

この部分はAxon Frameworkの概略図でいうEvent Handling Componentが担当します。
image.png

具体的には、@EventHandlerアノテーションを付与したメソッドがEvent Handling Componentとして自動的に登録され、指定されたイベントが公開されるとそのメソッドが自動的に実行されます。
OrderCreatedEventが発火すると、onメソッドが呼び出され、イベントから取得したorderIdを使用して新しいOrderオブジェクトを作成し、リードDB(この例ではインメモリ)に追加します。

src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java
@Service
@ProcessingGroup("orders")
@Profile("!mongo")
public class InMemoryOrdersEventHandler implements OrdersEventHandler {

    private final Map<String, Order> orders = new HashMap<>();
    private final QueryUpdateEmitter emitter;

    public InMemoryOrdersEventHandler(QueryUpdateEmitter emitter) {
        this.emitter = emitter;
    }

    @EventHandler
    public void on(OrderCreatedEvent event) {
        String orderId = event.getOrderId();
        orders.put(orderId, new Order(orderId));
    }
Orderの更新

次に、既存のOrderの状態を変更する部分を確認します。
Orderをconfirm状態およびshipped状態に更新する処理を見ていきます。
ここは新規作成と変わりありません。それぞれConfirmOrderCommandShipOrderCommandをcommandGateway.sendメソッドを通じて発火しています。

src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java
   @PostMapping("/order/{order-id}/confirm")
    public CompletableFuture<Void> confirmOrder(@PathVariable("order-id") String orderId) {
        return commandGateway.send(new ConfirmOrderCommand(orderId));
    }

    @PostMapping("/order/{order-id}/ship")
    public CompletableFuture<Void> shipOrder(@PathVariable("order-id") String orderId) {
        return commandGateway.send(new ShipOrderCommand(orderId));
    }

上記のコマンドがcommandGateway.sendメソッドを通じてAggregateに送られると、
Axon Frameworkはまず該当するAggregateのインスタンスを復元する必要があります。
このプロセスは以下のステップで行われます。

  1. イベントストアからのイベントの読み込み:
    AxonFrameworkは、対象のAggregateに関連するすべての過去のイベント(例えば、OrderCreatedEvent、OrderConfirmedEvent、OrderShippedEventなど)をイベントストアから読み込む。
  2. イベントの適用:
    読み込んだイベントを順次Aggregateに適用し自身の状態を最新の状態に更新します。
    イベント適用処理は@EventSourcingHandlerアノテーションを付与してメソッドで記載する。
  3. コマンドの処理:
    現在の状態が復元されたAggregateに対して、発行されたコマンドが処理され、
    新たなイベントが生成され、イベントストアに保存されると同時に、イベントバスを通じて公開されます。
src/main/java/com/baeldung/axon/commandmodel/order/OrderAggregate.java

@Aggregate(snapshotTriggerDefinition = "orderAggregateSnapshotTriggerDefinition")
public class OrderAggregate {

// 関連部分のみ記載

    @AggregateIdentifier
    private String orderId;
    private boolean orderConfirmed;

// ③ 現在の状態が復元されたAggregateに対して、発行されたコマンドが処理される
    @CommandHandler
    public void handle(ConfirmOrderCommand command) {
        if (orderConfirmed) {
            return;
        }

        apply(new OrderConfirmedEvent(orderId));
    }
    
// ③ 現在の状態が復元されたAggregateに対して、発行されたコマンドが処理される
    @CommandHandler
    public void handle(ShipOrderCommand command) {
        if (!orderConfirmed) {
            throw new UnconfirmedOrderException();
        }

        apply(new OrderShippedEvent(orderId));
    }

    
// ② 読み込んだイベントを順次適用して最新の状態に更新
    @EventSourcingHandler
    public void on(OrderCreatedEvent event) {
        this.orderId = event.getOrderId();
        this.orderConfirmed = false;
        this.orderLines = new HashMap<>();
    }
//  ② 読み込んだイベントを順次適用して最新の状態に更新
    @EventSourcingHandler
    public void on(OrderConfirmedEvent event) {
        this.orderConfirmed = true;
    }
}

イベントバスに公開された、OrderConfirmedEventOrderShippedEventOrderCreatedEventと同様にEvent Handling ComponentによってキャッチされリードDBへのupdateが行われます。

src/main/java/com/baeldung/axon/querymodel/InMemoryOrdersEventHandler.java
 @EventHandler
    public void on(OrderConfirmedEvent event) {
        orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
            order.setOrderConfirmed();
            emitUpdate(order);
            return order;
        });
    }

    @EventHandler
    public void on(OrderShippedEvent event) {
        orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
            order.setOrderShipped();
            emitUpdate(order);
            return order;
        });
    }

まとめ

本記事では、Axon Frameworkを用いてCQRSパターンを実装してみました。
実際に利用して感じた所感ですが

  • 「開発工数や認知コストの削減」について、
    コマンドやイベントの管理、Aggregateの復元など、複雑な処理をフレームワークが自動化してくれるため、CQRS/イベントソーシングの知見や資産を持たない開発者が同等の機能を実装する場合と比較して、開発工数や認知コストは抑えられそうと感じました。
    (プロダクトへ適用する際は、フレームワーク自体の理解や、フレームワークの内部処理を理解するコストは当然かかってくると思いますが)
  • 今回のケースではAxonServerを利用しましたが、実際のプロダクトで運用する場合はまだまだ調査することが多そう(クラスター管理やバックアップ手法など)と思いました。
    この辺りは有償プランの内容を確認したり、MySQL+Kafkaなどの知見のあるサービスをイベントストア、イベントバスとして利用するオプションを検討したいと思います。
    (次回はそのあたりを記事にしたい)

最後に、今回はあくまで入門編ということで基本的な部分のみにフォーカスしていますが、
次回はより実践的にプロダクトへの適用を見据えた検証を行えればと考えています。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?