EventSourcing + CQRSをServerlessで
近年Microservices界隈でEvent駆動のシステムを構築するケースが増えてきています。
以前MicroService間のトランザクションについて記事を投稿しましたが、
本投稿では、Microserviceのローカルトランザクションと一貫性について記載したいと思います。
※ 本記事に関するものをGithubにサンプルを公開しています。
概要
eコマースでは、ユーザが商品購入の際カートに商品を登録しますが、商品の在庫が残っているときだけ成功させたいということがあります。
Event駆動システムだと"ユーザが商品をカートに入れる"というイベントを在庫がある時だけEventStoreに永続化するということになるでしょう。
このユースケースを複数のユーザが同時に行うために、Microserviceはトランザクション処理と一貫性の対応が必要になります。
以下にEventSourcing + CQRSの基本と、この要件を実現する在庫管理(Inventory)サービスのローカルトランザクションと一貫性について説明していきたいと思います。
EventSourcing + CQRS
Event Sourcing + CQRSでは以下のような仕組みで、高トラフィックでも一貫性とトランザクションを実現します。
- すべてEventの永続化(EventStore)
- 楽観的ロックによるデータベース整合性担保
- 楽観的ロックによる複数のトランザクション処理
- 最新のプロジェクション(EntityやViewの最新状態の取得)
また、EventSourcing+CQRSは以下の方針により低レイテンシーとスケーラビリティを実現します。
- データベースのロックは行わない
- UPDATEを行わない
- 非同期処理
Amazon DynamoDB
Event Sourcing + CQRSはデータベース周りの説明が主体になります。
本投稿ではDynamoDBを使用しますが関係する機能は以下になります。
- 楽観的ロック(EventStore)
- DynamoDB Streams(Snapshot)
- 強整合性読み取り(Query)
- 条件付き書き込み(Deduplication)
また、本投稿では上記機能を使いレコードのUpdateを一切行わないという方針で設計しています。
在庫管理サービス(Inventory)の構成
eコマースのInventoryに関するイベントは以下のようなものがあると想定します。
- 在庫追加 (stock_add: 在庫管理者が仕入れた在庫を追加する)
- 商品予約 (item_reserve: ユーザが商品をカートに入れる)
- 商品予約完了 (item_reserve_complete: ユーザが購入する商品を確定する)
- 予約キャンセル(item_reserve: ユーザがカートから商品を削除)
まず、ES+CQRSの基本を"在庫追加イベント"の例を元に以下の構成で説明します。
- EventStore (Inventoryサービスのイベント、コマンドをすべて保存します)
- Snapshot (在庫状態を集計したsnapshotを保存します)
- Deduplication (重複排除のためのロックマネージャ)
- Query (最新の商品在庫状態を取得)
Command: 在庫管理者が仕入れた在庫を追加する際発生するCommand
EventStore: すべてのCommand,Eventを永続化した状況
Snapshot: EventStoreに保存されたCommandをDynamoDB Streamsで受け取ったもの
Deduplication: Snapshotが重複排除に使用するテーブル
EventStore
EventStoreでは、Inventoryのデータベースの変更に関するイベントをすべて永続化するというのが基本になります。
EventStoreにEventを永続化するときに、バージョン番号を使用したオプティミスティックロックを使います。
参照: バージョン番号を使用したオプティミスティックロック
オプティミスティックロックにより同時に複数のユーザの予約処理をさせないようにすることやデータベースの整合性を保持します。
Githubサンプルでは
①EvnetStoreからitem_idの最新のversionを取得し、
②新しいversion(version + 1)で条件付き追加します。
@retry(wait_exponential_multiplier=100,
wait_exponential_max=1000,
retry_on_exception=is_integrity_error)
def __persist_with_optimistic_lock(self):
latest_event = self.__es.get_latest_event() # ①
self.__es.persist(latest_event['version'])
def get_latest_event(self):
try:
event = self.__query_latest_event()
return event.attribute_values
except ItemDoesNotExist:
return self.__get_initial_event()
def persist(self, current_version):
self.__event['version'] = current_version + 1 # ②
self.__event['saved_at'] = str(datetime.datetime.utcnow())
item = self.__model(**self.__event)
with raise_with_no_item_exception(self.__event['item_id']):
item.save( # ③
condition=(self.__model.item_id != self.__event['item_id']) &
(self.__model.version != self.__event['version'])
)
(注) Dynamodbのapiとしてpynamodbを使用しています。
(注) リトライ処理にretryingを使用しています。
Snapshot
在庫状態を集計したsnapshotを保存します。
EventStoreに保存されたレコードをDynamoDB Streamsで通知させSnapshotを作成します。
DynamoDB Streamsで通知されたINSERTのImageを受付け、在庫の状態(State)を計算しItemを追加します。
Stateは、{"available": 10, "reserved": 0, "bought": 0}のように畳み込みで計算して作成します。
(DynamoDB Streamsは以下のようにBatchSizeを1としているので、1レコードづつ計算しています。)
また、from_versionというattributeがありますが、これは計算が完了したEventStoreのレコードのversionを示しています。
EventStoreStream:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: 1
Enabled: True
EventSourceArn: !GetAtt EventStore.StreamArn
FunctionName: !GetAtt SnapshotFunction.Arn
StartingPosition: TRIM_HORIZON
畳み込み処理は、event_type毎に以下のように計算しています。
class State:
__initial_state = {
'available': 0,
'reserved': 0,
'bought': 0
}
def apply(self, current_state, next_event):
event_type = self.__get_event_type(next_event)
handler = getattr(self, '_on_{}'.format(event_type))
if handler:
return handler(current_state, next_event)
return self.__initial_state
@staticmethod
def _on_add(state, event):
state['available'] += event['quantity']
return state
@staticmethod
def _on_reserve(state, event):
state['available'] -= event['quantity']
state['reserved'] += event['quantity']
return state
@staticmethod
def _on_complete(state, event):
state['reserved'] -= event['quantity']
state['bought'] += event['quantity']
return state
@staticmethod
def _on_cancel(state, event):
state['available'] += event['quantity']
state['reserved'] -= event['quantity']
return state
DynamoDB Streamsを使う理由
DynamoDB Streamsがなかった時代は、プログラムではEventStoreに保存したあと更新の通知(publish)していたと思います。
プログラムではput_item(),publish()という処理にしますが、これだとAtomicな処理になりません。
途中で例外が発生してしまうとpublish()がされず、DB更新と通知の一貫性がなくなってしまいEvent駆動なシステムでは致命的になります。
DynamoDBがSnapshot作成に最適なのはレコード作成/更新/削除を非同期に"Exactly Once"で通知してくれるからです。
また、サンプルではUpdateさせないという方針としているのでSnapshotもオプティミスティックロックで行っていますが、
DynamoDB Streamsは"Exactly Once"でシーケンシャルに処理されるので、SnapshotのレコードをUPDATEするのでも問題ないと思います。
Deduplication 重複排除
イベント駆動のシステムでは通常"At Least Once"のイベント配信であり、
EventStoreでは同じEventが重複することがあります。このような"At Least Once"では
読み取り側で重複を除く処理が必要になります。
Deduplicationテーブルは重複排除のためのロックマネージャとして機能させます。
EventのEventIDを条件付き書き込みを使用して重複を検知し重複を検知したEventは処理をしません。
def is_duplicate_event(self):
try:
deduplicate = self.model(
self.event['event_id'] + self.__item_suffix,
0)
with raise_for_save_exception(self.event['event_id']):
deduplicate.save( # ①
condition=(self.model.item_id != self.event_id) &
(self.model.version != 0)
)
return False
except IntegrityError:
logger.info('is_duplicate_event: {}'.format(self.event))
return True
Query
以上のようにEventの永続化が行われた状況の中で表の例を元に最新の商品在庫状態を取得する処理に関して説明します。
Command表では、item_id: '00000001'の在庫を(quantity: 10個 20個, 30個)の3回追加しています。
EventStoreには(version: 1, 2, 3)3つのレコードが保存されています。
Snapshotには、(version: 1, 2)の2つのレコードが保存されています。
(Snapshotは非同期に作成されるので3レコードがまだSnapshotに届いていないという状況になります。)
この状態のときにitem_idのstateをQueryするとします。
①snapshotの最新のレコードを取得します。
②EventStoreからSnapshotに反映されていないレコードを取得します。(すべてのfrom_version以降のレコード)
③snapshotのstateを初期値とし、②で取得したレコードのstateを畳み込みで計算します。
def __get_latest_state(self):
snapshot = self.__ss.get_snapshot() # ①
events = self.__es.get_events_from(snapshot['from_version']) # ②
if len(events):
state = reduce(self.__ss.calculate_state, # ③
events,
self.__ss.get_state(snapshot))
current_version = events[-1]['version']
return state, current_version
else:
return snapshot['state'], snapshot['from_version']
最新のレコードを取得する際、効率的にqueryするためにScanIndexForwardを降順でlimitを1で取得します。
snapshot = self.__model.query(snapshot_item_id,
self.__model.version > 0,
limit=1,
scan_index_forward=False)
また、②の処理でEventStoreから反映されていないレコードを取得する際、④強力な整合性のある読み込み
でqueryします。これによりEventStoreの最新の状態が取得できるようになります。
def get_events_from(self, from_version):
with raise_for_query_exception(self.__request['item_id']):
event_list = list()
for event in self.__model.query(
self.__request['item_id'],
self.__model.version > from_version,
consistent_read=True, # ④
scan_index_forward=False):
event_list.append(event.attribute_values)
event_list.reverse()
return event_list
以上説明した処理により、表CurrentStateが取得できるようになります。
Reserveの課題
以上のように、EventStore+CQRSの基本を説明しましたが、
「ユーザがカートに商品を登録する際、商品の在庫が残っているときだけ成功させる」
という要件をどうするか説明します。
シナリオ
-
ユーザが商品をカートに入れる。(item reserve)
-
購入する商品を確定し(item reserve complete)
-
支払い処理をする
-
3.の段階で、ユーザに「やっぱり在庫はありませんでした」というシナリオは問題になりますので、
1の段階で在庫の確認を行ってから商品の予約が完了するような処理が必要です。
前記で"在庫追加(stock_add)"をオプティミスティックロックでEventStoreにレコード追加しました。
①EvnetStoreからitem_idの最新のversionを取得し、
②新しいversion(version + 1)で条件付き追加します。
商品予約(item_reserve: quantity=3)の場合、
①SnapshotとEventStoreからitem_idの最新のversionとstateを取得し
②state({"available": 30, "reserved": 0, "bought": 0})のavailableとquantityを比較し
商品予約が可能かを判断し
②商品予約を新しいversion(version + 1)で条件付き追加します。
(②でavailableが足りず予約不可能の場合は③の処理を行いません。エラーはGraphQLなどでイベントを返します)
@retry(wait_exponential_multiplier=100,
wait_exponential_max=1000,
retry_on_exception=is_not_item_ran_short)
def __persist_with_check_stock(self):
state, current_version = self.__get_latest_state() # ①
if self.__is_item_available(state): # ②
self.__es.persist(current_version) # ③
else:
raise ItemRanShort
このように、商品予約もEventStoreのオプティミスティックロックによる保存で実現します。
前記Inventoryのイベントを一覧しましたが、上記のような最新のstateとの比較判断が必要なのは
bの商品予約だけで、a,c,dはそのままEventStoreに保存するので問題ないと思います。
a. 在庫追加(在庫管理者が仕入れた在庫を追加する)
b. 商品予約(ユーザが商品をカートに入れる)
c. 商品予約完了(ユーザが購入する商品を確定する)
d 予約キャンセル(ユーザがカートから商品を削除)
以下に別のパターンのイベントの例示しておきます。
stock_add: 在庫の追加
item_reserve: ユーザがカートに商品を追加すると発生するコマンド
item_reserve_complete: ユーザが商品購入を確定させると発生するコマンド
item_reserve_cancel: ユーザがカートから商品を削除したり、購入を取りやめると発生するコマンド
Single Table
今回はDynamoDBで推奨しているSingle Tableで構成しましたが、別々のTableでも構わないと思います。
Single Tableにしていることにより、SnapshotやDeduplicationのItem追加で必要のないDynamoDB Streamsが発生してしまいます。
item_idを以下のように分けています。(ちょっとわかりにくかったかもしれません。)
EventStore: 00000001
Snapshot: 00000001-snapshot
Deduplication: 00000001-deduplication
Query API
ここでは説明しませんが、GithubサンプルにはQueryAPIもありますので、コードを参照ください。
考察
以上のようにEventSoucing+CQRSをDynamoDBとLambdaで作成しましたが、
DynamoDBは、Event駆動のデータベースとしては最適なものだと思いました。
むしろDynamoDBはEvent駆動のために進化しているだと思えたほどです。