0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Ninda 13日目: Producer-Consumer 〜生産者・消費者パターン〜

Last updated at Posted at 2025-12-12

今日のゴール

  • Producer-Consumerパターンの概念を理解する
  • タプルスペースでの自然な実装方法を学ぶ
  • スケーラブルなワーカーシステムを構築する

Producer-Consumerパターンとは

最も基本的な並行処理パターンの一つです。

役割 責務
Producer データを生成してバッファに追加
Buffer データを一時的に保持(タプルスペース)
Consumer バッファからデータを取り出して処理

なぜタプルスペースが最適か

特性 メリット
疎結合 Producer/Consumerは互いを知らない
非同期 速度差を自然に吸収
スケール Consumer追加で即負荷分散

基本実装

# Producer
proc producer() {.async.} =
  for i in 1..10:
    await ts.writeAsync(toTuple(strVal("job"), intVal(i)))

# Consumer
proc consumer() {.async.} =
  while true:
    let job = await ts.tryTakeAsync(jobPattern)
    if job.isSome:
      process(job.get())

ポイント:

  • write: 非ブロッキング、即座に完了
  • tryTake: 非ブロッキング、なければNone
  • take: ブロッキング、あるまで待機

複数Consumer(負荷分散)

タプルスペースの強みは、Consumer追加だけで自動負荷分散できること。

# 3つのConsumerを並行起動
asyncCheck consumer("Worker-1")
asyncCheck consumer("Worker-2")
asyncCheck consumer("Worker-3")

動作イメージ

Producer: Task 1, 2, 3, 4, 5...
Worker-1: Task 1 → Task 4 → ...
Worker-2: Task 2 → Task 5 → ...
Worker-3: Task 3 → Task 6 → ...

💡 重要: takeの排他性により、各タスクは1つのConsumerだけが処理します。重複処理の心配なし。


複数Producer

Producerも複数並行可能です。

# 複数ソースからのイベント
asyncCheck producer("SensorA", 5)
asyncCheck producer("SensorB", 5)
asyncCheck producer("SensorC", 5)

優先度付きキュー

パターンマッチングを使って優先度を実現できます。

タプル設計

# (タイプ, タスクID, 優先度)
await ts.writeAsync(toTuple(strVal("task"), intVal(1), intVal(1)))  # 高優先度
await ts.writeAsync(toTuple(strVal("task"), intVal(2), intVal(3)))  # 低優先度

優先度順に処理

# 優先度1 → 2 → 3 の順で処理
for priority in 1..3:
  while true:
    let task = await ts.tryTakeAsync(
      toPattern(strVal("task"), nilValue(), intVal(priority))
    )
    if task.isNone: break
    process(task.get())

完了通知パターン

処理完了をProducerに通知するパターンです。

# Producer: 結果を待機
let result = await results.takeAsync(resultPattern)

# Consumer: 結果を書き込み
await results.writeAsync(toTuple(strVal("result"), jobId, strVal("success")))

パイプライン処理

Consumer兼Producerとして、処理を連鎖できます。

# サムネイル生成(Consumer兼Producer)
proc thumbnailGenerator() {.async.} =
  let image = await uploads.takeAsync(imagePattern)
  # 処理...
  await thumbnails.writeAsync(thumbnail)

ポイント:

  • 各ステージは独立してスケール可能
  • ボトルネックのステージだけWorker追加

設計のベストプラクティス

1. タプル設計

# ❌ 曖昧
toTuple(strVal("data"), intVal(1))

# ✅ 明確
toTuple(strVal("job"), strVal("image-resize"), intVal(jobId), strVal("pending"))

2. 終了条件

# 方法1: 終了タプル
await ts.writeAsync(toTuple(strVal("job"), strVal("DONE")))

# 方法2: カウント
var processed = 0
while processed < expectedCount:
  # 処理...
  processed.inc

3. エラーハンドリング

try:
  process(job)
  await results.writeAsync(successResult)
except:
  await results.writeAsync(errorResult)

まとめ

学んだこと

トピック ポイント
基本構造 Producer → TupleSpace → Consumer
負荷分散 Consumer追加だけで自動分散
優先度 パターンマッチングで優先度キュー
完了通知 別スペースで結果を返す
パイプライン Consumer兼Producerで連鎖

いつ使うか

  • タスクキュー: バックグラウンドジョブ
  • イベント処理: ログ、メトリクス収集
  • バッチ処理: 大量データの並列処理

演習問題

問題13-1: 負荷分散の確認

5つのConsumerを起動し、100個のタスクを処理させてください。各Consumerが処理したタスク数を表示して、負荷が分散されていることを確認しましょう。

問題13-2: デッドレターキュー

処理に失敗したタスクを別のスペース(デッドレターキュー)に移動する仕組みを実装してください。

問題13-3: レート制限

Producerが1秒間に10タスクまでしか投入できないようにレート制限を実装してください。

💡 完全な実装例は GitHubリポジトリ を参照してください。


前回: 型安全API | 目次 | 次回: Master-Worker

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?