59
52

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

EventStoreを用いたCQRS+イベントソーシングの実践と考察

Last updated at Posted at 2019-05-13

#TL;DR
Dockerを用いて以下の環境を用意し、EventStoreの挙動を確認しイベントソーシングの実現方法について検討しました。
image.png

EventStoreSampleをcloneしてdocker-compose up すれば動きます。
詳細な使用方法は↓。

#はじめに

にてEventStoreの存在を知ることができました。ありがとうございました。

#イベントソーシングとは
書籍だと

ネットだと

が参考になりました。
自分にはイベントソーシングとは~とは書けないので上記参考願います。

#CQRSとは
CQRS アーキテクチャのスタイル
から引用します。

コマンド クエリ責務分離 (CQRS) は、読み取り操作と書き込み操作を分離するアーキテクチャ スタイルです。
cqrs-logical.svg

#イベントストアとは
イベントソーシングを実現するための仕組みで、各集約(エンティティ)毎のイベントを保存して置く場所になります。
実践ドメイン駆動設計.NETのエンタープライズアプリケーションアーキテクチャを読んだ際にイベントソーシングを実践してみようと思ったのですがこの部分の自作が大変だったので実践できずにいました。

#EventStoreとは
イベントストアを実現してくれるデータベースになります。
ざっくりいうとGuidで一意となる格納場所(ストリームという)にEventDataと呼ばれる情報を追記していきます。

.Net用のAPIがあったので利用しましたが制御はHTTP経由でも出来るのでどの言語でも使用可能です。
Step 1 - Install, run, and write your first event

##EventDataについて
EventData.cs
EventData
にクラス構成が載っていますが以下引用

Member Description
Guid EventId A unique identifier representing this event. Event Store uses this for idempotency if you write the same event twice you should use the same identifier both times.
string Type The name of the event type. You can use this for pattern matching in projections, so should be a "friendly" name rather than a CLR type name, for example.
bool IsJson If the data and metadata fields are serialized as JSON, you should set this to true. Setting this to true will cause the projections framework to attempt to deserialize the data and metadata later.
byte[] Data The serialized data representing the event to be stored.
byte[] Metadata The serialized data representing metadata about the event to be stored.

#イベントソーシングを実現するためのアーキテクチャ考察
下図のようなCQRS+ESで考えました。
image.png

##コマンド(イベント)
コマンド(イベント)によって変化した集約をEventDataに詰め込む方式について以下のパターンを考えました。

  1. コマンド(イベント)によって変化した集約をすべてシリアライズ化して追記
  2. コマンドによって発生するイベント毎に変化点のみをシリアライズして追記

1. コマンド(イベント)によって変化した集約をすべてシリアライズ化して追記

これはDDDで言うところの永続指向のリポジトリで設計している感じです。
すでに永続指向のリポジトリで設計しているならば、save()やput()の実装部をEventStore向けに変更すれば実現できます。
今回はこちらを試しました。

2. コマンドによって発生するイベント毎に変化点のみをシリアライズして追記

これはDDDで言うところのコレクション指向のリポジトリで設計している感じです。
集約のプロパティをSetプロパティで変更したタイミングで変化内容をEventStoreに投げる感じです。

橋渡し役とスナップショット

コマンド(イベント)を介して集約の情報はすべてEventStoreに蓄積されています。
各ストリームの最新のEventDataをGetすれば最新の集約の状態を取得することができますが、ただのbyte[]なので複雑なクエリを実行するのは厳しいです。
また、変化点のみを追記する方式だと最新の状態を取得するためにはストリーム内にある全てのEventDataの情報を畳み込みしないと最新の状態にならない場合があります。
そのためイベントソーシングではスナップショットを行い、クエリはそっちで行うようにします。
スナップショット 【 snapshot 】

EventStoreにはProjectionsという機能があって、多分最新の情報を投影してくれるんでしょうけど未検証です。

結果整合性

EvntStoreとRDBとの同期の方法については

  • EventStoreをポーリングして変化点に関わらず全テーブル更新
  • EventStoreをポーリングしてある程度EvntDataが溜まってたらテーブル更新
  • ストリームをPubSubして逐次テーブル更新

等々考えられますがどの方法でも状態の同期にタイムラグが発生してしまいますので結果整合性についても検討する必要があります。

マイクロサービスにおける 結果整合性との戦い
ざっくり知る「結果整合性(Eventual Consistency)」

