はじめに
OSSでCQRS+ESのframeworkがあったので習熟を兼ねて紹介してみます。
EventFlowとは
EventFlow is a basic CQRS+ES framework designed to be easy to use.
詳細はドキュメントを参照してほしいのですが
- DDD周りの機能(Identity, ValueObject, Entity, Aggregate, Event)
- CQRSのCommand + ES機能(Command, Command Handler, Command Bus, Subscriber, Event Store, Saga, Job, Metadata, Snapshots)
- CQRSのQuery機能(Query, Query Handler, Read model Store)
等々欲しい機能は一通り揃っている印象です。
業務内容定義
写経しても面白くないので以下の業務内容を定義して実装してみることにしました。
- 図書館みたいなシステム
- 集約は利用者と本
- 利用者
- 利用者のID: guid
- 氏名: string
- 本
- 本のID: guid
- 本のタイトル: stirng
- 利用者のID: guid
- 貸出期間: Datetime(fromとto)
- 利用者
- ドメインイベント
- 利用者を登録した
- 本を登録した
- 本を借りた
- 本を返した
- 本を破棄した
環境
- ASP.NET Core 2.2
- EventFlow 0.74.3948
- EventFlow.AspNetCore 0.74.3948
- EventFlow.Autofac 0.74.3948
- EventFlow.DependencyInjection 0.74.3948
- Autofac.Extensions.DependencyInjection 4.4.0
機能紹介
Identity(ドキュメント)
集約やエンティティを一意に決めるためのIDになります。様々な場所で使われます。Guidをラップしている感じです。
[JsonConverter(typeof(SingleValueObjectConverter))]
public class 利用者のID : Identity<利用者のID>
{
public 利用者のID(string value) : base(value) { }
}
Identityを継承します。
public abstract class Identity<T> : SingleValueObject<string>, IIdentity where T : Identity<T>
後述しますがSingleValueObjectを継承しているのでSingleValueObjectConverterで変換可能です。
ValueObject(ドキュメント)
[JsonConverter(typeof(SingleValueObjectConverter))]
public class 本のタイトル : SingleValueObject<string>
{
public 本のタイトル(string value) : base(value) { }
}
public class 氏名 : ValueObject
{
public string Value { get; }
public 氏名(string value) => Value = value;
public static 氏名 Create(string value) => new 氏名(value);
}
親クラスとしてValueObjectが存在し、それを継承してSingleValueObjectが存在します。
プリミティブな型を一つだけしか持たない値オブジェクトならばSingleValueObjectで十分です。
SingleValueObjectConverterはSingleValueObjectに特化したJsonコンバータで、プロパティがGetOnlyの場合でも値を詰めてくれます。
ValueObjectを含んだValueObject
[JsonConverter(typeof(SingleValueObjectConverter))]
public class 貸出期間自 : SingleValueObject<DateTime>
{
public 貸出期間自(DateTime value) : base(value) { }
}
[JsonConverter(typeof(SingleValueObjectConverter))]
public class 貸出期間至 : SingleValueObject<DateTime>
{
public 貸出期間至(DateTime value) : base(value) { }
}
public class 貸出期間 : ValueObject
{
public 貸出期間自 貸出期間自 { get; set; } // Converterを用意しないとset取れない(取ると値はいらない).
public 貸出期間至 貸出期間至 { get; set; }
public 貸出期間() : base() { }
public 貸出期間(貸出期間自 _貸出期間自, 貸出期間至 _貸出期間至) : this()
{
貸出期間自 = _貸出期間自 ?? throw new ArgumentNullException(nameof(_貸出期間自));
貸出期間至 = _貸出期間至 ?? throw new ArgumentNullException(nameof(_貸出期間至));
}
}
こんな感じでValueObjectを入れ子にしたいときがあるとお思います。
本当ならプロパティはGetOnlyにしたいんですけど、JsonConverterのデシリアライズ時に上手く値を詰めてくれず止む無くsetを付けています。
これの解決方法は調査中です。
Aggregates(ドキュメント)
このフレームワークではエンティティと集約を明確に分けています。
今回は集約の中にエンティティが入ることがなかったのでEntity型は使っていません。
ただ、
public abstract class Entity<TIdentity> : ValueObject, IEntity<TIdentity>
な感じなので一意な値を持った値オブジェクト程度の扱いなのかなと。
public class 利用者 : AggregateRoot<利用者, 利用者のID>, IEmit<利用者を登録した>
{
public 氏名 氏名 { get; set; }
public 利用者(利用者のID id) : base(id) { }
public void 利用者を登録する(氏名 _氏名) => Emit(new 利用者を登録した(_氏名));
public void Apply(利用者を登録した ev) => 氏名 = ev.氏名;
}
↑は集約の例です。AggregateRootを継承します。その際に自身と使用するIdentityを設定します。
IEmitについては後述。
Event(ドキュメント)
[EventVersion("利用者を登録した", 1)]
public class 利用者を登録した : AggregateEvent<利用者, 利用者のID>
{
public 氏名 氏名 { get; set; }
public 利用者を登録した() : base() { }
public 利用者を登録した(氏名 _氏名) : this() => 氏名 = _氏名;
}
ドメインイベントを定義します。AggregateEventを継承します。
イベントを実現するために必要な情報を詰め込みます。
このイベントもjsonのシリアライズ/デシリアライズがフレームワーク内部で発生するのですがプロパティがGetOnlyだと値が入ってくれません。
JsonConverterの問題だと思うのですが引数無しコンストラクタで生成してからsetしている感じです。
EventVersionAttributeはドメインイベントのバージョンを管理してくれます。
イベントソーシングの場合ドメインイベントの内容が変化したとしても過去のドメインイベントはそのまま残しておく必要があります(復元できなくなるため)。
この機能は試していないので詳細はEvent upgrade参照。
CommandとCommandHandler(ドキュメント)
コマンドとコマンドハンドラーを定義します。
コマンドとコマンドハンドラーは1:1の関係です。
ドメインイベントとコマンドは関連付いている必要はないです。
public class 利用者を登録するCommand : Command<利用者, 利用者のID, IExecutionResult>
{
public 氏名 氏名 { get; }
public 利用者を登録するCommand(利用者のID id, 氏名 _氏名) : base(id) => 氏名 = _氏名;
}
public class 利用者を登録するCommandHandler : CommandHandler<利用者, 利用者のID, IExecutionResult, 利用者を登録するCommand>
{
public 利用者を登録するCommandHandler() : base() { }
public override Task<IExecutionResult> ExecuteCommandAsync(利用者 aggregate, 利用者を登録するCommand command, CancellationToken cancellationToken)
{
aggregate.利用者を登録する(command.氏名);
return Task.FromResult(ExecutionResult.Success());
}
}
CommandとCommandHandlerをそれぞれ継承します。
Commandにはコマンドを実行するのに必要な情報を詰め込みます。
CommandHandlerはそのコマンドで実行したいことを記載します。
ただしドメインイベントを発行するためのメソッドEmitはAggregateRootのprotected virtualメソッドのため最後は集約のpublicメソッドを呼ぶ必要があります。
public void 利用者を登録する(氏名 _氏名) => Emit(new 利用者を登録した(_氏名));
Emitが呼ばれると
public interface IEmit<in TAggregateEvent> where TAggregateEvent : IAggregateEvent
を実装しているクラスのApplyメソッドが呼ばれます。今回で言うと利用者集約のApply。
このApplyについての実装方法はいくつかあるのですがまた今度(ドキュメント)。
コマンドの発行方法
container.Resolve<ICommandBus>()
.PublishAsync(
new 利用者を登録するCommand(利用者のID.New, 氏名.Create("田中太郎"))
, CancellationToken.None);
後述しますがどうにかICommandBusのインスタンスを取得してCommandを生成してPublishAsyncするだけです。
Read model(ドキュメント)
ドメインイベントをEmitするとそのイベントがイベントストアに収納されるのですが、その際に対応付けているRead modelが更新されます。
public class 利用者ReadModel : IReadModel, IAmReadModelFor<利用者, 利用者のID, 利用者を登録した>
{
public 利用者のID 利用者のID { get; private set; }
public 氏名 氏名 { get; private set; }
public void Apply(IReadModelContext context, IDomainEvent<利用者, 利用者のID, 利用者を登録した> domainEvent)
{
利用者のID = domainEvent.AggregateIdentity;
氏名 = domainEvent.AggregateEvent.氏名;
}
}
IAmReadModelForがポイントです。(IAmAsyncReadModelForもあります)
これを設定しておくと指定しているイベントが発行されたときに更新処理が走ります。このクラスはQueryの機能で使用します。
Queries(ドキュメント)
Read model Storeに格納されている情報を取得します。今回はRead modelがIn-memoryの場合の話です。詳細はここ参照。
await QueryProcessor.ProcessAsync(new InMemoryQuery<利用者ReadModel>(_ => true), CancellationToken.None);
取得したい場所でどうにかIQueryProcessorのインスタンスを取得しProcessAsyncをコールします。
↑で定義したIReadModelを単純に全部取り出したい場合は↑の感じで大丈夫です。(InMemoryQueryの引数はFunc<IReadModel, bool>なpredicate)
独自のDTOを取得したい場合はIQueryとIQueryHandlerを実装します。
public class 本DTO : IReadModel
{
public 本のID 本のID { get; set; }
public 本のタイトル 本のタイトル { get; set; }
public 貸出期間 貸出期間 { get; set; }
public 利用者のID 利用者のID { get; set; }
public 氏名 氏名 { get; set; }
}
public class 本DTOQuery : IQuery<IReadOnlyCollection<本DTO>>
{
}
public class 本DTOQueryHandler : IQueryHandler<本DTOQuery, IReadOnlyCollection<本DTO>>
{
private IInMemoryReadStore<本ReadModel> 本Store { get; }
private IInMemoryReadStore<利用者ReadModel> 利用者Store { get; }
public 本DTOQueryHandler(IInMemoryReadStore<本ReadModel> _本Store, IInMemoryReadStore<利用者ReadModel> _利用者Store)
{
本Store = _本Store;
利用者Store = _利用者Store;
}
public async Task<IReadOnlyCollection<本DTO>> ExecuteQueryAsync(本DTOQuery query, CancellationToken cancellationToken)
{
var 本Model = await 本Store.FindAsync(model => true, cancellationToken);
var 利用者Model = await 利用者Store.FindAsync(model => true, cancellationToken);
var list = from 本 in 本Model
join _利用者 in 利用者Model on 本.利用者のID equals _利用者.利用者のID into 利用者Join from 利用者 in 利用者Join.DefaultIfEmpty()
select new 本DTO { 本のID = 本.本のID, 本のタイトル = 本.本のタイトル, 利用者のID = 本?.利用者のID, 貸出期間 = 本?.貸出期間, 氏名 = 利用者?.氏名 };
return list.ToList().AsReadOnly();
}
}
初期設定
今まで作成したクラスをアプリケーションの最初でEventFlowに設定します。
public IServiceProvider ConfigureServices(IServiceCollection services)
{
~
~
var containerBuilder = new ContainerBuilder();
containerBuilder.Populate(services);
IContainer container = EventFlowOptions.New
.UseAutofacContainerBuilder(containerBuilder) // Must be the first line!
.AddAspNetCore((AspNetCoreEventFlowOptions op) =>
op.UseLogging()
.UseDefaults()
//.UseMvcJsonOptions()
//.AddDefaultMetadataProviders()
//.AddRequestHeadersMetadata()
//.AddUriMetadata()
//.AddUserHostAddressMetadata()
//.RunBootstrapperOnHostStartup()
)
.AddEvents(new[] {
typeof(利用者を登録した),
typeof(本を登録した),
typeof(本を借りた),
typeof(本を返した),
typeof(本を破棄した),
})
.AddCommands(new[] {
typeof(利用者を登録するCommand),
typeof(本を登録するCommand),
typeof(本を借りるCommand),
typeof(本を返すCommand),
typeof(本を破棄するCommand),
})
.AddCommandHandlers(new[] {
typeof(利用者を登録するCommandHandler),
typeof(本を登録するCommandHandler),
typeof(本を借りるCommandHandler),
typeof(本を返すCommandHandler),
typeof(本を破棄するCommandHandler),
})
.AddQueryHandlers(new[] {
typeof(本DTOQueryHandler),
})
.UseInMemoryReadStoreFor<利用者ReadModel>()
.UseInMemoryReadStoreFor<本ReadModel>()
.UseFilesEventStore(FilesEventStoreConfiguration.Create(@".\store"))
.Configure((EventFlowConfiguration c) =>
{
c.IsAsynchronousSubscribersEnabled = true;
})
.ConfigureJson(json => json
.AddSingleValueObjects()
//.AddConverter<SingleValueObjectConverter>()
//.Configure((JsonSerializerSettings s) => { })
)
.CreateContainer();
return new AutofacServiceProvider(container);
}
それぞれのクラスはアセンブリ名で一気に登録することもできます。(AddDefaults)
まとめ
駆け足での紹介となってしまいましたが雰囲気は伝わったでしょうか。。
とりあえずGitHubに上げたソースでCQRSの一通りの動作を実施できることは確認しました。
気になった方はとりあえず触ってみることをおすすめします。
ドはまりしたところはJsonConverterらへん(特にデシリアライズ)なので想定通りの動作にならない場合はJsonConverterを差し替えて挙動を確認してみると良さげです。