はじめに
の続きです。
EventFlowのReadModelStoreをPostgreSQLに変更するための方法について投稿します。
そもそも
EventFlowではReadModelStoreを変更することができます。
ドキュメントから引用すると
- In-memory
- Microsoft SQL Server
- Elasticsearch
が書かれていますがPostgreSQL用の拡張がNugetにあったのでそちらを試してみます。
CQRS+ES and DDD frameworkのEventFlowの紹介で使用しているソースを元に説明します。
Nuget取得
ReadModel実装
PostgreSqlReadModel)が定義されているのでこれを継承して作ります。
[Table("ReadModel-利用者")]
public class 利用者ReadModelForPostgres : PostgreSqlReadModel, IAmReadModelFor<利用者, 利用者のID, 利用者を登録した>
{
public string 利用者のID { get; set; }
public string 氏名 { get; set; }
public 利用者ReadModelForPostgres() :base() { }
public void Apply(IReadModelContext context, IDomainEvent<利用者, 利用者のID, 利用者を登録した> domainEvent)
{
利用者のID = domainEvent.AggregateIdentity.Value;
氏名 = domainEvent.AggregateEvent.氏名.Value;
UpdatedTime = DateTimeOffset.Now;
CreateTime = DateTimeOffset.Now;
}
public 利用者 To利用者()
=> new 利用者(Domain.利用者.利用者のID.With(利用者のID))
{
氏名 = Domain.利用者.氏名.Create(氏名),
};
}
By convention, EventFlow uses the table named ReadModel-[CLASS NAME] as the table to store the read models rows in. If you need to change this, use the Table from the System.ComponentModel.DataAnnotations.Schema namespace.
引用の通りテーブル名はReadModel-[CLASS NAME]がデフォルトっぽいですがTable属性で変更できます。ReadModel-[集約名]にしときました。
PostgreSqlReadModelはIReadModelを継承しているのでIn-memoryで動作させる場合でも使えます。
ただIn-memoryの場合は
public 利用者のID 利用者のID { get; set; }
のようにValueObjectを型にしても変換してくれたのですが、PostgreSQLの場合には変換エラーが発生してしまいました。
なのでプロパティの型はPostgreSQLの型と相互変換出来るようなプリミティブな型にする必要がありました。
使用する際はValueObjectを返すGetterのプロパティを用意するか、集約に変換できるならばTo利用者のように集約に変換するメソッドを用意して上げた方がよさそうです。
Query Handler実装
In-memoryの場合は組み込まれているIInMemoryReadStoreを使用して取得していましたがPostgreSQLではIPostgreSqlConnectionを使用する必要があります。
なのでIn-memoryとDB両方に対応するのならば
- ひとつのQuery Handler内でIn-memoryかDBか見極めて使用するインスタンスを変える
- それぞれ用のQuery Handlerを作成し、初期設定で差し替える
のどちらかをする必要があります。今回は後者で行いました。
public class 本DTOQueryHandlerForPostgres : IQueryHandler<本DTOQuery, IReadOnlyCollection<本DTO>>
{
private IPostgreSqlConnection PostgreSqlConnection { get; }
public 本DTOQueryHandlerForPostgres(IPostgreSqlConnection _postgreSqlConnection) => PostgreSqlConnection = _postgreSqlConnection;
public async Task<IReadOnlyCollection<本DTO>> ExecuteQueryAsync(本DTOQuery query, CancellationToken cancellationToken)
{
var 本Model = (await PostgreSqlConnection.QueryAsync<本ReadModelForPostgres>(Label.Named("postgresql-本ReadModel"), cancellationToken, "SELECT * FROM \"ReadModel-本\"")).Select(m => m.To本());
var 利用者Model = (await PostgreSqlConnection.QueryAsync<利用者ReadModelForPostgres>(Label.Named("postgresql-利用者ReadModel"), cancellationToken, "SELECT * FROM \"ReadModel-利用者\"")).Select(m => m.To利用者());
var list = from 本 in 本Model
join _利用者 in 利用者Model on 本.利用者のID equals _利用者.Id into 利用者Join
from 利用者 in 利用者Join.DefaultIfEmpty()
select new 本DTO { 本のID = 本.Id, 本のタイトル = 本.本のタイトル, 利用者のID = 本?.利用者のID, 貸出期間 = 本.貸出期間, 氏名 = 利用者?.氏名 };
return list.ToList().AsReadOnly();
}
}
ポイントはQueryAsyncです。
Task<IReadOnlyCollection<TResult>> QueryAsync<TResult>(Label label, CancellationToken cancellationToken, string sql, object param = null);
第三引数のsqlで直接必要な情報を取得するでも良いですし、さらっと取得してLINQ to Objectsで取得もできます。
初期設定
IContainer container = EventFlowOptions.New
.UseAutofacContainerBuilder(containerBuilder) // Must be the first line!
~
~
.AddQueryHandlers(new[] {
typeof(本DTOQueryHandlerForPostgres),
typeof(利用者AllQueryHandlerForPostgres),
})
.ConfigurePostgreSql(PostgreSqlConfiguration.New.SetConnectionString("Server=localhost;Port=5432;User ID=postgres;Database=eventflow;password=password;Enlist=true"))
.UsePostgreSqlReadModel<利用者ReadModelForPostgres>()
.UsePostgreSqlReadModel<本ReadModelForPostgres>()
.CreateContainer();
前述した通りAddQueryHandlersに今回の設定で使う方のQuery Handlerを登録します。
ConfigurePostgreSqlで必要な情報を入力します。
UsePostgreSqlReadModelでPostgreSQL用のReadModelを登録します。
この後にマイグレーション処理が必要になります。
var databaseMigrator = container.Resolve<IPostgreSqlDatabaseMigrator>();
//EventFlowSnapshotStoresPostgreSql.MigrateDatabase(databaseMigrator);
//EventFlowEventStoresPostgreSql.MigrateDatabase(databaseMigrator);
databaseMigrator.MigrateDatabaseUsingScripts(new[]
{
new SqlScript(
"利用者ReadModel",
@"CREATE TABLE IF NOT EXISTS ""ReadModel-利用者""(
利用者のID Varchar(64),
氏名 Varchar(64),
-- -------------------------------------------------
Id bigint GENERATED BY DEFAULT AS IDENTITY,
AggregateId Varchar(64) NOT NULL,
CreateTime Timestamp WITH TIME ZONE NOT NULL,
UpdatedTime Timestamp WITH TIME ZONE NOT NULL,
LastAggregateSequenceNumber int NOT NULL,
CONSTRAINT ""PK_ReadModel-利用者"" PRIMARY KEY
(
Id
)
);
CREATE INDEX IF NOT EXISTS ""IX_ReadModel-利用者_AggregateId"" ON ""ReadModel-利用者""
(
AggregateId
);
"),
});
}
どうにかしてIPostgreSqlDatabaseMigratorを取得したらReadModel用のテーブルをCreateするDDLを用意します。
この記述は外部ファイル化できます。(参考)
---以下はReadModel-~~のところだけ直すだけで良かったです。
---より上にReadModelに定義してあるプロパティに対応する列を定義します。
ReadModelのプロパティにPostgreSqlReadModelIgnoreColumnAttributeとか設定すればプロパティを無視できたりします。
Database生成
自分は手動でDB(今回だとDB名:eventflow)を作成してからテストしましたがコード上でDBの生成とかできそうです。(参考)
まとめ
PostgreSQLはReadModelStoreの他にもEventStoreやSnapshots用にも使えます。
また他のDBにも対応できるNugetがありますので好みでDBを選べそうです。
DDLを自分で記述しないといけない点やReadModelでのプロパティの型がプリミティブなのがちょっと気になりました。
EventFlow.EntityFrameworkを組み合わせれば気にしなくて良くなるかも?