経緯
CQRS+イベントソーシングをオンプレミスで実装しようとしたときに、
Apache Kafka等のメッセージキューとかを使わずにイベント検知できないかと考えたところ、
PostgreSQLのPub/Sub機能を使えば最小限で出来そうな気がしたので検討アプリを作ってみました。
アプリはここに置いています。
cloneしたら
cd src
docker-compose up -d --build
で起動します。
環境
- 自作検討アプリ
- docker-compose
- mcr.microsoft.com/dotnet/sdk:7.0
- Microsoft.EntityFrameworkCore Version=7.0.0-preview.7.22376.2
- postgres:14.5-bullseye
環境イメージ
- write_model(eventstore用のpostgres)
- read_model(参照用のpostgres)
- Projector(read_modelへイベントを射影するdotnetcore)
- SampleCmd(write_modelへイベントを追加するdotnetcore)
ファイルツリー抜粋
├── docker-compose.yml
├── dotnet
│ ├── Dockerfile
│ ├── docker-entrypoint.sh
│ └── src
│ ├── Domain
│ │ ├── Domain.csproj
│ │ ├── EventIdEnum.cs
│ │ ├── IEventInsert.cs
│ │ ├── User.cs
│ │ ├── UserId.cs
│ │ ├── UserName.cs
│ ├── Infrastructure
│ │ ├── DataBase
│ │ │ ├── Event.cs
│ │ │ └── EventstoreContext.cs
│ │ ├── EventInsert.cs
│ │ ├── Infrastructure.csproj
│ ├── Projector
│ │ ├── Commands.cs
│ │ ├── DataBase
│ │ │ ├── SampleDBContext.cs
│ │ │ └── User.cs
│ │ ├── Program.cs
│ │ ├── Projector.csproj
│ └── SampleCmd
│ ├── Commands.cs
│ ├── DataBase
│ │ ├── SampleContext.cs
│ │ └── User.cs
│ ├── Program.cs
│ ├── SampleCmd.csproj
├── read_model
│ └── Dockerfile
└── write_model
├── Dockerfile
└── initdb
├── eventstore.backup
└── init.sh
ドメインとかイベントとか
とりあえずただのユーザーテーブルにCRUDするドメインにしてみました。
ドメイン
イベント
EventId | 名称 | 説明 | スキーマっぽいもの |
---|---|---|---|
1 | UserAdded | ユーザーを追加する | { "type": "object", "properties": { "name": { "type": "string" } } } |
2 | UserEdited | ユーザーを編集する | { "type": "object", "properties": { "id": { "type": "string" }, "name": { "type": "string" } } } |
3 | UserDeleted | ユーザーを削除する | { "type": "object", "properties": { "id": { "type": "string" } } } |
ER図
Projectorのコマンド
データベースリストア用のinitと、イベント購読用のsubscriptionを用意しました。
$ docker exec -it dotnet dotnet run --project /root/src/Projector help
Usage: Projector <Command>
Commands:
help Display help.
init
subscription
version Display version.
データベースリストア用のinitの中身抜粋
// read_modelのSampleデータベースのDbContextをコンストラクタインジェクションしている.
private SampleDBContext Smp { get; }
// write_modelのEventstoreデータベースのDbContextをコンストラクタインジェクションしている.
private EventstoreContext Ev { get; }
public async Task Init()
{
Smp.Database.EnsureDeleted(); // Sampleデータベースの削除
Smp.Database.EnsureCreated(); // Sampleデータベースの生成
// イベント分再生
foreach (var x in Ev.Events)
Handle(x);
}
// イベントid毎に処理.usersテーブルにinsertしたりupdateしたりdeleteしたり.
private void Handle(Event ev)
{
if (Enum.IsDefined(typeof(EventIdEnum), ev.EventId) == false)
return;
switch ((EventIdEnum)ev.EventId)
{
case EventIdEnum.UserAdded: UserUpsertHandler(ev.Data.RootElement); break;
case EventIdEnum.UserEdited: UserUpsertHandler(ev.Data.RootElement); break;
case EventIdEnum.UserDeleted: UserDeleteHandler(ev.Data.RootElement); break;
default: throw new ArgumentException($"not implemented. {ev.EventId}");
};
}
イベント購読用のsubscriptionの中身抜粋
public async Task Subscription(EventstoreContext ev)
{
var conn = (NpgsqlConnection)ev.Database.GetDbConnection();
conn.Open();
try
{
conn.Notification += OnNotification;
// postgresqlのLISTENで購読する.
using var cmd = new NpgsqlCommand("LISTEN event_channel", conn);
cmd.ExecuteNonQuery();
while (Context.CancellationToken.IsCancellationRequested == false)
await conn.WaitAsync(Context.CancellationToken);
}
catch (Exception ex) when (!(ex is OperationCanceledException))
{
Console.WriteLine(ex.ToString());
}
finally
{
conn.Close();
}
}
// イベントが発行される度に処理.
private void OnNotification(object sender, NpgsqlNotificationEventArgs e)
{
Console.WriteLine("event handled: " + e.Payload);
Event? ev = JsonSerializer.Deserialize<Event>(e.Payload, new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
});
if (ev is not null)
Handle(ev);
}
Eventstoreデータベースのトリガ部分
write_modelのデータベースに仕込みます。
トリガー条件: eventsテーブルにINSERT
実行内容: insertされた内容をNOTIFY
CREATE OR REPLACE FUNCTION public.event_insert_trigger()
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS $BODY$
BEGIN
PERFORM pg_notify(
'event_channel',
row_to_json(NEW)::text
);
return NULL;
END
$BODY$;
ALTER FUNCTION public.event_insert_trigger()
OWNER TO postgres;
CREATE TRIGGER event_trigger
AFTER INSERT
ON public.events
FOR EACH ROW
EXECUTE FUNCTION public.event_insert_trigger();
SampleCmdのコマンド
とりあえずコンソールアプリで。addとかeditとかイベント名を指定して必要な引数を付けてイベントをwrite_modelにinsertします。
$ docker exec -it dotnet dotnet run --project /root/src/SampleCmd help
Usage: SampleCmd [options...]
Options:
-e, --eventname <String> event name. add or edit or delete (Required)
-n, --username <String> user name. Required for add or edit (Default: null)
-id, --id <String> guid. Required for edit or delete (Default: null)
Commands:
help Display help.
version Display version.
まとめ
とりあえず動作しました。
read_modelを増やす予定がないのなら
のようにひとつのPostgresqlでスキーマで分けてしまえば良い気もします。
のようにイベント発行のタイミングでProjectorを外部アプリとして実行するのも有りな気がします。
このパターンでもread_modelをリストアする際にwrite_modelの情報は必要です。
オンプレミスでCQRS+イベントソーシングで実装する際に便利な仕組み等あればご教示願います。