前回の記事の続き。実装編 by Claude
設計
イベントと処理の流れ
[注文サービス]
↓ (order.created)
[トピック: orders.log]
├→ [在庫サービス] → 在庫を減らす
└→ [売上集計サービス] → 売上を集計
イベント構造
{
"event_id": "uuid-xxx",
"timestamp": "2025-01-01T12:00:00",
"event_type": "order.created",
"data": {
"order_id": "order-001",
"product_id": "product-A",
"quantity": 5,
"price": 100.0,
"total": 500.0
}
}
トピック
orders.log(テキストファイル)
{"event_id":"...","event_type":"order.created",...}
{"event_id":"...","event_type":"order.created",...}
{"event_id":"...","event_type":"order.created",...}
実装
Event
import json
import uuid
from datetime import datetime
from typing import Dict, Any
class Event:
"""イベントの基本クラス"""
def __init__(self, event_type: str, data: Dict[str, Any], event_id: str = None):
self.event_id = event_id or str(uuid.uuid4())
self.timestamp = datetime.now().isoformat()
self.event_type = event_type
self.data = data
def to_json(self) -> str:
"""イベントをJSON文字列に変換"""
return json.dumps({
'event_id': self.event_id,
'timestamp': self.timestamp,
'event_type': self.event_type,
'data': self.data
})
@classmethod
def from_json(cls, json_str: str) -> 'Event':
"""JSON文字列からイベントを復元"""
data = json.loads(json_str)
event = cls(
event_type=data['event_type'],
data=data['data'],
event_id=data['event_id']
)
event.timestamp = data['timestamp']
return event
Topic
import os
from typing import List
from threading import Lock
from event import Event
class Topic:
"""
シンプルなトピック実装
テキストファイルにイベントを保存し、イベントは削除されない
"""
def __init__(self, topic_name: str, data_dir: str = "./data"):
self.topic_name = topic_name
self.file_path = os.path.join(data_dir, f"{topic_name}.log")
self.lock = Lock() # 複数のプロデューサーからの書き込みを保護
os.makedirs(data_dir, exist_ok=True)
if not os.path.exists(self.file_path):
open(self.file_path, 'w').close()
def publish(self, event: Event) -> None:
"""トピックにイベントを発行"""
with self.lock:
with open(self.file_path, 'a') as f:
f.write(event.to_json() + '\n')
print(f"📤 Published: {event.event_type} (id: {event.event_id})")
def read_from_offset(self, offset: int = 0) -> List[Event]:
"""指定されたオフセットからイベントを読み取る"""
events = []
with open(self.file_path, 'r') as f:
lines = f.readlines()
for line in lines[offset:]:
line = line.strip()
if line:
events.append(Event.from_json(line))
return events
def get_total_events(self) -> int:
"""トピック内の総イベント数を取得"""
with open(self.file_path, 'r') as f:
return sum(1 for line in f if line.strip())
Producer
from typing import Dict, Any
from event import Event
from topic import Topic
class Producer:
"""プロデューサーの基底クラス"""
def __init__(self, topic: Topic):
self.topic = topic
def produce(self, event_type: str, data: Dict[str, Any]) -> Event:
"""イベントを生成してトピックに発行"""
event = Event(event_type=event_type, data=data)
self.topic.publish(event)
return event
class OrderProducer(Producer):
"""注文サービスのプロデューサー"""
def create_order(self, order_id: str, product_id: str,
quantity: int, price: float) -> Event:
"""注文を作成してイベントを発行"""
return self.produce(
event_type='order.created',
data={
'order_id': order_id,
'product_id': product_id,
'quantity': quantity,
'price': price,
'total': price * quantity
}
)
Consumer
import os
import time
from typing import Set
from abc import ABC, abstractmethod
from event import Event
from topic import Topic
class Consumer(ABC):
"""コンシューマーの基底クラス"""
def __init__(self, consumer_id: str, topic: Topic, offset_dir: str = "./offsets"):
self.consumer_id = consumer_id
self.topic = topic
self.offset_file = os.path.join(offset_dir, f"{consumer_id}.offset")
self.processed_events: Set[str] = set() # 冪等性のため
self.running = False
os.makedirs(offset_dir, exist_ok=True)
self.processed_file = os.path.join(
offset_dir, f"{consumer_id}.processed")
self._load_processed_events()
def _load_processed_events(self) -> None:
"""処理済みイベントIDをロード"""
if os.path.exists(self.processed_file):
with open(self.processed_file, 'r') as f:
self.processed_events = set(line.strip() for line in f)
def _save_processed_event(self, event_id: str) -> None:
"""処理済みイベントIDを保存"""
with open(self.processed_file, 'a') as f:
f.write(event_id + '\n')
def get_current_offset(self) -> int:
"""現在のオフセット(読み取り位置)を取得"""
if os.path.exists(self.offset_file):
with open(self.offset_file, 'r') as f:
return int(f.read().strip())
return 0
def save_offset(self, offset: int) -> None:
"""オフセットを保存"""
with open(self.offset_file, 'w') as f:
f.write(str(offset))
@abstractmethod
def process_event(self, event: Event) -> None:
"""イベントを処理する(サブクラスで実装)"""
pass
def handle_event(self, event: Event) -> None:
"""イベントをハンドリング(冪等性チェック付き)"""
# 冪等性: 既に処理済みのイベントはスキップ
if event.event_id in self.processed_events:
print(
f"⏭️ [{self.consumer_id}] Already processed: {event.event_id}")
return
# イベント処理
self.process_event(event)
# 処理済みとしてマーク
self.processed_events.add(event.event_id)
self._save_processed_event(event.event_id)
def consume_once(self) -> int:
"""トピックから一度だけイベントを読み取って処理"""
offset = self.get_current_offset()
events = self.topic.read_from_offset(offset)
for event in events:
self.handle_event(event)
offset += 1
self.save_offset(offset)
return len(events)
def start(self, poll_interval: float = 1.0) -> None:
"""無限ループでトピックを監視し続ける"""
self.running = True
print(
f"🚀 [{self.consumer_id}] Started from offset {self.get_current_offset()}")
while self.running:
processed = self.consume_once()
if processed == 0:
time.sleep(poll_interval)
def stop(self) -> None:
"""コンシューマーを停止"""
self.running = False
Service
import json
import os
from typing import Dict
from consumer import Consumer
from event import Event
class InventoryConsumer(Consumer):
"""在庫管理サービス"""
def __init__(self, consumer_id: str, topic, db_file: str = "./data/inventory.json"):
super().__init__(consumer_id, topic)
self.db_file = db_file
self.inventory = self._load_inventory()
def _load_inventory(self) -> Dict[str, int]:
if os.path.exists(self.db_file):
with open(self.db_file, 'r') as f:
return json.load(f)
return {}
def _save_inventory(self) -> None:
with open(self.db_file, 'w') as f:
json.dump(self.inventory, f, indent=2)
def process_event(self, event: Event) -> None:
"""注文イベントを処理して在庫を減らす"""
if event.event_type == 'order.created':
product_id = event.data['product_id']
quantity = event.data['quantity']
if product_id not in self.inventory:
self.inventory[product_id] = 1000 # 初期在庫
self.inventory[product_id] -= quantity
self._save_inventory()
print(f"📦 [{self.consumer_id}] Reduced inventory: {product_id} "
f"(-{quantity}, remaining: {self.inventory[product_id]})")
class SalesConsumer(Consumer):
"""売上集計サービス(後から追加されるサービスの例)"""
def __init__(self, consumer_id: str, topic, db_file: str = "./data/sales.json"):
super().__init__(consumer_id, topic)
self.db_file = db_file
self.sales = self._load_sales()
def _load_sales(self) -> Dict:
if os.path.exists(self.db_file):
with open(self.db_file, 'r') as f:
return json.load(f)
return {
'total_revenue': 0.0,
'total_orders': 0,
'products': {}
}
def _save_sales(self) -> None:
with open(self.db_file, 'w') as f:
json.dump(self.sales, f, indent=2)
def process_event(self, event: Event) -> None:
"""注文イベントを処理して売上を集計"""
if event.event_type == 'order.created':
product_id = event.data['product_id']
quantity = event.data['quantity']
total = event.data['total']
self.sales['total_revenue'] += total
self.sales['total_orders'] += 1
if product_id not in self.sales['products']:
self.sales['products'][product_id] = {
'quantity': 0,
'revenue': 0.0
}
self.sales['products'][product_id]['quantity'] += quantity
self.sales['products'][product_id]['revenue'] += total
self._save_sales()
print(f"💰 [{self.consumer_id}] Updated sales: "
f"Total: ${self.sales['total_revenue']:.2f}, "
f"Orders: {self.sales['total_orders']}")
動かしてみる
※コンシューマーは常時トピックを監視しているのが通常だと思いますが、今回のdemoではpoll部分を手動で実行してます。
[プロデューサー] [トピック] [コンシューマー達]
| | |
|-- create_order() --------->| |
| |<------- poll() ---------|(常時監視)
| |------- event ---------->|
| | |-- process()
| | |
シナリオ1:基本的なフロー
注文が作成され、在庫サービスが在庫を減らします。
# トピックとプロデューサーを作成
topic = Topic("orders")
order_producer = OrderProducer(topic)
# 在庫コンシューマーを作成
inventory_consumer = InventoryConsumer("inventory-service", topic)
# 注文を発行
order_producer.create_order("order-001", "product-A", 5, 100.0)
order_producer.create_order("order-002", "product-B", 3, 200.0)
# イベントを処理
inventory_consumer.consume_once()
実行結果
📤 Published: order.created (id: 837117a8-4ec4-4e8b-abf5-b4eb34f506a0)
📤 Published: order.created (id: 64508a1c-de56-49b1-8cf6-a1d2df6a46de)
📦 [inventory-service] Reduced inventory: product-A (-5, remaining: 995)
📦 [inventory-service] Reduced inventory: product-B (-3, remaining: 997)
シナリオ2: 新しいコンシューマーの追加
システム稼働中に売上集計サービスを追加します。過去のイベントを全て再生して集計できます。
# さらに注文を追加
order_producer.create_order("order-003", "product-A", 2, 100.0)
# 新しい売上集計サービスを追加
sales_consumer = SalesConsumer("sales-service", topic)
# 全イベントを処理(過去のイベントを再生)
sales_consumer.consume_once()
実行結果
📤 Published: order.created (id: 05888c35-f0a9-4ef4-9bca-e7c9612657e0)
💰 [sales-service] Updated sales: Total: $500.00, Orders: 1
💰 [sales-service] Updated sales: Total: $1100.00, Orders: 2
💰 [sales-service] Updated sales: Total: $1300.00, Orders: 3
ポイント
- 売上集計サービスは後から追加されたが、過去の全注文データを集計できた
- イベントが削除されないため、いつでも過去のイベントを再生可能
- 既存のサービス(在庫サービス)への影響はゼロ
シナリオ3: クラッシュと復旧
コンシューマーがクラッシュしても、保存されたオフセットから処理を再開できます。
# 新しい注文を処理
order_producer.create_order("order-004", "product-C", 7, 150.0)
sales_consumer.consume_once()
print(f"現在のオフセット: {sales_consumer.get_current_offset()}") # 4
# コンシューマーがクラッシュ
del sales_consumer
# クラッシュ中に新しい注文が来る
order_producer.create_order("order-005", "product-D", 5, 80.0)
# コンシューマーを復旧
sales_consumer_recovered = SalesConsumer("sales-service", topic)
print(f"復旧時のオフセット: {sales_consumer_recovered.get_current_offset()}") # 4
# 未処理のイベントを処理
sales_consumer_recovered.consume_once()
実行結果
📤 Published: order.created (id: 43745b62-f208-4829-80d6-48c3305c8aad)
💰 [sales-service] Updated sales: Total: $2350.00, Orders: 4
現在のオフセット: 4
📤 Published: order.created (id: 6e877417-1827-4530-a3a8-e49b73a4389e)
復旧時のオフセット: 4
💰 [sales-service] Updated sales: Total: $2750.00, Orders: 5
ポイント
- オフセットが保存されているため、中断した位置から再開
- クラッシュ中に発行されたイベントも漏れなく処理される
シナリオ4: 冪等性の確認
同じイベントを複数回処理しても、結果は変わらない。
# 現在の売上を確認
initial_revenue = sales_consumer.sales['total_revenue'] # $1700.00
# オフセットを巻き戻して同じイベントを再処理
current_offset = sales_consumer.get_current_offset()
sales_consumer.save_offset(current_offset - 2) # 2イベント前に戻す
print(f"current offset: {sales_consumer.get_current_offset()}")
sales_consumer.consume_once()
# 売上は変わらない
print(sales_consumer.sales['total_revenue']) # $1700.00(変わらず)
print(f"current offset: {sales_consumer.get_current_offset()}")
実行結果
current offset: 3
⏭️ [sales-service] Already processed: 43745b62-f208-4829-80d6-48c3305c8aad
⏭️ [sales-service] Already processed: 6e877417-1827-4530-a3a8-e49b73a4389e
2750.0
current offset: 5
ポイント
- 処理済みイベントIDを記録しているため、重複処理を防げる
- ネットワークエラーや再試行があっても安全
複数のコンシューマーが自動でイベントに反応する様子
イベント駆動の本質である「イベントが発行されたら自動的にコンシューマーが反応する」という流れを念のため確認
# 2つのコンシューマーを同時に起動
print("🚀 在庫サービスと売上サービスを同時に起動します\n")
inventory_consumer = InventoryConsumer("inventory-service", topic)
inventory_thread = threading.Thread(
target=inventory_consumer.start, args=(0.5,))
inventory_thread.daemon = True
inventory_thread.start()
sales_consumer = SalesConsumer("sales-service", topic)
sales_thread = threading.Thread(target=sales_consumer.start, args=(0.5,))
sales_thread.daemon = True
sales_thread.start()
time.sleep(1)
# 注文を連続で発行
print("📝 注文を連続で発行します...\n")
print("💡 注目: 1つのイベントに対して、複数のサービスが")
print(" それぞれ独立して自動的に反応します!\n")
order_producer.create_order("order-010", "product-X", 5, 300.0)
time.sleep(1.5)
order_producer.create_order("order-011", "product-Y", 8, 250.0)
time.sleep(1.5)
order_producer.create_order("order-012", "product-X", 3, 300.0)
time.sleep(1.5)
inventory_consumer.stop()
sales_consumer.stop()
time.sleep(0.5)
実行結果
🚀 在庫サービスと売上サービスを同時に起動します
🚀 [inventory-service] Started from offset 5
🚀 [sales-service] Started from offset 5
📝 注文を連続で発行します...
💡 注目: 1つのイベントに対して、複数のサービスが
それぞれ独立して自動的に反応します!
📤 Published: order.created (id: 5341f471-3006-44eb-9962-f7be5b8b5327)
📦 [inventory-service] Reduced inventory: product-X (-5, remaining: 995)
💰 [sales-service] Updated sales: Total: $4250.00, Orders: 6
📤 Published: order.created (id: 61261ac5-46ad-41d0-89b3-78ed04c9f77f)
💰 [sales-service] Updated sales: Total: $6250.00, Orders: 7
📦 [inventory-service] Reduced inventory: product-Y (-8, remaining: 992)
📤 Published: order.created (id: 7960b72d-d34a-4ace-8a7d-8d41fb73850d)
💰 [sales-service] Updated sales: Total: $7150.00, Orders: 8
📦 [inventory-service] Reduced inventory: product-X (-3, remaining: 992)
まとめ
イベント駆動アーキテクチャの以下の特徴について実際に動かしてみて確認してみました。
イベントを削除しない → 過去を再生できる
オフセット管理 → 各コンシューマーが独立して進行
冪等性 → 同じイベントを複数回処理しても安全
疎結合 → サービス間が直接依存しない
個人的に思ったこと
- トピックを常に見てるので反映が早い(ストリーミングがリアルタイムってこういうことか)
- 非同期でデータが届いているかどうかわからなくても、(自動復旧するような仕組みがあれば)最終的に期待した結果になるということ
- トピックの複製は簡単そうなので冗長性も確保できそう(ファイルでやってるからそう思うだけかも)
- プロデューサーとコンシューマが非依存だけでなく、(今回の例だと)各コンシューマーも非依存なので、依存間関係あるならフロー設計必要(出荷してから売り上げ立てないといけないのに注文イベントで売り上げ計上してたとか)
- データはキュー、情報はトピックで扱うとしっくりきそう
(Kafkaなど)実際のプロダクトについて覗いてみるかー