4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonで超簡単にPub/Subを実装してみる

4
Last updated at Posted at 2025-12-08

マイクロサービスやイベント駆動アーキテクチャの勉強中、「Pub/Sub(パブリッシュ/サブスクライブ)」によく出会います。

書籍を通して、Pub/Subの概念は理解しているつもりですが、実装したことが無かったので、Pythonを使って超簡単に実装してみました。

Pub/Sub振り返り

  • 購読者(Subscriber
    特定の種類のメッセージを「購読」し、発行されたときに自動的に通知を受け取る側(例:subscribeメソッドでハンドラを登録)
  • 発行者(Publisher
    メッセージやイベントを「発行」する側(例:publishメソッドでイベントを送る)
  • メリット: 疎結合
    発行者と購読者は直接依存せず、仲介役(バスやブローカー)を通じてやりとりするため、システムの拡張や保守がしやすい

作ったもの(簡単な在庫管理システム)

  • 「ドメインイベント」と「関数(ハンドラ)」が紐づいている(実体はDict[EventType, List[Function]]
  • ハンドラの中で次に呼び出すハンドラを連鎖できる
from __future__ import annotations

from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any

# MessageBus本体
class MessageBus:
    def __init__(self) -> None:
        self._handlers: defaultdict[type[Any], list[Callable[[Any], None]]] = defaultdict(list)
        self.errors: list[tuple[object, BaseException]] = []

    # イベント型に紐づくハンドラを登録
    def subscribe(self, event_type: type[Any], handler: Callable[[Any], None]) -> None:
        self._handlers[event_type].append(handler)

    # イベントを発行し、紐づくハンドラをすべて呼び出す
    def publish(self, event: object) -> None:
        for h in list(self._handlers.get(type(event), [])):
            try:
                # 渡されたハンドラの呼び出し
                h(event)
            except BaseException as e:
                # エラーはリストに記録して続行
                self.errors.append((event, e))


# ドメインイベント定義
@dataclass(frozen=True)
class OrderPlaced:
    order_id: str
    sku: str
    qty: int

@dataclass(frozen=True)
class StockAllocated:
    order_id: str
    sku: str
    remaining: int


# 動かしてみる
def main() -> None:
    # シンプルな在庫管理の例
    inventory: dict[str, int] = {"ABC-1": 5}
    bus = MessageBus()

    # --- ハンドラ1: 注文が来たときの処理 ---
    def on_order_placed(ev: OrderPlaced) -> None:
        # 1. 在庫チェック
        left = inventory.get(ev.sku, 0)
        if left < ev.qty:
            raise RuntimeError(f"out of stock for {ev.sku}: need {ev.qty}, have {left}")
        # 2. 在庫引き落とし
        inventory[ev.sku] = left - ev.qty
        # 3.「在庫引当できたよ」という新しいイベントを作ってバスに投げる(イベントの連鎖)
        # これにより、ハンドラ2のon_stock_allocated が自動的に呼ばれる
        bus.publish(StockAllocated(order_id=ev.order_id, sku=ev.sku, remaining=inventory[ev.sku]))

    # --- ハンドラ2: 引当完了ログを出す ---
    def on_stock_allocated(ev: StockAllocated) -> None:
        print(f"[allocated] order={ev.order_id} sku={ev.sku} remaining={ev.remaining}")

    # --- イベントとハンドラの紐づけ ---
    # ここでドメインイベントの型と実行する関数を結びつける
    bus.subscribe(OrderPlaced, on_order_placed)
    bus.subscribe(StockAllocated, on_stock_allocated)

    print(f"initial inventory: {inventory}")

    # ケース1: 成功 → StockAllocatedへ連鎖
    bus.publish(OrderPlaced(order_id="o-1", sku="ABC-1", qty=2))
    # ケース2: 失敗: 在庫不足 → errorsに記録
    bus.publish(OrderPlaced(order_id="o-2", sku="ABC-1", qty=10))

    print(f"final   inventory: {inventory}")

    # エラー確認
    print(f"errors: {len(bus.errors)}")
    for ev, err in bus.errors:
        print(f"  - on {type(ev).__name__}: {err}")


if __name__ == "__main__":
    main()

initial inventory: {'ABC-1': 5}
[allocated] order=o-1 sku=ABC-1 remaining=3
final   inventory: {'ABC-1': 3}
errors: 1
  - on OrderPlaced: out of stock for ABC-1: need 10, have 3

やってみて分かったこと

実際に手を動かして、以下の3点がクリアになりました。

  1. 仕組みは単純な「辞書(dict)」
    Pub/Subの正体はDict[EventType, List[Function]]。publish されたらイベントに紐づく関数をfor文で回すだけでかなりシンプルでした。
  2. 「連鎖」が疎結合を生む
    今回のon_order_placed関数は、「次にログを出す」とか「メールを送る」という詳細を知りません。ただStockAllocatedという事実(イベント)を投げ捨てているだけです。
  3. エラーハンドリングが肝
    同期処理の場合、1つのハンドラが落ちて全体が止まると困ります。try-exceptでエラーをキャッチしてリストに溜める実装にするなど、システムの堅牢性にも考慮する必要があります。

さいごに

やはり自分の手で実装してみるのは大事ですね。一気に理解が深まった気がします。

4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?