今日のゴール
- Master-Workerパターンの概念を理解する
- タスクの分散と結果の集約を学ぶ
- 障害耐性のあるシステム設計を学ぶ
Master-Workerパターンとは
Producer-Consumerの発展形で、Masterが全体を制御します。
役割の違い
| 役割 | 責務 |
|---|---|
| Master | タスク分配、結果集約、進捗監視 |
| Worker | タスク実行、結果報告、ハートビート送信 |
Producer-Consumerとの違い
| 観点 | Producer-Consumer | Master-Worker |
|---|---|---|
| 制御 | 分散 | 集中(Master) |
| 結果 | 必ずしも集約しない | Masterが集約 |
| 監視 | なし | Masterが監視 |
基本的な流れ
核心となるコード
# Master: タスク分配と結果収集
proc master() {.async.} =
# タスクを投入
for i in 1..10:
await tasks.writeAsync(toTuple(strVal("task"), intVal(i)))
# 結果を集約
var total = 0
for i in 1..10:
let result = await results.takeAsync(resultPattern)
total += result[2].intVal.int
# Worker: タスク処理
proc worker(id: int) {.async.} =
while true:
let task = await tasks.tryTakeAsync(taskPattern)
if task.isSome:
let result = process(task.get())
await results.writeAsync(result)
ポイント:
- タスクスペースと結果スペースを分離
- Workerは
tryTakeで非ブロッキング取得 - Masterは
takeで全結果を収集
タスク状態管理
大規模システムでは、タスクの状態をトラッキングすることが重要です。
状態遷移
状態をタプルで表現
# 状態遷移の実装パターン
# Queued → Running
let queued = await ts.tryTakeAsync(pattern("task", taskId, "queued"))
await ts.writeAsync(toTuple("task", taskId, "running", now))
# Running → Completed
discard await ts.tryTakeAsync(pattern("task", taskId, "running"))
await ts.writeAsync(toTuple("task", taskId, "completed", result))
💡 ポイント: 状態更新は「take → write」のパターンで行います。これにより古い状態が残らず、一貫性が保たれます。
ハートビート監視
Workerの障害を検知するには、ハートビート監視が効果的です。
監視の仕組み
核心コード
# Worker: ハートビート送信
proc sendHeartbeat(workerId: int) {.async.} =
let now = getTime().toUnix() * 1000
discard await ts.tryTakeAsync(pattern("heartbeat", workerId, nil))
await ts.writeAsync(toTuple("heartbeat", workerId, now))
# Monitor: タイムアウト検知
proc checkWorkers() {.async.} =
let now = getTime().toUnix() * 1000
let heartbeats = await ts.readAllAsync(heartbeatPattern)
for hb in heartbeats:
if now - hb[2].intVal > TIMEOUT_MS:
echo "Worker ", hb[1].intVal, " is dead!"
障害からの復旧
Workerがクラッシュした場合、タスクを再キューする必要があります。
復旧戦略
| 戦略 | 説明 | ユースケース |
|---|---|---|
| タイムアウト再試行 | 一定時間後に再キュー | 一般的なケース |
| 即時再試行 | 障害検知時に即再キュー | 冪等性保証あり |
| デッドレターキュー | 失敗タスクを別スペースに | エラー分析が必要 |
タイムアウト再試行の実装
proc monitorTasks() {.async.} =
let now = getTime().toUnix() * 1000
let running = await ts.readAllAsync(pattern("task", nil, "running", nil))
for task in running:
let startTime = task[3].intVal
if now - startTime > TASK_TIMEOUT_MS:
# タイムアウト: 再キュー
let taskId = task[1].intVal
discard await ts.tryTakeAsync(pattern("task", taskId, nil, nil))
await ts.writeAsync(toTuple("task", taskId, "queued", now))
💡 注意: 再試行する場合、タスクが**冪等(idempotent)**であることが重要です。同じタスクが2回実行されても問題ないように設計しましょう。
SupervisedSystem
nindaのSupervisedSystemを使うと、監視と復旧を自動化できます。
特徴
| 機能 | 説明 |
|---|---|
| 自動ハートビート | Worker登録だけで監視開始 |
| 自動再起動 | 障害検知時に自動復旧 |
| 再起動制限 | 無限ループを防止 |
| 状態可視化 | Worker状態を一覧取得 |
let sys = newSupervisedSystem(manager, config)
# ワーカー追加(ハートビートは自動送信)
sys.addWorker(SupervisedWorker(
id: "worker-1",
task: workerProc,
restart: rpAlways # 常に再起動
))
await sys.start()
詳しくは Day 22: 障害検知と復旧戦略 で解説します。
設計のベストプラクティス
1. 冪等性の確保
❌ process(task) で副作用が累積
✅ process(task) を何度呼んでも結果が同じ
2. タイムアウトの設定
| パラメータ | 指針 |
|---|---|
| タスクタイムアウト | 最大処理時間の 2-3倍 |
| ハートビート間隔 | タイムアウトの 1/3-1/5 |
| 監視間隔 | ハートビート間隔と同程度 |
3. スケーリング
Workerを追加するだけでスケールアウト可能。Masterの変更は不要。
まとめ
学んだこと
| トピック | ポイント |
|---|---|
| 役割分担 | Masterが制御、Workerが実行 |
| 状態管理 | Queued → Running → Completed |
| 監視 | ハートビートによる死活監視 |
| 復旧 | タイムアウトと再キュー |
| SupervisedSystem | 監視と復旧の自動化 |
いつ使うか
- 並列計算: 大量データの分散処理
- バッチ処理: 定期的なジョブ実行
- スケーラブルシステム: Worker数で性能調整
演習問題
問題14-1: 進捗表示
Masterがリアルタイムで進捗(完了タスク数/全タスク数)を表示するシステムを実装してください。
問題14-2: 優先度付きタスク
タスクに優先度を追加し、高優先度のタスクが先に処理されるようにしてください。
問題14-3: 再試行制限
各タスクの再試行回数を記録し、3回失敗したらデッドレターキューに移動する仕組みを実装してください。
💡 完全な実装例は GitHubリポジトリ の
src/ninda/actor/を参照してください。