CQRSにおけるメッセージブローカーの役割
コマンドモデルとリードモデルを非同期で連携させる場合、その間のメッセージブローカーは、プロジェクターが 「利用(購読)する」 独立したインフラコンポーネントです。
「プロジェクター」と「メッセージブローカー」は、責務が明確に分離されています。
メッセージブローカー (例: Kafka, RabbitMQ)
責務
コマンド側から発行されたイベント(メッセージ)を安全に受信し、
「内部にためておく(バッファ)」、そして購読者に配送する。
役割
独立した 「インフラコンポーネント」 です。
プロジェクター (Projector)
責務
メッセージブローカーを「購読(Consume)」し、
受け取ったイベントを解釈・変換して、リードDBに書き込む。
役割
「アプリケーション」 コンポーネント(イベント処理ロジック)です。
郵便局のアナロジー 📬
このメカニズムは、「郵便局」に例えると分かりやすいです。
メッセージブローカー(インフラ)
街の 「郵便局(インフラ)」 そのものです。
その責務は、手紙(イベントメッセージ)を
「受け取り」、「仕分け」し、「安全に保管(バッファ)」し、
受信者(プロジェクターなど)に「確実に配送」
することです。
この郵便局員は、手紙の中身(ビジネスロジック) を読んだり、解釈したり、変更したりはしません。
プロジェクター(アプリケーション)
郵便局で私書箱を契約している 「手紙の受信者(アプリケーション)」 です。
受信者(プロジェクター)は、郵便局(ブローカー)を インフラ基盤として保有したりはしません。
郵便局という独立したインフラを「利用」して、手紙(イベント)を受け取っているだけ。
手紙を受け取った後、
その中身(イベントデータ)を「解釈」し、「リードモデルを更新する」といったアプリケーション固有の役割(ビジネスロジック)を実行
します。
アーキテクチャ図
この関係はアーキテクチャ図で明確になります。
メッセージブローカーは、コマンド側とプロジェクターの 「中間」 に位置する、独立したインフラです。
この分離こそが、非同期連携の最大のメリット(疎結合)を生み出します。
最悪プロジェクターが停止しても、メッセージブローカーがイベントを保持し続け、
コマンド側は一切影響を受けずに動作を継続できます。(爆発半径の局所化成功)
なぜ分離するのか(単一責任の原則)
この分離は、単一責任の原則 (SRP) に従っています。
メッセージブローカーの責任
メッセージの 「配送(Delivery)」というインフラの責務 に集中します。
アプリケーションの責任
メッセージの 「解釈(Interpretation)」と「処理(Processing)」というビジネスの責務 に集中します。
この単一責務に反すると...
もしメッセージブローカーが「アプリ的役割」まで持ち始めたら(例:ブローカー内部でデータ変換を行う)、この責務が曖昧になり、保守性が著しく低下します。
結論として、メッセージブローカーは「メッセージを運ぶ」ことに特化したインフラであり、そのメッセージが何を意味するかは関知しない、というのが基本的な設計思想です。
プロジェクターのインフラコンポーネント
プロジェクターはアプリケーションコンポーネントですが、それが機能するためにはいくつかのインフラ基盤に依存し、それを利用します。
プロジェクターがインフラ基盤に「持つ」あるいは「必要とする」ものは、主に以下の3種類です。
1. 🚀 コンピュート基盤 (Compute Infrastructure)
「プロジェクターのコードをどこで実行するか」 という基盤です。
コンテナ (Kubernetes Pod, ECS Taskなど)
最も一般的です。
プロジェクターは、イベントを常に待ち受ける常駐プロセスとしてコンテナ内で実行されます。
サーバーレス (FaaS - AWS Lambdaなど)
これも非常に一般的です。
メッセージブローカーがイベントを受信したことをトリガーとして、サーバーレス関数(プロジェクターのロジック)が実行されます。
仮想マシン (VM)
伝統的なアプローチとして、VM上でデーモンプロセスとして実行することもあります。
2. 🌐 ネットワーク基盤 (Networking Infrastructure)
プロジェクターは、メッセージブローカーと 「通信」 するためにネットワーク基盤を必要とします。
これは、プロジェクターが「保有する」特別なコンポーネントではなく、
「利用する」クラウドのネットワーク機能(VPC、サブネット)と
「アクセス許可(Security Groups, Network Policies)」
のことです。
プロジェクターは、主に以下の2つの方向への通信基盤(ネットワークアクセス)を必要とします。
①. 入力側
メッセージブローカー(Kafka, SQSなど)に接続し、イベントを受信(Consume) するため。
②. 出力側
リードモデルDB(PostgreSQL, Elasticsearchなど)に接続し、データを書き込む(Produce) ため。
3. 💾 状態管理基盤 (State Store)
堅牢なプロジェクターを構築する場合、「自分自身の状態」 を記録するためのインフラ(データベース)も必要になります。
ここは、イベントソーシングのような実装は不要で、「現在の状態」を記録・更新できていれば十分です。
目的
・冪等性(Idempotency)の確保
「どのイベントIDを既に処理したか」を記録し、同じイベントを二重処理しないようにするため。
・オフセット/カーソルの管理
メッセージブローカーの「どこまで読み取ったか」を永続化し、障害からの復旧時に続きから再開できるようにするため。
インフラ
これは、プロジェクター専用の小さなDB(例: Redis, DynamoDB、あるいはリードDB内の専用管理テーブル)として実装されます。



