#TL;DR
Dockerを用いて以下の環境を用意し、EventStoreの挙動を確認しイベントソーシングの実現方法について検討しました。
EventStoreSampleをcloneしてdocker-compose up すれば動きます。
詳細な使用方法は↓。
#はじめに
ステートレスアプリケーションでEvent Sourcingするなら、EventStoreを使うとよさそう。仕様をよくみたら、楽観ロック機構もあるようだし実用に耐えれそう。https://t.co/EZu6EsG5VP
— かとじゅん (@j5ik2o) 2019年4月24日
にてEventStoreの存在を知ることができました。ありがとうございました。
#イベントソーシングとは
書籍だと
ネットだと
- 『.NETのエンタープライズアプリケーションアーキテクチャ 第2版』を読んだ(DDD、CQRS、イベントソーシング)
- CQRS+ESについて細かい実装や考察をまとめてみた
- CQRSとイベントソーシングの使用法、または「CRUDに何か問題でも?」
- Cutting Edge - 一般的なアプリケーション向けのイベント ソーシング
- Some thoughts on using CQRS without Event Sourcing
が参考になりました。
自分にはイベントソーシングとは~とは書けないので上記参考願います。
#CQRSとは
CQRS アーキテクチャのスタイル
から引用します。
#イベントストアとは
イベントソーシングを実現するための仕組みで、各集約(エンティティ)毎のイベントを保存して置く場所になります。
実践ドメイン駆動設計や.NETのエンタープライズアプリケーションアーキテクチャを読んだ際にイベントソーシングを実践してみようと思ったのですがこの部分の自作が大変だったので実践できずにいました。
#EventStoreとは
イベントストアを実現してくれるデータベースになります。
ざっくりいうとGuidで一意となる格納場所(ストリームという)にEventDataと呼ばれる情報を追記していきます。
.Net用のAPIがあったので利用しましたが制御はHTTP経由でも出来るのでどの言語でも使用可能です。
Step 1 - Install, run, and write your first event
##EventDataについて
EventData.cs
EventData
にクラス構成が載っていますが以下引用
Member | Description |
---|---|
Guid EventId | A unique identifier representing this event. Event Store uses this for idempotency if you write the same event twice you should use the same identifier both times. |
string Type | The name of the event type. You can use this for pattern matching in projections, so should be a "friendly" name rather than a CLR type name, for example. |
bool IsJson | If the data and metadata fields are serialized as JSON, you should set this to true. Setting this to true will cause the projections framework to attempt to deserialize the data and metadata later. |
byte[] Data | The serialized data representing the event to be stored. |
byte[] Metadata | The serialized data representing metadata about the event to be stored. |
#イベントソーシングを実現するためのアーキテクチャ考察
下図のようなCQRS+ESで考えました。
##コマンド(イベント)
コマンド(イベント)によって変化した集約をEventDataに詰め込む方式について以下のパターンを考えました。
- コマンド(イベント)によって変化した集約をすべてシリアライズ化して追記
- コマンドによって発生するイベント毎に変化点のみをシリアライズして追記
1. コマンド(イベント)によって変化した集約をすべてシリアライズ化して追記
これはDDDで言うところの永続指向のリポジトリで設計している感じです。
すでに永続指向のリポジトリで設計しているならば、save()やput()の実装部をEventStore向けに変更すれば実現できます。
今回はこちらを試しました。
2. コマンドによって発生するイベント毎に変化点のみをシリアライズして追記
これはDDDで言うところのコレクション指向のリポジトリで設計している感じです。
集約のプロパティをSetプロパティで変更したタイミングで変化内容をEventStoreに投げる感じです。
橋渡し役とスナップショット
コマンド(イベント)を介して集約の情報はすべてEventStoreに蓄積されています。
各ストリームの最新のEventDataをGetすれば最新の集約の状態を取得することができますが、ただのbyte[]なので複雑なクエリを実行するのは厳しいです。
また、変化点のみを追記する方式だと最新の状態を取得するためにはストリーム内にある全てのEventDataの情報を畳み込みしないと最新の状態にならない場合があります。
そのためイベントソーシングではスナップショットを行い、クエリはそっちで行うようにします。
スナップショット 【 snapshot 】
EventStoreにはProjectionsという機能があって、多分最新の情報を投影してくれるんでしょうけど未検証です。
結果整合性
EvntStoreとRDBとの同期の方法については
- EventStoreをポーリングして変化点に関わらず全テーブル更新
- EventStoreをポーリングしてある程度EvntDataが溜まってたらテーブル更新
- ストリームをPubSubして逐次テーブル更新
等々考えられますがどの方法でも状態の同期にタイムラグが発生してしまいますので結果整合性についても検討する必要があります。
マイクロサービスにおける 結果整合性との戦い
ざっくり知る「結果整合性(Eventual Consistency)」
EventStoreにはいわゆるPubSubパターンを実現するための仕組みがあるので、今回は橋渡し役は全てのストリームを購読しておき、発行されたらRDBに対してUpsertをかけるようにしました。
EventStoreSampleを用いてEventStoreの挙動解説
ここからは自作したEventStoreSampleを用いてEventStoreの挙動を解説します。
ざっくり構成図
4種5つのコンテナがあります。(ソース)
- eventstore
EventStoreのコンテナです。二つのポート(1113,2113)が必要でホストからも参照できるようにポートを開けています。
http://localhost:2113
でストリームの内容を参照できます。デフォルトのIDとパスワードはadmin/changeit - mariadb
スナップショットを保存するためのDBです。これも一応ポートを開けてます。 - watch
eventstoreとmariadbとの橋渡しになります。コンソールアプリで作成してDaemonにしています。1秒単位でeventstoreのストリーム一覧を監視し、購読していないストリームがあれば購読を開始します。発行されればmariadbに対してupsertします。 - usecase1, usecase2
ユースケース(コマンド)を実行するためのアプリです。二つ用意したのは同時アクセス時の挙動を確認するためです。
ざっくり業務知識
単純なCRUDにしないために以下の業務を考えました。
- ウェブ上から本を借りるシステム(のイメージ)
- 集約は利用者と本。本のタイトルなどの情報は書籍として正規化しました。
- 本を登録する、本を借りる、本を延長する、本を返す、本を破棄する、というユースケースを想定(ソース)
EventStoreSample起動
git clone https://github.com/kwhrkzk/EventStoreSample.git
cd EventStoreSample
docker-compose up -d
docker-compose logs -f watch usecase1 usecase2
docker-compose upだけだとeventstoreやmariadbのログが結構出て見づらいので3つのコンテナに絞ってます。
イメージの取得やアプリのビルド等で時間かかるかもです。
watch | [09,02:33:29.690,DEBUG] TcpPackageConnection: connected to [172.22.0.6:1113, L172.22.0.4:35975, {f06d6710-ab7f-4b0f-a9da-35f1ae5a9fcd}].
usecase2 | ?[?1h?=?[?1h?=You can invoke the tool using the following command: usecase
usecase2 | Tool 'mainapp' (version '1.0.0') was successfully installed.
usecase1 | ?[?1h?=?[?1h?=You can invoke the tool using the following command: usecase
usecase1 | Tool 'mainapp' (version '1.0.0') was successfully installed.
が出たら準備完了です。
ユースケース一覧
docker-compose exec usecase1 usecase
で
初期値入力
本を借りる
本を延長する
本を返す
本を破棄する
ユースケース名が表示されます。上から順に実行していきます。
初期値入力
docker-compose exec usecase1 usecase 初期値入力
でeventstoreに対してイベントを投げています。(ソース)
http://localhost:2113/web/index.html#/streams
を確認すると新しいストリームが表示されているはず。
watchのログには
watch | {"Id":"d6d4f080-dc82-4838-aea0-43ac468c6043","EventNumber":0,"苗字":"田中","名前":"太郎","本一覧":[]}
watch | {"Id":"4dcb1dba-943a-4eb4-aa16-99627680484a","EventNumber":0,"苗字":"山田","名前":"花子","本一覧":[]}
watch | {"Id":"d7ab9958-2921-4b61-a1cf-e16fd994ecd2","EventNumber":0,"書籍EntityId":"a747e426-02a5-4763-b623-5c00b0583b4a","利用者EntityId":null,"版数":1,"貸
出期間自":null,"貸出期間至":null}
watch | {"Id":"a747e426-02a5-4763-b623-5c00b0583b4a","EventNumber":0,"タイトル":".NETのエンタープライズアプリケーションアーキテクチャ 第2版"}
watch | {"Id":"a6726189-d58f-456f-aab1-b11d68c1030c","EventNumber":0,"書籍EntityId":"a747e426-02a5-4763-b623-5c00b0583b4a","利用者EntityId":null,"版数":1,"貸
出期間自":null,"貸出期間至":null}
こんな感じで各エンティティの情報が出るはず。
これはusecase1がEventStoreに対してDataEventをWriteAsyncしたのを、watchが検知してmariadbにinsertした結果です。
EventDataをリードするとEventNumberという情報も手に入ります。これはEventDataをWriteする度に自動で付与される連番です。この情報は楽観ロックの際に使用するのでRDBに保存しておきます。(ソース)
eventstore -> mariadbの流れで情報を登録しないと齟齬が出るので、マイグレーションとかする場合は考慮する必要があるかと。
本を借りる
docker-compose exec usecase1 usecase 本を借りる
docker-compose exec usecase2 usecase 本を借りる
EventStoreには楽観ロックの機能が備わっているのでその確認です。
Optimistic Concurrency & Idempotence
StartTransactionAsyncでトランザクション張ったり、AppendToStreamAsyncで追記したりするときに与える引数でlong expectedVersionというのがあるんですけどここに直近のEventNumberを指定しておくと、値が飛んだ場合にExceptionを発行してくれます。
usecase1 -> usecase2と続けて実行するとusecase1は正常に終了し
watch | {"Id":"4dcb1dba-943a-4eb4-aa16-99627680484a","EventNumber":1,"苗字":"山田","名前":"花子","本一覧":["a6726189-d58f-456f-aab1-b11d68c1030c"]}
watch | {"Id":"a6726189-d58f-456f-aab1-b11d68c1030c","EventNumber":1,"書籍EntityId":"a747e426-02a5-4763-b623-5c00b0583b4a","利用者EntityId":"4dcb1dba-943a-4eb4-aa16-99627680484a","版数":1,"貸出期間自":"2019-05-13T07:26:35.6959350","貸出期間至":"2019-05-27T07:26:35.6959350"}
↑のログが出ますがusecase2の方は
System.InvalidOperationException: Transaction is already committed
と出てロールバックしてくれます。
本を延長する
docker-compose exec usecase1 usecase 本を延長する
watch | {"Id":"a6726189-d58f-456f-aab1-b11d68c1030c","EventNumber":2,"書籍EntityId":"a747e426-02a5-4763-b623-5c00b0583b4a","利用者EntityId":"4dcb1dba-943a-4eb4-aa16-99627680484a","版数":1,"貸出期間自":"2019-05-13T07:26:35.6959350","貸出期間至":"2019-06-10T07:26:35.6959350"}
これは本の履歴を増やすという程度の意味で貸出期間至を2週間延ばしています。
本を返す
docker-compose exec usecase1 usecase 本を返す
watch | {"Id":"4dcb1dba-943a-4eb4-aa16-99627680484a","EventNumber":2,"苗字":"山田","名前":"花子","本一覧":[]}
watch | {"Id":"a6726189-d58f-456f-aab1-b11d68c1030c","EventNumber":3,"書籍EntityId":"a747e426-02a5-4763-b623-5c00b0583b4a","利用者EntityId":null,"版数":1,"貸
出期間自":null,"貸出期間至":null}
本に紐づけていた利用者のIDや貸出期間を除去しています。
本を破棄する
docker-compose exec usecase1 usecase 本を破棄する
どう表現するのが正解かわかりませんが今回はEvntDataの中身を空にしました。(ソース)
watchはEventDataの中身が空の場合は該当するレコードをdeleteするようにしています。(ソース)
これで今回でいうところの"a6726189-d58f-456f-aab1-b11d68c1030c"の集約の一生は終わりになります。
http://localhost:2113
を見てみると
こんな感じでその集約の履歴を参照することができます。
これを解析することでだれがいつ借りたとか何回延長されたとか業務に関する知見を得られそうな気がします。
まとめ
いろいろ前提知識が必要で伝えきれているかわかりませんがイベントソーシングについて実践してみました。
少なくとも永続指向のリポジトリで設計していてCQRSになっているのならばEventStoreへの置き換えはそんなに難しくない気はしました。
また、EventStoreとRDBとの同期部分も各集約毎に
RDB(DAO) <-> 集約 <-> EventData(json)
の対応を取る必要がありますが一度作ってしまえば後はほったらかしでした。
今まではイベントソーシングを実現しようとすると全てがオレオレになっていましたが、EventStoreが登場したことによりイベントソーシングの実現方法について共通の認識で議論できるようになったのではないでしょうか。
余談
usecaseとwatchにはMicroBatchFrameworkを用いました。各ユースケース毎の実行でもDaemon用途でもとても有用でした。
あとUtf8JsonもEventDataの格納がbyte[]だったので相性良かったです。