マイクロサービスやイベント駆動アーキテクチャの勉強中、「Pub/Sub(パブリッシュ/サブスクライブ)」によく出会います。
書籍を通して、Pub/Subの概念は理解しているつもりですが、実装したことが無かったので、Pythonを使って超簡単に実装してみました。
Pub/Sub振り返り
- 購読者(Subscriber)
特定の種類のメッセージを「購読」し、発行されたときに自動的に通知を受け取る側(例:subscribeメソッドでハンドラを登録) - 発行者(Publisher)
メッセージやイベントを「発行」する側(例:publishメソッドでイベントを送る) - メリット: 疎結合
発行者と購読者は直接依存せず、仲介役(バスやブローカー)を通じてやりとりするため、システムの拡張や保守がしやすい
作ったもの(簡単な在庫管理システム)
- 「ドメインイベント」と「関数(ハンドラ)」が紐づいている(実体は
Dict[EventType, List[Function]]) - ハンドラの中で次に呼び出すハンドラを連鎖できる
from __future__ import annotations
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
# MessageBus本体
class MessageBus:
def __init__(self) -> None:
self._handlers: defaultdict[type[Any], list[Callable[[Any], None]]] = defaultdict(list)
self.errors: list[tuple[object, BaseException]] = []
# イベント型に紐づくハンドラを登録
def subscribe(self, event_type: type[Any], handler: Callable[[Any], None]) -> None:
self._handlers[event_type].append(handler)
# イベントを発行し、紐づくハンドラをすべて呼び出す
def publish(self, event: object) -> None:
for h in list(self._handlers.get(type(event), [])):
try:
# 渡されたハンドラの呼び出し
h(event)
except BaseException as e:
# エラーはリストに記録して続行
self.errors.append((event, e))
# ドメインイベント定義
@dataclass(frozen=True)
class OrderPlaced:
order_id: str
sku: str
qty: int
@dataclass(frozen=True)
class StockAllocated:
order_id: str
sku: str
remaining: int
# 動かしてみる
def main() -> None:
# シンプルな在庫管理の例
inventory: dict[str, int] = {"ABC-1": 5}
bus = MessageBus()
# --- ハンドラ1: 注文が来たときの処理 ---
def on_order_placed(ev: OrderPlaced) -> None:
# 1. 在庫チェック
left = inventory.get(ev.sku, 0)
if left < ev.qty:
raise RuntimeError(f"out of stock for {ev.sku}: need {ev.qty}, have {left}")
# 2. 在庫引き落とし
inventory[ev.sku] = left - ev.qty
# 3.「在庫引当できたよ」という新しいイベントを作ってバスに投げる(イベントの連鎖)
# これにより、ハンドラ2のon_stock_allocated が自動的に呼ばれる
bus.publish(StockAllocated(order_id=ev.order_id, sku=ev.sku, remaining=inventory[ev.sku]))
# --- ハンドラ2: 引当完了ログを出す ---
def on_stock_allocated(ev: StockAllocated) -> None:
print(f"[allocated] order={ev.order_id} sku={ev.sku} remaining={ev.remaining}")
# --- イベントとハンドラの紐づけ ---
# ここでドメインイベントの型と実行する関数を結びつける
bus.subscribe(OrderPlaced, on_order_placed)
bus.subscribe(StockAllocated, on_stock_allocated)
print(f"initial inventory: {inventory}")
# ケース1: 成功 → StockAllocatedへ連鎖
bus.publish(OrderPlaced(order_id="o-1", sku="ABC-1", qty=2))
# ケース2: 失敗: 在庫不足 → errorsに記録
bus.publish(OrderPlaced(order_id="o-2", sku="ABC-1", qty=10))
print(f"final inventory: {inventory}")
# エラー確認
print(f"errors: {len(bus.errors)}")
for ev, err in bus.errors:
print(f" - on {type(ev).__name__}: {err}")
if __name__ == "__main__":
main()
initial inventory: {'ABC-1': 5}
[allocated] order=o-1 sku=ABC-1 remaining=3
final inventory: {'ABC-1': 3}
errors: 1
- on OrderPlaced: out of stock for ABC-1: need 10, have 3
やってみて分かったこと
実際に手を動かして、以下の3点がクリアになりました。
-
仕組みは単純な「辞書(dict)」
Pub/Subの正体はDict[EventType, List[Function]]。publish されたらイベントに紐づく関数をfor文で回すだけでかなりシンプルでした。 - 「連鎖」が疎結合を生む
今回のon_order_placed関数は、「次にログを出す」とか「メールを送る」という詳細を知りません。ただStockAllocatedという事実(イベント)を投げ捨てているだけです。 - エラーハンドリングが肝
同期処理の場合、1つのハンドラが落ちて全体が止まると困ります。try-exceptでエラーをキャッチしてリストに溜める実装にするなど、システムの堅牢性にも考慮する必要があります。
さいごに
やはり自分の手で実装してみるのは大事ですね。一気に理解が深まった気がします。