0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Ninda 14日目: Master-Worker 〜マスター・ワーカーパターン〜

Last updated at Posted at 2025-12-13

今日のゴール

  • Master-Workerパターンの概念を理解する
  • タスクの分散と結果の集約を学ぶ
  • 障害耐性のあるシステム設計を学ぶ

Master-Workerパターンとは

Producer-Consumerの発展形で、Masterが全体を制御します。

役割の違い

役割 責務
Master タスク分配、結果集約、進捗監視
Worker タスク実行、結果報告、ハートビート送信

Producer-Consumerとの違い

観点 Producer-Consumer Master-Worker
制御 分散 集中(Master)
結果 必ずしも集約しない Masterが集約
監視 なし Masterが監視

基本的な流れ

核心となるコード

# Master: タスク分配と結果収集
proc master() {.async.} =
  # タスクを投入
  for i in 1..10:
    await tasks.writeAsync(toTuple(strVal("task"), intVal(i)))

  # 結果を集約
  var total = 0
  for i in 1..10:
    let result = await results.takeAsync(resultPattern)
    total += result[2].intVal.int

# Worker: タスク処理
proc worker(id: int) {.async.} =
  while true:
    let task = await tasks.tryTakeAsync(taskPattern)
    if task.isSome:
      let result = process(task.get())
      await results.writeAsync(result)

ポイント:

  • タスクスペースと結果スペースを分離
  • WorkerはtryTakeで非ブロッキング取得
  • Masterはtakeで全結果を収集

タスク状態管理

大規模システムでは、タスクの状態をトラッキングすることが重要です。

状態遷移

状態をタプルで表現

# 状態遷移の実装パターン
# Queued → Running
let queued = await ts.tryTakeAsync(pattern("task", taskId, "queued"))
await ts.writeAsync(toTuple("task", taskId, "running", now))

# Running → Completed
discard await ts.tryTakeAsync(pattern("task", taskId, "running"))
await ts.writeAsync(toTuple("task", taskId, "completed", result))

💡 ポイント: 状態更新は「take → write」のパターンで行います。これにより古い状態が残らず、一貫性が保たれます。


ハートビート監視

Workerの障害を検知するには、ハートビート監視が効果的です。

監視の仕組み

核心コード

# Worker: ハートビート送信
proc sendHeartbeat(workerId: int) {.async.} =
  let now = getTime().toUnix() * 1000
  discard await ts.tryTakeAsync(pattern("heartbeat", workerId, nil))
  await ts.writeAsync(toTuple("heartbeat", workerId, now))

# Monitor: タイムアウト検知
proc checkWorkers() {.async.} =
  let now = getTime().toUnix() * 1000
  let heartbeats = await ts.readAllAsync(heartbeatPattern)
  for hb in heartbeats:
    if now - hb[2].intVal > TIMEOUT_MS:
      echo "Worker ", hb[1].intVal, " is dead!"

障害からの復旧

Workerがクラッシュした場合、タスクを再キューする必要があります。

復旧戦略

戦略 説明 ユースケース
タイムアウト再試行 一定時間後に再キュー 一般的なケース
即時再試行 障害検知時に即再キュー 冪等性保証あり
デッドレターキュー 失敗タスクを別スペースに エラー分析が必要

タイムアウト再試行の実装

proc monitorTasks() {.async.} =
  let now = getTime().toUnix() * 1000
  let running = await ts.readAllAsync(pattern("task", nil, "running", nil))

  for task in running:
    let startTime = task[3].intVal
    if now - startTime > TASK_TIMEOUT_MS:
      # タイムアウト: 再キュー
      let taskId = task[1].intVal
      discard await ts.tryTakeAsync(pattern("task", taskId, nil, nil))
      await ts.writeAsync(toTuple("task", taskId, "queued", now))

💡 注意: 再試行する場合、タスクが**冪等(idempotent)**であることが重要です。同じタスクが2回実行されても問題ないように設計しましょう。


SupervisedSystem

nindaのSupervisedSystemを使うと、監視と復旧を自動化できます。

特徴

機能 説明
自動ハートビート Worker登録だけで監視開始
自動再起動 障害検知時に自動復旧
再起動制限 無限ループを防止
状態可視化 Worker状態を一覧取得
let sys = newSupervisedSystem(manager, config)

# ワーカー追加(ハートビートは自動送信)
sys.addWorker(SupervisedWorker(
  id: "worker-1",
  task: workerProc,
  restart: rpAlways  # 常に再起動
))

await sys.start()

詳しくは Day 22: 障害検知と復旧戦略 で解説します。


設計のベストプラクティス

1. 冪等性の確保

❌ process(task) で副作用が累積
✅ process(task) を何度呼んでも結果が同じ

2. タイムアウトの設定

パラメータ 指針
タスクタイムアウト 最大処理時間の 2-3倍
ハートビート間隔 タイムアウトの 1/3-1/5
監視間隔 ハートビート間隔と同程度

3. スケーリング

Workerを追加するだけでスケールアウト可能。Masterの変更は不要。


まとめ

学んだこと

トピック ポイント
役割分担 Masterが制御、Workerが実行
状態管理 Queued → Running → Completed
監視 ハートビートによる死活監視
復旧 タイムアウトと再キュー
SupervisedSystem 監視と復旧の自動化

いつ使うか

  • 並列計算: 大量データの分散処理
  • バッチ処理: 定期的なジョブ実行
  • スケーラブルシステム: Worker数で性能調整

演習問題

問題14-1: 進捗表示

Masterがリアルタイムで進捗(完了タスク数/全タスク数)を表示するシステムを実装してください。

問題14-2: 優先度付きタスク

タスクに優先度を追加し、高優先度のタスクが先に処理されるようにしてください。

問題14-3: 再試行制限

各タスクの再試行回数を記録し、3回失敗したらデッドレターキューに移動する仕組みを実装してください。

💡 完全な実装例は GitHubリポジトリsrc/ninda/actor/ を参照してください。


前回: Producer-Consumer | 目次 | 次回: Blackboard

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?