はじめに
近年、マイクロサービスアーキテクチャやイベントドリブンなシステム設計が注目を集める中、CQRS(Command Query Responsibility Segregation)パターンが注目されています。
システムをCQRSパターンで構築する場合、自前で実装する方法とフレームワークを利用する方法がありますが、
フレームワークを利用する利点は、認知コストや開発工数を削減できるという点だと思います。
本ページではJava向けのAxon Frameworkを使用して、CQRSパターンでのシステム構築に入門してみようと思います。
CQRSの基本概念
CQRSの基本概念については、解説記事が数多くあるのでそちらを参照してください。
ここではMicrosoft社の記事を紹介しておきます。
またシステムへの適用事例としてZOZOTOWNで実際にCQRSを適用した事例もご紹介します。
私も実業務としてCQRSパターンを使用したシステム開発・運用に携わっていますが、システムの要件にハマりさえすれば強力な利点があると実感しています。
利点
コマンドとクエリ、それぞれに特化した設計や実装を行うことができる
- 設計において、ビジネスロジックが詰まったコマンドのモデルと読み取り操作のみのモデルを分離することで、シンプルなモデル設計になる
- クエリ側とコマンド側のDBをそれぞれの要件に適した選択が可能になる
(コマンド側は同時書き込みにつよい分散データベース、クエリ側は複雑な検索やデータ整合性が求められるためRDBを使用するなど)
もちろんトレードオフとして以下のような課題も存在します
課題
- アーキテクチャが複雑になり実装における開発工数/認知コストが増加
- 読み取りの結果整合性を許容することになる
など
が、開発工数/認知コストが増加については、冒頭に述べた通りフレームワークを使用することで緩和することができるのではないでしょうか。
Axon Frameworkの概要
Axon FrameworkはCQRS/EventSouring/DDDという3つのコアコンセプトを中心としたフレームワークです。
AxonIQによってホストされており、Apache License v2.0の下オープンソースで提供されています。
(イベントストア/イベントバスとして利用できるAxon Serverについては、AxonIQ特有のライセンスで提供されており、クラスタ構成で利用する場合は商用ライセンスでの提供となりますが、MySQLやKafkaなどを代用することも可能です)
Axon Frameworkの主要コンポーネント
Axon Frameworkは、CQRSと同時にイベントソーシングをコンセプトにしているため、システムの状態変更は全てイベントを通じて行われます。
下図はAxon Frameworkにおけるシステムの状態変更の概略図です。
イベントを中核とし三つのメッセージオブジェクトを通じて通信が行われていることがわかります。
参考:Axon Framework Reference / Messaging Concepts
- Command: アプリケーションの状態変更を意図したメッセージ
- Event: アプリケーション内で発生した出来事を記録するメッセージ
- Query: 情報や状態を要求するメッセージ
実際に実装してみる
Java関連の有益な記事を多く提供している海外サイトBaeldungのサンプルリポジトリを用いて実装してみます。
リポジトリクローン
上記リポジトリでは他チュートリアルリソースも含めてモノレポで提供しているので、
AxonFrameworkのチュートリアルだけgitのSparse Checkoutを使用してクローンします。
git clone --filter=blob:none --no-checkout https://github.com/eugenp/tutorials.git
cd tutorials
git sparse-checkout init --cone
git sparse-checkout set patterns-modules/axon
git sparse-checkout add parent-boot-3
git checkout master
cd patterns-modules/axon
動作確認
起動処理
そのまま起動して動作確認を行ってみます。
サンプルではイベントストアとしてAxon ServerをDockerで起動して使用しています。
まずはAxonServerの起動スクリプトを実行し、WEBGUIから有効化を行います。
chmod +x ./start_axon_server.sh & ./start_axon_server.sh
起動後、http://localhost:8024/ にアクセスしてCompleteボタンを押して有効化
その後にSpringBoot Applicationを起動します
mvn clean install
mvn spring-boot:run
Orderの作成
APIを通じてOrderの取得をしてみます。クエリ側のall-orders
のエンドポイントを実行してみます。
curl -X 'GET' \
'http://localhost:8080/all-orders' \
-H 'accept: application/json; charset=utf-8'
// レスポンスは空
[]%
Orderが登録されていないため、空のレスポンスが帰ってきます。
では次に、コマンド側のエンドポイントorder
を実行してOrderを登録します。
curl -X 'POST' \
'http://localhost:8080/order' \
-H 'accept: application/json; charset=utf-8'
a6b9a6fe-939b-4823-ae12-d28adb52b2df%
その後、先ほどと同じall-orders
のエンドポイントを使用すると、Orderが登録されていることがわかります。
curl -X 'GET' \
'http://localhost:8080/all-orders' \
-H 'accept: application/json; charset=utf-8' | jq
// Orderが登録されている
[
{
"orderId": "a6b9a6fe-939b-4823-ae12-d28adb52b2df",
"products": {},
"orderStatus": "CREATED"
}
]
コマンド側のエンドポイント/order
を実行されると図中の青枠部分までが同期的に実行されます。
- コントローラがコマンド型のメッセージ(CreateOrderCommand)を発火
- それを受け取ったCommandHandlerはOrderの作成を意味するイベント型のメッセージ(OrderCreatedEvent)を作成し、イベントストア(今回の例ではAxonServer)へ保存と同時にイベントバス(今回の例ではAxonServer)へ公開する
これはつまり、コマンド側のエンドポイントの責務はイベントをイベントストアとイベントバスへ公開する部分までになるということです。
イベントを元にクエリ側のリードDBを更新する部分は非同期的に行われます。
そのためクエリ側のエンドポイントの実行タイミングによっては、リードDBのアップデートが行われていないため、結果整合性の読み取りを許容する必要が出てきます。
実際にAxonServerのWEBGUIからイベントストアを確認すると、
OrderCreatedEventが保存されていることがわかります。
Orderの状態を更新する
では次にコマンド側のエンドポイントを利用して作成したOrderの状態を変更してみます。
// 対象のOrderをconfirm状態にする
curl -X 'POST' \
'http://localhost:8080/order/a6b9a6fe-939b-4823-ae12-d28adb52b2df/confirm' \
-H 'accept: application/json; charset=utf-8'
// 対象のOrderをshipped状態にする
curl -X 'POST' \
'http://localhost:8080/order/a6b9a6fe-939b-4823-ae12-d28adb52b2df/ship' \
-H 'accept: application/json; charset=utf-8'
クエリ側のエンドポイントを利用して状態をクエリすると対象のOrderのステータスが更新されていることが確認できます。
curl -X 'GET' \
'http://localhost:8080/all-orders' \
-H 'accept: application/json; charset=utf-8' | jq
// Orderのステータスが更新されている
{
"orderId": "a6b9a6fe-939b-4823-ae12-d28adb52b2df",
"products": {},
"orderStatus": "SHIPPED"
}
先程と同様に見えますが、異なる箇所があります。
Order作成のおいては、新規にイベントを発行するだけでしたが、今回はすでに作成されたOrderの状態を変更します。
そのため、まずはイベントストアに保存された対象IDのOrderイベントを全て取得し、最新の状態に復元する必要があります。
イベントソーシングの基本となるこの動作ですが、この辺りの処理は全てAxonFrameworkが内部的に行ってくれます。
コードを確認する
上記の動作確認で行った処理について実際のコードを確認してみます。
まず前提として、Axon FrameworkではAggregateという単位でビジネスロジックを管理・処理します。
(AggregateはDDDの概念で、関連するエンティティや値オブジェクトを一つにまとめたもの)
今回の例においては、OrderはOrderAggregateクラスにて作成や更新処理が一元的に管理されています。
Orderの作成
コマンド側のエンドポイント/order
を実行すると以下のようにCreateOrderCommandが作成され、commandGateway.sendメソッドを通じて発火されています。
commandGatewayはAxonFrameworkのコンポーネントでコマンド型のメッセージを受けとり、適切なOrderAggregateクラスのインスタンスにルーティングしてくれます。
今回は作成コマンドなので、コンストラクタを呼び出してOrderAggregateのインスタンス作成が行われます。
@PostMapping("/order")
public CompletableFuture<String> createOrder() {
return createOrder(UUID.randomUUID()
.toString());
}
@PostMapping("/order/{order-id}")
public CompletableFuture<String> createOrder(@PathVariable("order-id") String orderId) {
return commandGateway.send(new CreateOrderCommand(orderId));
}
CreateOrderCommandを元にAxonFrameworkが自動的に以下のコンストラクタを実行します。
今度はコンストラクタの処理としてapplyメソッドが実行されています。
このapplyメソッドによって、イベントストアへOrderCreatedEventが保存され同時にイベントバスへの公開が行われます。
(イベントストアとイベントバスへの書き込みの一貫性の担保をどのように実現しているかは読み取れませんでした。
おそらくどちらかが失敗すればロールバックされるものと思いますが。)
@Aggregate(snapshotTriggerDefinition = "orderAggregateSnapshotTriggerDefinition")
public class OrderAggregate {
// 関連部分のみ記載
@AggregateIdentifier
private String orderId;
private boolean orderConfirmed;
@AggregateMember
private Map<String, OrderLine> orderLines;
@CommandHandler
public OrderAggregate(CreateOrderCommand command) {
apply(new OrderCreatedEvent(command.getOrderId()));
}
}
ではイベントバスに公開された、OrderCreatedEventはどのようにリードDBに反映されるのでしょうか。
この部分はAxon Frameworkの概略図でいうEvent Handling Component
が担当します。
具体的には、@EventHandlerアノテーションを付与したメソッドがEvent Handling Component
として自動的に登録され、指定されたイベントが公開されるとそのメソッドが自動的に実行されます。
OrderCreatedEventが発火すると、onメソッドが呼び出され、イベントから取得したorderIdを使用して新しいOrderオブジェクトを作成し、リードDB(この例ではインメモリ)に追加します。
@Service
@ProcessingGroup("orders")
@Profile("!mongo")
public class InMemoryOrdersEventHandler implements OrdersEventHandler {
private final Map<String, Order> orders = new HashMap<>();
private final QueryUpdateEmitter emitter;
public InMemoryOrdersEventHandler(QueryUpdateEmitter emitter) {
this.emitter = emitter;
}
@EventHandler
public void on(OrderCreatedEvent event) {
String orderId = event.getOrderId();
orders.put(orderId, new Order(orderId));
}
Orderの更新
次に、既存のOrderの状態を変更する部分を確認します。
Orderをconfirm状態およびshipped状態に更新する処理を見ていきます。
ここは新規作成と変わりありません。それぞれConfirmOrderCommand
とShipOrderCommand
をcommandGateway.sendメソッドを通じて発火しています。
@PostMapping("/order/{order-id}/confirm")
public CompletableFuture<Void> confirmOrder(@PathVariable("order-id") String orderId) {
return commandGateway.send(new ConfirmOrderCommand(orderId));
}
@PostMapping("/order/{order-id}/ship")
public CompletableFuture<Void> shipOrder(@PathVariable("order-id") String orderId) {
return commandGateway.send(new ShipOrderCommand(orderId));
}
上記のコマンドがcommandGateway.sendメソッドを通じてAggregateに送られると、
Axon Frameworkはまず該当するAggregateのインスタンスを復元する必要があります。
このプロセスは以下のステップで行われます。
- イベントストアからのイベントの読み込み:
AxonFrameworkは、対象のAggregateに関連するすべての過去のイベント(例えば、OrderCreatedEvent、OrderConfirmedEvent、OrderShippedEventなど)をイベントストアから読み込む。 - イベントの適用:
読み込んだイベントを順次Aggregateに適用し自身の状態を最新の状態に更新します。
イベント適用処理は@EventSourcingHandlerアノテーションを付与してメソッドで記載する。 - コマンドの処理:
現在の状態が復元されたAggregateに対して、発行されたコマンドが処理され、
新たなイベントが生成され、イベントストアに保存されると同時に、イベントバスを通じて公開されます。
@Aggregate(snapshotTriggerDefinition = "orderAggregateSnapshotTriggerDefinition")
public class OrderAggregate {
// 関連部分のみ記載
@AggregateIdentifier
private String orderId;
private boolean orderConfirmed;
// ③ 現在の状態が復元されたAggregateに対して、発行されたコマンドが処理される
@CommandHandler
public void handle(ConfirmOrderCommand command) {
if (orderConfirmed) {
return;
}
apply(new OrderConfirmedEvent(orderId));
}
// ③ 現在の状態が復元されたAggregateに対して、発行されたコマンドが処理される
@CommandHandler
public void handle(ShipOrderCommand command) {
if (!orderConfirmed) {
throw new UnconfirmedOrderException();
}
apply(new OrderShippedEvent(orderId));
}
// ② 読み込んだイベントを順次適用して最新の状態に更新
@EventSourcingHandler
public void on(OrderCreatedEvent event) {
this.orderId = event.getOrderId();
this.orderConfirmed = false;
this.orderLines = new HashMap<>();
}
// ② 読み込んだイベントを順次適用して最新の状態に更新
@EventSourcingHandler
public void on(OrderConfirmedEvent event) {
this.orderConfirmed = true;
}
}
イベントバスに公開された、OrderConfirmedEvent
やOrderShippedEvent
はOrderCreatedEvent
と同様にEvent Handling Component
によってキャッチされリードDBへのupdateが行われます。
@EventHandler
public void on(OrderConfirmedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.setOrderConfirmed();
emitUpdate(order);
return order;
});
}
@EventHandler
public void on(OrderShippedEvent event) {
orders.computeIfPresent(event.getOrderId(), (orderId, order) -> {
order.setOrderShipped();
emitUpdate(order);
return order;
});
}
まとめ
本記事では、Axon Frameworkを用いてCQRSパターンを実装してみました。
実際に利用して感じた所感ですが
- 「開発工数や認知コストの削減」について、
コマンドやイベントの管理、Aggregateの復元など、複雑な処理をフレームワークが自動化してくれるため、CQRS/イベントソーシングの知見や資産を持たない開発者が同等の機能を実装する場合と比較して、開発工数や認知コストは抑えられそうと感じました。
(プロダクトへ適用する際は、フレームワーク自体の理解や、フレームワークの内部処理を理解するコストは当然かかってくると思いますが) - 今回のケースではAxonServerを利用しましたが、実際のプロダクトで運用する場合はまだまだ調査することが多そう(クラスター管理やバックアップ手法など)と思いました。
この辺りは有償プランの内容を確認したり、MySQL+Kafkaなどの知見のあるサービスをイベントストア、イベントバスとして利用するオプションを検討したいと思います。
(次回はそのあたりを記事にしたい)
最後に、今回はあくまで入門編ということで基本的な部分のみにフォーカスしていますが、
次回はより実践的にプロダクトへの適用を見据えた検証を行えればと考えています。