概要
イベント駆動アーキテクチャは、処理のトリガーをイベントとして明示し、
非同期かつリアクティブに各コンポーネントを連動させるモダンな設計手法である。
Pythonの asyncio
と Pub/Sub(Publish/Subscribe)モデル
を組み合わせることで、
柔軟かつ拡張可能な非同期イベント処理基盤を構築できる。
本稿では、小規模なイベントバスからPub/Subモデルの設計、asyncio.Queue
を使った実装パターンまで、
イベントドリブンなPython設計の本質を体系的に解説する。
1. イベント駆動の思想とは?
- 発火(emit)と購読(subscribe)の明確な責務分離
- イベント処理の非同期化(I/Oブロックを防ぐ)
- 拡張に強く、双方向依存の排除が可能
2. 最小のPub/Subバス(非同期)
import asyncio
from collections import defaultdict
class EventBus:
def __init__(self):
self._subscribers = defaultdict(list)
def subscribe(self, event_name, callback):
self._subscribers[event_name].append(callback)
async def publish(self, event_name, data):
for callback in self._subscribers[event_name]:
await callback(data)
3. 使用例:購読・発火
async def on_user_created(data):
print(f"📬 User Created: {data['username']}")
bus = EventBus()
bus.subscribe("user_created", on_user_created)
await bus.publish("user_created", {"username": "toto"})
→ 複数のリスナーが非同期で イベント発火に反応可能
→ 処理の順番や数に依存せず疎結合に構築できる
4. asyncio.Queue を使ったイベントループ駆動設計
class AsyncEventQueue:
def __init__(self):
self.queue = asyncio.Queue()
self.subscribers = defaultdict(list)
def subscribe(self, event_type, handler):
self.subscribers[event_type].append(handler)
async def publish(self, event_type, payload):
await self.queue.put((event_type, payload))
async def run(self):
while True:
event_type, payload = await self.queue.get()
for handler in self.subscribers[event_type]:
await handler(payload)
✅ 実行と制御
async def handler(data):
print(f"Handled: {data}")
q = AsyncEventQueue()
q.subscribe("greet", handler)
async def main():
asyncio.create_task(q.run())
await q.publish("greet", "こんにちは")
asyncio.run(main())
→ Queue
による バックプレッシャー制御と
→ 非同期 Task
による リアクティブなイベントドリブン処理が可能
5. 実務ユースケース
✅ マイクロサービス間イベントブローカ
- RabbitMQ / Redis PubSub を抽象化したラッパーとして設計
- 各サービスが同一イベント名で独自処理を購読
✅ Webアプリケーションのリアルタイム更新
- WebSocketの送信イベントを
bus.publish()
に委譲 - DBの変更、通知、ログ、APIコールなどが 非同期で並列発火
✅ テスト/デバッグの柔軟化
- EventBusのMock化で 副作用を制御可能
- ロガーや検証用Subscriberを容易に差し替え
6. よくある誤用と対策
❌ 複数のbusインスタンスを乱立させる
→ ✅ EventBus
は原則として アプリケーションスコープで単一インスタンス
❌ イベント設計が粒度過多
→ ✅ "user.created"
や "payment.failed"
のように
→ 意味的にまとまったイベント命名規則を設ける
❌ Handler内部がブロッキング処理を含む
→ ✅ await
に対応した 非同期設計に統一すべき
→ 必要に応じて loop.run_in_executor()
で外部処理を回避
結語
イベント駆動設計とは、“時間軸で連鎖する意図”を構造化する手段であり、
Pythonは asyncio
と簡潔な構文によってそれを直感的に表現できる。
- イベントベースで責務を分離し、反応型プログラミングを実現
- 非同期とPub/Subにより、柔軟かつ拡張性の高い設計が可能
- 通知・トリガー・ロジックを独立させ、スケーラブルなシステムへ進化させる
Pythonicとは、“流れを記述せず、流れを設計する”ことであり、
イベント駆動アーキテクチャはその設計哲学を非同期の力で支える構造である。