EventStoreにはいわゆるPubSubパターンを実現するための仕組みがあるので、今回は橋渡し役は全てのストリームを購読しておき、発行されたらRDBに対してUpsertをかけるようにしました。

EventStoreSampleを用いてEventStoreの挙動解説

ここからは自作したEventStoreSampleを用いてEventStoreの挙動を解説します。

ざっくり構成図

image.png

4種5つのコンテナがあります。(ソース)

  • eventstore

    EventStoreのコンテナです。二つのポート(1113,2113)が必要でホストからも参照できるようにポートを開けています。

    http://localhost:2113
    でストリームの内容を参照できます。デフォルトのIDとパスワードはadmin/changeit
  • mariadb

    スナップショットを保存するためのDBです。これも一応ポートを開けてます。
  • watch

    eventstoreとmariadbとの橋渡しになります。コンソールアプリで作成してDaemonにしています。1秒単位でeventstoreのストリーム一覧を監視し、購読していないストリームがあれば購読を開始します。発行されればmariadbに対してupsertします。
  • usecase1, usecase2

    ユースケース(コマンド)を実行するためのアプリです。二つ用意したのは同時アクセス時の挙動を確認するためです。

ざっくり業務知識

単純なCRUDにしないために以下の業務を考えました。

  • ウェブ上から本を借りるシステム(のイメージ)
  • 集約は利用者。本のタイトルなどの情報は書籍として正規化しました。
  • 本を登録する、本を借りる、本を延長する、本を返す、本を破棄する、というユースケースを想定(ソース)

EventStoreSample起動

git clone https://github.com/kwhrkzk/EventStoreSample.git
cd EventStoreSample
docker-compose up -d
docker-compose logs -f watch usecase1 usecase2

docker-compose upだけだとeventstoreやmariadbのログが結構出て見づらいので3つのコンテナに絞ってます。
イメージの取得やアプリのビルド等で時間かかるかもです。

