2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Axon Frameworkのスナップショット作成を非同期化する

Last updated at Posted at 2025-12-11

はじめに

JavaのCQRS+ESフレームワークであるAxon Frameworkについて、
集約のスナップショット作成方式がAPIのレイテンシーに影響を及ぼしていたため設定変更を行いました。
設定変更を行う中で調査して内容を共有します。

Axon Frameworkとは

こちらに簡単な紹介とハンズオンを記載しておりますのでご覧ください。

スナップショットとは

Event Sourcingでは、Aggregate(集約)の状態はイベントを最初から順番に再生することで復元されます。しかし、イベント数が増加するとこの再生処理に時間がかかり、パフォーマンスが低下します。
そこで、ある時点のAggregateの状態をスナップショットとして保存しておきます。
スナップショットが存在する場合、そのスナップショット以降のイベントのみをリプレイすればよいため、Aggregateのロード時間を大幅に短縮できます。

  【スナップショットなしの場合】
  Event1 → Event2 → Event3 → ... → Event1000 → 現在の状態
  (1000件すべてをリプレイ)

  【スナップショットありの場合】
  Snapshot(Event500時点の状態) → Event501 → ... → Event1000 → 現在の状態
  (500件のみリプレイ)

Axon Frameworkのスナップショット作成の仕組み

アーキテクチャ概要

Axon Frameworkのスナップショット作成は、以下のコンポーネントで構成されています。

コンポーネント 役割
Snapshotter スナップショット作成をスケジュールするインターフェース
AggregateSnapshotter 実際にスナップショットを作成・保存する実装クラス
SnapshotTriggerDefinition スナップショット作成のトリガー条件を定義
Executor スナップショット作成タスクの実行方式を決定

スナップショットのトリガー

スナップショット作成のトリガーには主に2種類あります。

1.EventCountSnapshotTriggerDefinition

イベント数が閾値に達したときにスナップショットを作成します。

  @Bean
  public SnapshotTriggerDefinition snapshotTriggerDefinition(Snapshotter snapshotter) {
      // 100イベントごとにスナップショットを作成
      return new EventCountSnapshotTriggerDefinition(snapshotter, 100);
  }

2.AggregateLoadTimeSnapshotTriggerDefinition

Aggregateのロード時間が閾値を超えたときにスナップショットを作成します。

  @Bean
  public SnapshotTriggerDefinition snapshotTriggerDefinition(Snapshotter snapshotter) {
      // ロード時間が100msを超えたらスナップショットを作成
      return new AggregateLoadTimeSnapshotTriggerDefinition(snapshotter, 100);
  }

スナップショット作成処理の実行方式

AbstractSnapshotterは内部でExecutorを使用してスナップショット作成タスクを実行します。デフォルトではDirectExecutorが使用されます。

  // AbstractSnapshotter.java
  public abstract class AbstractSnapshotter implements Snapshotter {

      public abstract static class Builder {
          // デフォルトは DirectExecutor
          private Executor executor = DirectExecutor.INSTANCE;
          ...

      protected void doScheduleSnapshot(Class<?> aggregateType, String aggregateIdentifier) {
          // executor.execute() でスナップショット作成タスクを実行
          executor.execute(silently(
              () -> transactionManager.executeInTransaction(
                  createSnapshotterTask(aggregateType, aggregateIdentifier)
              )
          ));
      }
  }

DirectExecutorは呼び出し元のスレッドでそのまま実行するため、同期的にスナップショットが作成されます。

  // DirectExecutor.java
  public final class DirectExecutor implements Executor {
      @Override
      public void execute(Runnable command) {
          command.run(); // 呼び出し元スレッドで即座に実行
      }
  }

この場合、コマンド処理の流れは以下のようになり、スナップショット作成処理がレイテンシーに影響を与えます。

スナップショット作成処理の実行方式を非同期にする

上記のデフォルトのスナップショット作成方式の場合、スナップショット作成処理でスレッドがブロックされるため、APIのレイテンシーに直接影響します。
そこで実行方式を非同期化します

設定方法

SnapshotterのBuilderでExecutorを差し替えることで、非同期化できます。

Spring Boot Auto Configuration を使用する場合

Spring Bootの自動設定を利用している場合は、SpringAggregateSnapshotterFactoryBeanをカスタマイズします。

  @Configuration
  public class AxonSnapshotConfig {

      @Bean
      public Executor snapshotExecutor() {
          ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
          executor.setCorePoolSize(2);
          executor.setMaxPoolSize(4);
          executor.setQueueCapacity(100);
          executor.setThreadNamePrefix("snapshot-");
          executor.initialize();
          return executor;
      }

      @Bean
      @Primary
      public Snapshotter snapshotter(EventStore eventStore,
                                     TransactionManager transactionManager,
                                     ParameterResolverFactory parameterResolverFactory,
                                     HandlerDefinition handlerDefinition,
                                     Executor snapshotExecutor) {
          return SpringAggregateSnapshotter.builder()
                  .eventStore(eventStore)
                  .transactionManager(transactionManager)
                  .parameterResolverFactory(parameterResolverFactory)
                  .handlerDefinition(handlerDefinition)
                  .executor(snapshotExecutor)
                  .build();
      }
  }

非同期化後の処理フロー

注意点

スレッドプールのサイズ

スナップショット作成はI/O処理を含むため、CPUコア数より多めのスレッドを確保することを推奨します。ただし、データベースへの負荷も考慮してください。

エラーハンドリング

Axon FrameworkはSilentTaskでエラーをラップしており、ConcurrencyException(既にスナップショットが存在する場合)は自動的に無視されます。その他のエラーはログに出力されますが、処理は継続します。

重複防止機構

同一Aggregateに対する複数のスナップショット作成リクエストは、ConcurrentHashMapで管理され、重複実行が防止されます。

  // AbstractSnapshotter.java
  private final Set<AggregateTypeId> snapshotsInProgress = ConcurrentHashMap.newKeySet();

  protected void doScheduleSnapshot(...) {
      if (snapshotsInProgress.add(typeAndId)) {
          // 新規の場合のみ実行
          executor.execute(...);
      }
      // 既に進行中の場合はスキップ
  }

まとめ

Axon Frameworkのスナップショット作成は、デフォルトではDirectExecutorによる同期実行となっています。
これをThreadPoolなどの非同期Executorに差し替えることで、APIのレイテンシーへの影響を排除できます。

項目 同期(デフォルト) 非同期
Executor DirectExecutor ThreadPoolExecutor等
レイテンシー影響 あり なし
設定の手間 不要 Executor設定が必要
リソース消費 少ない スレッドプール分の追加リソース

スナップショット作成頻度が高い、またはスナップショット作成に時間がかかる環境では、非同期化によるパフォーマンス改善効果もあると思いますので検討してみてもよいのはないでしょうか。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?