はじめに
社内の勉強会で、関数型プログラミングにおけるパイプラインの説明を聞いたのですが、その内容がCQRS/ESの考え方に似ているように感じました。
そこからさらに思考が発散して、CQRS/ESでイベントを積み上げる様子と、イベントを再生する様子がロケット鉛筆1に似ているように思えたので、Pythonで実装してみました。
CQRS/ESとは
まず、CQRS/ESについて、簡単に説明します。
CQRS
Command Query Responsibility Segregationの略で、コマンド(データの変更)とクエリ(データの取得)を分離するアーキテクチャパターンです。
従来のCRUDモデルでは、読み取りと書き込みで同じモデルを使用することが多い一方、CQRSではこれらを完全に分離し、それぞれに最適化されたモデルを使用します。
メリット
読み取りと書き込みの負荷を独立してスケールできる
デメリット
実装が複雑になりがち
書籍「セキュアバイデザイン」でも、より安全なコードを書く手段として、CQRSについて説明されているので、こちらも是非読んでみてください。
ES
Event Sourcingの略で、システムの状態を直接保存するのではなく、状態を変更するイベントのシーケンスを保存するアプローチです。
状態はイベントログを再生(リプレイ)することで再構築されます。
サンプル実装
以下、PythonでCQRS/ESを実装していきます。原則としてわかりやすさを最優先していますので、一部最適化されていない点もあるかと思いますが、ご了承ください。
イベント
まずはイベントの部分を実装します。
from abc import ABC
class PencilEvent(ABC):
pass
class AddLeadEvent(PencilEvent):
def __init__(self, lead_type: str):
self.lead_type = lead_type
class WriteEvent(PencilEvent):
def __init__(self):
self.is_write = True
Pythonを知らない、または初心者の方向けに。
ABC
は抽象基底クラスのモジュールで、これを継承して抽象クラスを作成します。Pythonではインタフェースや抽象クラスを作成する場合、このモジュールを使用します。(言語仕様上、interface
や abstract class
という表現を使用できません)
リポジトリ
次にリポジトリを実装します。今回は簡易的に実装するため、物理的なデータベースは用意せず、メモリ上にデータを保持するようにしています。
from event import PencilEvent
class PencilEventRepository:
__slots__ = ["__events"]
def __init__(self):
self.__events: list[PencilEvent] = []
def save(self, event: PencilEvent) -> None:
self.__events.append(event)
def all_events(self) -> list[PencilEvent]:
return self.__events.copy()
読み取り用モデル
続いては読み取り用のモデル一式を作成します。モデルはイベントのシーケンスに基づき、生成できるようにします。
from abc import ABC, abstractmethod
from typing import List, Any
# 替芯の基底クラス
class PencilLead(ABC):
@abstractmethod
def write(self) -> None:
pass
class RedPencilLead(PencilLead):
def write(self) -> None:
print("赤い線を描きました")
class BluePencilLead(PencilLead):
def write(self) -> None:
print("青い線を描きました")
class YellowPencilLead(PencilLead):
def write(self) -> None:
print("黄色い線を描きました")
class BlackPencilLead(PencilLead):
def write(self) -> None:
print("黒い線を描きました")
# PencilLeadFactory: 替芯種別ごとに生成クラスを登録し、
# 替芯インスタンスを生成するファクトリ
# (GoFファクトリーメソッド+レジストリパターン)
class PencilLeadFactory:
_registry = {}
@classmethod
def register(cls, lead_type: str, lead_cls):
# 替芯種別(lead_type)と対応するクラス(lead_cls)を
# ファクトリに登録する
cls._registry[lead_type] = lead_cls
@classmethod
def create_lead(cls, lead_type: str) -> PencilLead:
if lead_type in cls._registry:
return cls._registry[lead_type]()
raise ValueError(f"Unknown lead type: {lead_type}")
# 各替芯クラスをファクトリに登録
# (新しい替芯種別追加もここに1行追加するだけでOK)
PencilLeadFactory.register("red", RedPencilLead)
PencilLeadFactory.register("blue", BluePencilLead)
PencilLeadFactory.register("yellow", YellowPencilLead)
PencilLeadFactory.register("black", BlackPencilLead)
# ロケット鉛筆
class RocketPencil:
__STOCK_SIZE: int = 10
def __init__(self):
self.__stock: List[PencilLead] = []
def apply(self, event: Any):
# イベントを適用して状態を再現
if hasattr(event, "lead_type"):
lead = PencilLeadFactory.create_lead(event.lead_type)
# ロケット鉛筆がFIFOの原則で替芯を詰めていく特性の実装
if len(self.__stock) == self.__STOCK_SIZE:
self.__stock.pop(0)
self.__stock.append(lead)
elif hasattr(event, "is_write"):
pass # WriteEventは状態変化なし
# 一定の替芯がないと書くことができない、ロケット鉛筆の特性
def is_writeable(self) -> bool:
return len(self.__stock) == self.__STOCK_SIZE
# 書く処理
def write(self) -> None:
if not self.is_writeable():
print("このロケット鉛筆はまだ書けないよ!")
return
self.__stock[0].write()
イベント再生用のビルダー
モデルが直接リポジトリに依存することへの抵抗があったので、イベントからロケット鉛筆を生成するモジュールを作成しました。
from event_repository import PencilEventRepository
from rocket_pencil import RocketPencil
class RocketPencilReadModelBuilder:
@staticmethod
def build(event_repository: PencilEventRepository) -> RocketPencil:
rocket_pencil = RocketPencil()
for event in event_repository.all_events():
rocket_pencil.apply(event)
return rocket_pencil
メイン処理
イベントを作成して、モデルを再現する一連の処理を書きます。
from event_repository import PencilEventRepository
from event import AddLeadEvent
from model_builder import RocketPencilReadModelBuilder
def main():
# イベントリポジトリを作成
event_repo = PencilEventRepository()
# ユーザー操作の例:替芯を追加(コマンド)
event_repo.save(AddLeadEvent("red"))
event_repo.save(AddLeadEvent("blue"))
event_repo.save(AddLeadEvent("yellow"))
for _ in range(7):
event_repo.save(AddLeadEvent("black"))
event_repo.save(AddLeadEvent("red"))
# モデルをイベント履歴から再構築
rocket_pencil = RocketPencilReadModelBuilder.build(event_repo)
# 書く処理を実行
rocket_pencil.write()
if __name__ == "__main__":
main()
処理実行
main.py
を実行すると、以下の結果が得られます。
% python main.py
青い線を描きました
ロケット鉛筆がきちんとFIFOで芯を補充している、すなわちイベントをもとに状態を構築できました。
最後に
今回はCQRS/ESのイメージをすべてPythonで実装してみましたが、実際のシステムに適用する場合は、イベントの送信をAWSではAmazon SNSやAmazon SQS、AzureではAzure Event Grid、Google CloudではCloud Pub/Subなどを使用すると良いでしょう。
-
複数の芯をストックし、FIFO(First-In, First-Out)の原則で動作するQueueを物理的に再現した筆記具。 ↩