watch         | [09,02:33:29.690,DEBUG] TcpPackageConnection: connected to [172.22.0.6:1113, L172.22.0.4:35975, {f06d6710-ab7f-4b0f-a9da-35f1ae5a9fcd}].
usecase2      | ?[?1h?=?[?1h?=You can invoke the tool using the following command: usecase
usecase2      | Tool 'mainapp' (version '1.0.0') was successfully installed.
usecase1      | ?[?1h?=?[?1h?=You can invoke the tool using the following command: usecase
usecase1      | Tool 'mainapp' (version '1.0.0') was successfully installed.

が出たら準備完了です。

ユースケース一覧

docker-compose exec usecase1 usecase

初期値入力
本を借りる
本を延長する
本を返す
本を破棄する

ユースケース名が表示されます。上から順に実行していきます。

初期値入力

docker-compose exec usecase1 usecase 初期値入力

でeventstoreに対してイベントを投げています。(ソース)

http://localhost:2113/web/index.html#/streams
を確認すると新しいストリームが表示されているはず。
watchのログには

watch         | {"Id":"d6d4f080-dc82-4838-aea0-43ac468c6043","EventNumber":0,"苗字":"田中","名前":"太郎","本一覧":[]}
watch         | {"Id":"4dcb1dba-943a-4eb4-aa16-99627680484a","EventNumber":0,"苗字":"山田","名前":"花子","本一覧":[]}
watch         | {"Id":"d7ab9958-2921-4b61-a1cf-e16fd994ecd2","EventNumber":0,"書籍EntityId":"a747e426-02a5-4763-b623-5c00b0583b4a","利用者EntityId":null,"版数":1,"貸
出期間自":null,"貸出期間至":null}
watch         | {"Id":"a747e426-02a5-4763-b623-5c00b0583b4a","EventNumber":0,"タイトル":".NETのエンタープライズアプリケーションアーキテクチャ 第2版"}
watch         | {"Id":"a6726189-d58f-456f-aab1-b11d68c1030c","EventNumber":0,"書籍EntityId":"a747e426-02a5-4763-b623-5c00b0583b4a","利用者EntityId":null,"版数":1,"貸
出期間自":null,"貸出期間至":null}

こんな感じで各エンティティの情報が出るはず。
これはusecase1がEventStoreに対してDataEventをWriteAsyncしたのを、watchが検知してmariadbにinsertした結果です。
EventDataをリードするとEventNumberという情報も手に入ります。これはEventDataをWriteする度に自動で付与される連番です。この情報は楽観ロックの際に使用するのでRDBに保存しておきます。(ソース)

eventstore -> mariadbの流れで情報を登録しないと齟齬が出るので、マイグレーションとかする場合は考慮する必要があるかと。

本を借りる

別窓で
docker-compose exec usecase1 usecase 本を借りる
もう一つの窓で
docker-compose exec usecase2 usecase 本を借りる

EventStoreには楽観ロックの機能が備わっているのでその確認です。
Optimistic Concurrency & Idempotence

StartTransactionAsyncでトランザクション張ったり、AppendToStreamAsyncで追記したりするときに与える引数でlong expectedVersionというのがあるんですけどここに直近のEventNumberを指定しておくと、値が飛んだ場合にExceptionを発行してくれます。
usecase1 -> usecase2と続けて実行するとusecase1は正常に終了し

watch         | {"Id":"4dcb1dba-943a-4eb4-aa16-99627680484a","EventNumber":1,"苗字":"山田","名前":"花子","本一覧":["a6726189-d58f-456f-aab1-b11d68c1030c"]}
watch         | {"Id":"a6726189-d58f-456f-aab1-b11d68c1030c","EventNumber":1,"書籍EntityId":"a747e426-02a5-4763-b623-5c00b0583b4a","利用者EntityId":"4dcb1dba-943a-4eb4-aa16-99627680484a","版数":1,"貸出期間自":"2019-05-13T07:26:35.6959350","貸出期間至":"2019-05-27T07:26:35.6959350"}

↑のログが出ますがusecase2の方は

System.InvalidOperationException: Transaction is already committed

と出てロールバックしてくれます。

本を延長する

docker-compose exec usecase1 usecase 本を延長する
watch         | {"Id":"a6726189-d58f-456f-aab1-b11d68c1030c","EventNumber":2,"書籍EntityId":"a747e426-02a5-4763-b623-5c00b0583b4a","利用者EntityId":"4dcb1dba-943a-4eb4-aa16-99627680484a","版数":1,"貸出期間自":"2019-05-13T07:26:35.6959350","貸出期間至":"2019-06-10T07:26:35.6959350"}

これは本の履歴を増やすという程度の意味で貸出期間至を2週間延ばしています。

本を返す

docker-compose exec usecase1 usecase 本を返す
watch         | {"Id":"4dcb1dba-943a-4eb4-aa16-99627680484a","EventNumber":2,"苗字":"山田","名前":"花子","本一覧":[]}
watch         | {"Id":"a6726189-d58f-456f-aab1-b11d68c1030c","EventNumber":3,"書籍EntityId":"a747e426-02a5-4763-b623-5c00b0583b4a","利用者EntityId":null,"版数":1,"貸
出期間自":null,"貸出期間至":null}

本に紐づけていた利用者のIDや貸出期間を除去しています。

本を破棄する

docker-compose exec usecase1 usecase 本を破棄する

どう表現するのが正解かわかりませんが今回はEvntDataの中身を空にしました。(ソース)
watchはEventDataの中身が空の場合は該当するレコードをdeleteするようにしています。(ソース)

これで今回でいうところの"a6726189-d58f-456f-aab1-b11d68c1030c"の集約の一生は終わりになります。

http://localhost:2113
を見てみると

image.png

こんな感じでその集約の履歴を参照することができます。
これを解析することでだれがいつ借りたとか何回延長されたとか業務に関する知見を得られそうな気がします。

まとめ

いろいろ前提知識が必要で伝えきれているかわかりませんがイベントソーシングについて実践してみました。

少なくとも永続指向のリポジトリで設計していてCQRSになっているのならばEventStoreへの置き換えはそんなに難しくない気はしました。
また、EventStoreとRDBとの同期部分も各集約毎に
RDB(DAO) <-> 集約 <-> EventData(json)
の対応を取る必要がありますが一度作ってしまえば後はほったらかしでした。

今まではイベントソーシングを実現しようとすると全てがオレオレになっていましたが、EventStoreが登場したことによりイベントソーシングの実現方法について共通の認識で議論できるようになったのではないでしょうか。

余談

usecaseとwatchにはMicroBatchFrameworkを用いました。各ユースケース毎の実行でもDaemon用途でもとても有用でした。
あとUtf8JsonもEventDataの格納がbyte[]だったので相性良かったです。

59
52
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
59
52

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?