はじめに
Mediatorは商用利用が有料になる可能性があるため、今回はMediator.SourceGeneratorを使用する。主な使用方法は変わらないがSourceGeneratorのため、あらかじめ各クラスを実装する必要がある。*本記事ではMediatorとして記載する。
経緯
-
事象
一括処理のリクエストにおいて、複数リクエストデータを実装するのではなく、単一リクエストデータに分解して単一のリクエストデータに対する処理を実装することで、開発効率を上げる。 -
課題
Mediatorには、単一リクエストデータに対するICommandはあるが、それを一括処理する機能がない。 -
対処
複数リクエストデータに対するコマンド・コマンドハンドラを追加することで課題を解決する。
対処内容
一括処理用の抽象クラス「コマンドBulkCommandBase.cs」「コマンドハンドラBulkCommandHandlerBase.cs」を実装する。BulkCommandBaseは、ICommandを配列として保持しており、BulkCommandHandlerBaseは、保持しているICommandを一括実行するロジックを実装する。
// 一括処理コマンド抽象クラス
public abstract class BulkCommandBase<TResponse> : ICommand<BulkResponse<TResponse>>
{
// コンストラクタ
public BulkCommandBase(IEnumerable<ICommand<TResponse>> commands)
{
this.Commands = commands;
}
// コマンドリスト
public virtual IEnumerable<ICommand<TResponse>> Commands { get; } = new List<ICommand<TResponse>>();
}
// 一括処理コマンドハンドラクラス
public abstract class BulkCommandHandlerBase<TBulkCommand, TResponse> : ICommandHandler<TBulkCommand, BulkResponse<TResponse>>
where TBulkCommand : BulkCommandBase<TResponse>
{
private readonly IMediator _mediator;
// コンストラクタ
public BulkCommandHandlerBase(
ICommandHandlerContext cmdContext,
IMediator mediator)
{
this._mediator = mediator ?? throw new ArgumentNullException(nameof(mediator));
}
// ハンドラ
public async ValueTask<BulkResponse<TResponse>> Handle(TBulkCommand bulkCommand, CancellationToken cancellationToken)
{
var tasks = bulkCommand.Commands.Select(
this._mediator.Send(command, cancellationToken).AsTask());
await Task.WhenAll(tasks);
}
}
データアクセスについて
-
事象
EntityFrameworkのDbContextを使用して、データベースにアクセスしている。 -
課題
DbContextはスレッドセーフである必要があるので、一括コマンドハンドラから各コマンドハンドラを並列実行する際にエラーが発生する。 -
対処
各コマンドハンドラを直列実行にすると解消されるが、N+1問題が発生する。そこでデータローダを採用することで課題を解決する。※N+1問題とデータローダについてはN+1問題の回避を参照のこと。
対処内容
データローダは、HotChocolateのGreenDonutを採用する。データローダの抽象クラス「読込用AppAllDataLoaderBase.cs」「書込用AppWriteDataLoader.cs」の2パターン実装し、書込用はCommandHandlerContext.csからSemaphoreSlimを使用してスレッドセーフに実行する。バッチ制御はアプリ専用にAppBatchScheduler.csを実装する。
/// 主キー以外からエンティティを取得するデータローダの抽象クラス
public abstract class AppAllDataLoaderBase<TKey, TValue> : AppDataLoaderBase<TKey, TValue>
where TKey : notnull
{
private readonly IAppDbContextFactory _dbContextFactory;
// コンストラクタ
public AppAllDataLoaderBase(
IAppDbContextFactory dbContextFactory,
IAppBatchScheduler batchScheduler,
IDataLoaderOptions option)
: base(batchScheduler, option)
{
this._dbContextFactory = dbContextFactory ?? throw new ArgumentNullException(nameof(dbContextFactory));
}
// バッチ処理
protected abstract async Task<IReadOnlyDictionary<TKey, TValue>> LoadBatchAsync(
IReadOnlyList<TKey> keys,
CancellationToken cancellationToken)
{
... // keysからdbContextFactoryを使用してデータ取得
}
}
/// 主キー以外からエンティティを更新のために取得するデータローダの抽象クラス
public abstract class AppWriteDataLoaderBase<TKey, TEntity> : AppDataLoaderBase<TKey, AppPermissionResult<TEntity>>
where TKey : IEquatable<TKey>
where TEntity : EntityBase, IHasPK<TEntity, TKey>
{
private readonly ICommandHandlerContext _cmdContext;
// コンストラクタ
public AppWriteDataLoaderBase(
ICommandHandlerContext cmdContext,
IAppBatchScheduler batchScheduler,
AppDataLoaderOptions option)
: base(batchScheduler, option)
{
this._cmdContext = cmdContext ?? throw new ArgumentNullException(nameof(cmdContext));
}
// バッチ処理
protected sealed override async Task<IReadOnlyDictionary<TKey, AppPermissionResult<TEntity>>> LoadBatchAsync(
IReadOnlyList<TKey> keys,
CancellationToken cancellationToken)
{
// SemaphoreSlimを使用してスレッドセーフにデータアクセスする。
var datas = await this._cmdContext.DbContextOperationAsync(async (dbContext) =>
{
... // keysからdbContextを使用してデータ取得
}, cancellationToken);
... // データを辞書化して返却
}
}
// コマンドハンドラ統合ラッパークラス
public class CommandHandlerContext : ICommandHandlerContext
{
private readonly SemaphoreSlim _semaphore = new(1, 1);
// スレッドセーフ制御
public async Task DbContextOperationAsync(
Action<IAppDbContext> func,
CancellationToken cancellationToken)
{
await this._semaphore.WaitAsync(cancellationToken);
try { func(this._dbContext); }
finally { this._semaphore.Release(); }
}
// アプリ用バッチスケジューラー
public class AppBatchScheduler : IAppBatchScheduler
{
// 発火処理
void IBatchScheduler.Schedule(Func<ValueTask> dispatch)
=> _ = ScheduleAsync(dispatch);
private async ValueTask ScheduleAsync(Func<ValueTask> dispatch)
{
// WhenAll内の他のタスクがDataLoaderにキーを登録する時間を設定する。
await Task.Delay(5);
await dispatch();
}
}