今日のゴール
- Producer-Consumerパターンの概念を理解する
- タプルスペースでの自然な実装方法を学ぶ
- スケーラブルなワーカーシステムを構築する
Producer-Consumerパターンとは
最も基本的な並行処理パターンの一つです。
| 役割 | 責務 |
|---|---|
| Producer | データを生成してバッファに追加 |
| Buffer | データを一時的に保持(タプルスペース) |
| Consumer | バッファからデータを取り出して処理 |
なぜタプルスペースが最適か
| 特性 | メリット |
|---|---|
| 疎結合 | Producer/Consumerは互いを知らない |
| 非同期 | 速度差を自然に吸収 |
| スケール | Consumer追加で即負荷分散 |
基本実装
# Producer
proc producer() {.async.} =
for i in 1..10:
await ts.writeAsync(toTuple(strVal("job"), intVal(i)))
# Consumer
proc consumer() {.async.} =
while true:
let job = await ts.tryTakeAsync(jobPattern)
if job.isSome:
process(job.get())
ポイント:
-
write: 非ブロッキング、即座に完了 -
tryTake: 非ブロッキング、なければNone -
take: ブロッキング、あるまで待機
複数Consumer(負荷分散)
タプルスペースの強みは、Consumer追加だけで自動負荷分散できること。
# 3つのConsumerを並行起動
asyncCheck consumer("Worker-1")
asyncCheck consumer("Worker-2")
asyncCheck consumer("Worker-3")
動作イメージ
Producer: Task 1, 2, 3, 4, 5...
Worker-1: Task 1 → Task 4 → ...
Worker-2: Task 2 → Task 5 → ...
Worker-3: Task 3 → Task 6 → ...
💡 重要:
takeの排他性により、各タスクは1つのConsumerだけが処理します。重複処理の心配なし。
複数Producer
Producerも複数並行可能です。
# 複数ソースからのイベント
asyncCheck producer("SensorA", 5)
asyncCheck producer("SensorB", 5)
asyncCheck producer("SensorC", 5)
優先度付きキュー
パターンマッチングを使って優先度を実現できます。
タプル設計
# (タイプ, タスクID, 優先度)
await ts.writeAsync(toTuple(strVal("task"), intVal(1), intVal(1))) # 高優先度
await ts.writeAsync(toTuple(strVal("task"), intVal(2), intVal(3))) # 低優先度
優先度順に処理
# 優先度1 → 2 → 3 の順で処理
for priority in 1..3:
while true:
let task = await ts.tryTakeAsync(
toPattern(strVal("task"), nilValue(), intVal(priority))
)
if task.isNone: break
process(task.get())
完了通知パターン
処理完了をProducerに通知するパターンです。
# Producer: 結果を待機
let result = await results.takeAsync(resultPattern)
# Consumer: 結果を書き込み
await results.writeAsync(toTuple(strVal("result"), jobId, strVal("success")))
パイプライン処理
Consumer兼Producerとして、処理を連鎖できます。
# サムネイル生成(Consumer兼Producer)
proc thumbnailGenerator() {.async.} =
let image = await uploads.takeAsync(imagePattern)
# 処理...
await thumbnails.writeAsync(thumbnail)
ポイント:
- 各ステージは独立してスケール可能
- ボトルネックのステージだけWorker追加
設計のベストプラクティス
1. タプル設計
# ❌ 曖昧
toTuple(strVal("data"), intVal(1))
# ✅ 明確
toTuple(strVal("job"), strVal("image-resize"), intVal(jobId), strVal("pending"))
2. 終了条件
# 方法1: 終了タプル
await ts.writeAsync(toTuple(strVal("job"), strVal("DONE")))
# 方法2: カウント
var processed = 0
while processed < expectedCount:
# 処理...
processed.inc
3. エラーハンドリング
try:
process(job)
await results.writeAsync(successResult)
except:
await results.writeAsync(errorResult)
まとめ
学んだこと
| トピック | ポイント |
|---|---|
| 基本構造 | Producer → TupleSpace → Consumer |
| 負荷分散 | Consumer追加だけで自動分散 |
| 優先度 | パターンマッチングで優先度キュー |
| 完了通知 | 別スペースで結果を返す |
| パイプライン | Consumer兼Producerで連鎖 |
いつ使うか
- タスクキュー: バックグラウンドジョブ
- イベント処理: ログ、メトリクス収集
- バッチ処理: 大量データの並列処理
演習問題
問題13-1: 負荷分散の確認
5つのConsumerを起動し、100個のタスクを処理させてください。各Consumerが処理したタスク数を表示して、負荷が分散されていることを確認しましょう。
問題13-2: デッドレターキュー
処理に失敗したタスクを別のスペース(デッドレターキュー)に移動する仕組みを実装してください。
問題13-3: レート制限
Producerが1秒間に10タスクまでしか投入できないようにレート制限を実装してください。
💡 完全な実装例は GitHubリポジトリ を参照してください。