LoginSignup
0
2

More than 1 year has passed since last update.

小規模アプリでのCQRS+イベントソーシングの構成例

Posted at

経緯

CQRS+イベントソーシングをオンプレミスで実装しようとしたときに、
Apache Kafka等のメッセージキューとかを使わずにイベント検知できないかと考えたところ、
PostgreSQLのPub/Sub機能を使えば最小限で出来そうな気がしたので検討アプリを作ってみました。
アプリはここに置いています。

cloneしたら

cd src
docker-compose up -d --build

で起動します。

環境

  • 自作検討アプリ
  • docker-compose
  • mcr.microsoft.com/dotnet/sdk:7.0
  • Microsoft.EntityFrameworkCore Version=7.0.0-preview.7.22376.2
  • postgres:14.5-bullseye

環境イメージ

  • write_model(eventstore用のpostgres)
  • read_model(参照用のpostgres)
  • Projector(read_modelへイベントを射影するdotnetcore)
  • SampleCmd(write_modelへイベントを追加するdotnetcore)

名称未設定ファイル.drawio.png

ファイルツリー抜粋

├── docker-compose.yml
├── dotnet
│   ├── Dockerfile
│   ├── docker-entrypoint.sh
│   └── src
│       ├── Domain
│       │   ├── Domain.csproj
│       │   ├── EventIdEnum.cs
│       │   ├── IEventInsert.cs
│       │   ├── User.cs
│       │   ├── UserId.cs
│       │   ├── UserName.cs
│       ├── Infrastructure
│       │   ├── DataBase
│       │   │   ├── Event.cs
│       │   │   └── EventstoreContext.cs
│       │   ├── EventInsert.cs
│       │   ├── Infrastructure.csproj
│       ├── Projector
│       │   ├── Commands.cs
│       │   ├── DataBase
│       │   │   ├── SampleDBContext.cs
│       │   │   └── User.cs
│       │   ├── Program.cs
│       │   ├── Projector.csproj
│       └── SampleCmd
│           ├── Commands.cs
│           ├── DataBase
│           │   ├── SampleContext.cs
│           │   └── User.cs
│           ├── Program.cs
│           ├── SampleCmd.csproj
├── read_model
│   └── Dockerfile
└── write_model
    ├── Dockerfile
    └── initdb
        ├── eventstore.backup
        └── init.sh

ドメインとかイベントとか

とりあえずただのユーザーテーブルにCRUDするドメインにしてみました。

ドメイン

イベント

EventId 名称 説明 スキーマっぽいもの
1 UserAdded ユーザーを追加する { "type": "object", "properties": { "name": { "type": "string" } } }
2 UserEdited ユーザーを編集する { "type": "object", "properties": { "id": { "type": "string" }, "name": { "type": "string" } } }
3 UserDeleted ユーザーを削除する { "type": "object", "properties": { "id": { "type": "string" } } }

ER図

Projectorのコマンド

データベースリストア用のinitと、イベント購読用のsubscriptionを用意しました。

$ docker exec -it dotnet dotnet run --project /root/src/Projector help
Usage: Projector <Command>

Commands:
  help            Display help.
  init            
  subscription    
  version         Display version.

データベースリストア用のinitの中身抜粋

    // read_modelのSampleデータベースのDbContextをコンストラクタインジェクションしている.
    private SampleDBContext Smp { get; }

    // write_modelのEventstoreデータベースのDbContextをコンストラクタインジェクションしている.
    private EventstoreContext Ev { get; }

    public async Task Init()
    {
        Smp.Database.EnsureDeleted(); // Sampleデータベースの削除
        Smp.Database.EnsureCreated(); // Sampleデータベースの生成

        // イベント分再生
        foreach (var x in Ev.Events)
            Handle(x);
    }

    // イベントid毎に処理.usersテーブルにinsertしたりupdateしたりdeleteしたり.
    private void Handle(Event ev)
    {
        if (Enum.IsDefined(typeof(EventIdEnum), ev.EventId) == false)
            return;

        switch ((EventIdEnum)ev.EventId)
        {
            case EventIdEnum.UserAdded: UserUpsertHandler(ev.Data.RootElement); break;
            case EventIdEnum.UserEdited: UserUpsertHandler(ev.Data.RootElement); break;
            case EventIdEnum.UserDeleted: UserDeleteHandler(ev.Data.RootElement); break;
            default: throw new ArgumentException($"not implemented. {ev.EventId}");
        };
    }

イベント購読用のsubscriptionの中身抜粋

    public async Task Subscription(EventstoreContext ev)
    {
        var conn = (NpgsqlConnection)ev.Database.GetDbConnection();
        conn.Open();

        try
        {
            conn.Notification += OnNotification; 

            // postgresqlのLISTENで購読する.
            using var cmd = new NpgsqlCommand("LISTEN event_channel", conn);
            cmd.ExecuteNonQuery();

            while (Context.CancellationToken.IsCancellationRequested == false)
                await conn.WaitAsync(Context.CancellationToken);
        }
        catch (Exception ex) when (!(ex is OperationCanceledException))
        {
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            conn.Close();
        }
    }

    // イベントが発行される度に処理.
    private void OnNotification(object sender, NpgsqlNotificationEventArgs e)
    {
        Console.WriteLine("event handled: " + e.Payload);
        Event? ev = JsonSerializer.Deserialize<Event>(e.Payload, new JsonSerializerOptions 
        {
            PropertyNameCaseInsensitive = true,
        });

        if (ev is not null)
            Handle(ev);
    }

Eventstoreデータベースのトリガ部分

write_modelのデータベースに仕込みます。

トリガー条件: eventsテーブルにINSERT
実行内容: insertされた内容をNOTIFY

CREATE OR REPLACE FUNCTION public.event_insert_trigger()
    RETURNS trigger
    LANGUAGE 'plpgsql'
    COST 100
    VOLATILE NOT LEAKPROOF
AS $BODY$
BEGIN
PERFORM pg_notify(
	'event_channel',
	row_to_json(NEW)::text
);

return NULL;
END
$BODY$;

ALTER FUNCTION public.event_insert_trigger()
    OWNER TO postgres;

CREATE TRIGGER event_trigger
    AFTER INSERT
    ON public.events
    FOR EACH ROW
    EXECUTE FUNCTION public.event_insert_trigger();

SampleCmdのコマンド

とりあえずコンソールアプリで。addとかeditとかイベント名を指定して必要な引数を付けてイベントをwrite_modelにinsertします。

$ docker exec -it dotnet dotnet run --project /root/src/SampleCmd help
Usage: SampleCmd [options...]

Options:
  -e, --eventname <String>    event name. add or edit or delete (Required)
  -n, --username <String>     user name. Required for add or edit (Default: null)
  -id, --id <String>          guid. Required for edit or delete (Default: null)

Commands:
  help       Display help.
  version    Display version.

まとめ

とりあえず動作しました。
read_modelを増やす予定がないのなら
index.png

のようにひとつのPostgresqlでスキーマで分けてしまえば良い気もします。

Projectorの死活監視が面倒なら
index.png

のようにイベント発行のタイミングでProjectorを外部アプリとして実行するのも有りな気がします。
このパターンでもread_modelをリストアする際にwrite_modelの情報は必要です。

オンプレミスでCQRS+イベントソーシングで実装する際に便利な仕組み等あればご教示願います。

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2