2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonとイベント駆動:asyncio + Pub/Subモデルの設計実践

Posted at

概要

イベント駆動アーキテクチャは、処理のトリガーをイベントとして明示し、
非同期かつリアクティブに各コンポーネントを連動させるモダンな設計手法である。

Pythonの asyncioPub/Sub(Publish/Subscribe)モデル を組み合わせることで、
柔軟かつ拡張可能な非同期イベント処理基盤を構築できる。

本稿では、小規模なイベントバスからPub/Subモデルの設計、asyncio.Queue を使った実装パターンまで、
イベントドリブンなPython設計の本質を体系的に解説する。


1. イベント駆動の思想とは?

  • 発火(emit)と購読(subscribe)の明確な責務分離
  • イベント処理の非同期化(I/Oブロックを防ぐ)
  • 拡張に強く、双方向依存の排除が可能

2. 最小のPub/Subバス(非同期)

import asyncio
from collections import defaultdict

class EventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_name, callback):
        self._subscribers[event_name].append(callback)

    async def publish(self, event_name, data):
        for callback in self._subscribers[event_name]:
            await callback(data)

3. 使用例:購読・発火

async def on_user_created(data):
    print(f"📬 User Created: {data['username']}")

bus = EventBus()
bus.subscribe("user_created", on_user_created)

await bus.publish("user_created", {"username": "toto"})

→ 複数のリスナーが非同期で イベント発火に反応可能
→ 処理の順番や数に依存せず疎結合に構築できる


4. asyncio.Queue を使ったイベントループ駆動設計

class AsyncEventQueue:
    def __init__(self):
        self.queue = asyncio.Queue()
        self.subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self.subscribers[event_type].append(handler)

    async def publish(self, event_type, payload):
        await self.queue.put((event_type, payload))

    async def run(self):
        while True:
            event_type, payload = await self.queue.get()
            for handler in self.subscribers[event_type]:
                await handler(payload)

✅ 実行と制御

async def handler(data):
    print(f"Handled: {data}")

q = AsyncEventQueue()
q.subscribe("greet", handler)

async def main():
    asyncio.create_task(q.run())
    await q.publish("greet", "こんにちは")

asyncio.run(main())

Queue による バックプレッシャー制御
→ 非同期 Task による リアクティブなイベントドリブン処理が可能


5. 実務ユースケース

✅ マイクロサービス間イベントブローカ

  • RabbitMQ / Redis PubSub を抽象化したラッパーとして設計
  • 各サービスが同一イベント名で独自処理を購読

✅ Webアプリケーションのリアルタイム更新

  • WebSocketの送信イベントを bus.publish() に委譲
  • DBの変更、通知、ログ、APIコールなどが 非同期で並列発火

✅ テスト/デバッグの柔軟化

  • EventBusのMock化で 副作用を制御可能
  • ロガーや検証用Subscriberを容易に差し替え

6. よくある誤用と対策

❌ 複数のbusインスタンスを乱立させる

→ ✅ EventBus は原則として アプリケーションスコープで単一インスタンス


❌ イベント設計が粒度過多

→ ✅ "user.created""payment.failed" のように
意味的にまとまったイベント命名規則を設ける


❌ Handler内部がブロッキング処理を含む

→ ✅ await に対応した 非同期設計に統一すべき
→ 必要に応じて loop.run_in_executor() で外部処理を回避


結語

イベント駆動設計とは、“時間軸で連鎖する意図”を構造化する手段であり、
Pythonは asyncio と簡潔な構文によってそれを直感的に表現できる。

  • イベントベースで責務を分離し、反応型プログラミングを実現
  • 非同期とPub/Subにより、柔軟かつ拡張性の高い設計が可能
  • 通知・トリガー・ロジックを独立させ、スケーラブルなシステムへ進化させる

Pythonicとは、“流れを記述せず、流れを設計する”ことであり、
イベント駆動アーキテクチャはその設計哲学を非同期の力で支える構造である。

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?