はじめに
JavaのCQRS+ESフレームワークであるAxon Frameworkについて、
集約のスナップショット作成方式がAPIのレイテンシーに影響を及ぼしていたため設定変更を行いました。
設定変更を行う中で調査して内容を共有します。
Axon Frameworkとは
こちらに簡単な紹介とハンズオンを記載しておりますのでご覧ください。
スナップショットとは
Event Sourcingでは、Aggregate(集約)の状態はイベントを最初から順番に再生することで復元されます。しかし、イベント数が増加するとこの再生処理に時間がかかり、パフォーマンスが低下します。
そこで、ある時点のAggregateの状態をスナップショットとして保存しておきます。
スナップショットが存在する場合、そのスナップショット以降のイベントのみをリプレイすればよいため、Aggregateのロード時間を大幅に短縮できます。
【スナップショットなしの場合】
Event1 → Event2 → Event3 → ... → Event1000 → 現在の状態
(1000件すべてをリプレイ)
【スナップショットありの場合】
Snapshot(Event500時点の状態) → Event501 → ... → Event1000 → 現在の状態
(500件のみリプレイ)
Axon Frameworkのスナップショット作成の仕組み
アーキテクチャ概要
Axon Frameworkのスナップショット作成は、以下のコンポーネントで構成されています。
| コンポーネント | 役割 |
|---|---|
| Snapshotter | スナップショット作成をスケジュールするインターフェース |
| AggregateSnapshotter | 実際にスナップショットを作成・保存する実装クラス |
| SnapshotTriggerDefinition | スナップショット作成のトリガー条件を定義 |
| Executor | スナップショット作成タスクの実行方式を決定 |
スナップショットのトリガー
スナップショット作成のトリガーには主に2種類あります。
1.EventCountSnapshotTriggerDefinition
イベント数が閾値に達したときにスナップショットを作成します。
@Bean
public SnapshotTriggerDefinition snapshotTriggerDefinition(Snapshotter snapshotter) {
// 100イベントごとにスナップショットを作成
return new EventCountSnapshotTriggerDefinition(snapshotter, 100);
}
2.AggregateLoadTimeSnapshotTriggerDefinition
Aggregateのロード時間が閾値を超えたときにスナップショットを作成します。
@Bean
public SnapshotTriggerDefinition snapshotTriggerDefinition(Snapshotter snapshotter) {
// ロード時間が100msを超えたらスナップショットを作成
return new AggregateLoadTimeSnapshotTriggerDefinition(snapshotter, 100);
}
スナップショット作成処理の実行方式
AbstractSnapshotterは内部でExecutorを使用してスナップショット作成タスクを実行します。デフォルトではDirectExecutorが使用されます。
// AbstractSnapshotter.java
public abstract class AbstractSnapshotter implements Snapshotter {
public abstract static class Builder {
// デフォルトは DirectExecutor
private Executor executor = DirectExecutor.INSTANCE;
...
protected void doScheduleSnapshot(Class<?> aggregateType, String aggregateIdentifier) {
// executor.execute() でスナップショット作成タスクを実行
executor.execute(silently(
() -> transactionManager.executeInTransaction(
createSnapshotterTask(aggregateType, aggregateIdentifier)
)
));
}
}
DirectExecutorは呼び出し元のスレッドでそのまま実行するため、同期的にスナップショットが作成されます。
// DirectExecutor.java
public final class DirectExecutor implements Executor {
@Override
public void execute(Runnable command) {
command.run(); // 呼び出し元スレッドで即座に実行
}
}
この場合、コマンド処理の流れは以下のようになり、スナップショット作成処理がレイテンシーに影響を与えます。
スナップショット作成処理の実行方式を非同期にする
上記のデフォルトのスナップショット作成方式の場合、スナップショット作成処理でスレッドがブロックされるため、APIのレイテンシーに直接影響します。
そこで実行方式を非同期化します
設定方法
SnapshotterのBuilderでExecutorを差し替えることで、非同期化できます。
Spring Boot Auto Configuration を使用する場合
Spring Bootの自動設定を利用している場合は、SpringAggregateSnapshotterFactoryBeanをカスタマイズします。
@Configuration
public class AxonSnapshotConfig {
@Bean
public Executor snapshotExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(4);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("snapshot-");
executor.initialize();
return executor;
}
@Bean
@Primary
public Snapshotter snapshotter(EventStore eventStore,
TransactionManager transactionManager,
ParameterResolverFactory parameterResolverFactory,
HandlerDefinition handlerDefinition,
Executor snapshotExecutor) {
return SpringAggregateSnapshotter.builder()
.eventStore(eventStore)
.transactionManager(transactionManager)
.parameterResolverFactory(parameterResolverFactory)
.handlerDefinition(handlerDefinition)
.executor(snapshotExecutor)
.build();
}
}
非同期化後の処理フロー
注意点
スレッドプールのサイズ
スナップショット作成はI/O処理を含むため、CPUコア数より多めのスレッドを確保することを推奨します。ただし、データベースへの負荷も考慮してください。
エラーハンドリング
Axon FrameworkはSilentTaskでエラーをラップしており、ConcurrencyException(既にスナップショットが存在する場合)は自動的に無視されます。その他のエラーはログに出力されますが、処理は継続します。
重複防止機構
同一Aggregateに対する複数のスナップショット作成リクエストは、ConcurrentHashMapで管理され、重複実行が防止されます。
// AbstractSnapshotter.java
private final Set<AggregateTypeId> snapshotsInProgress = ConcurrentHashMap.newKeySet();
protected void doScheduleSnapshot(...) {
if (snapshotsInProgress.add(typeAndId)) {
// 新規の場合のみ実行
executor.execute(...);
}
// 既に進行中の場合はスキップ
}
まとめ
Axon Frameworkのスナップショット作成は、デフォルトではDirectExecutorによる同期実行となっています。
これをThreadPoolなどの非同期Executorに差し替えることで、APIのレイテンシーへの影響を排除できます。
| 項目 | 同期(デフォルト) | 非同期 |
|---|---|---|
| Executor | DirectExecutor | ThreadPoolExecutor等 |
| レイテンシー影響 | あり | なし |
| 設定の手間 | 不要 | Executor設定が必要 |
| リソース消費 | 少ない | スレッドプール分の追加リソース |
スナップショット作成頻度が高い、またはスナップショット作成に時間がかかる環境では、非同期化によるパフォーマンス改善効果もあると思いますので検討してみてもよいのはないでしょうか。