- Go言語におけるワーカープールパターン並列処理についてメモする。
ワーカープールパターンとは
- 複数のタスクやジョブを効率的に処理するためのデザインパターン。
- 特定の数のワーカー(ゴルーチンなど)を用意し、これらのワーカーがタスクキューからタスクを取り出して並行処理することで、リソースの使用効率を最適化し、処理速度を向上させる。
概念
-
ワーカー:
- 同時に実行されるタスクやジョブを処理する単位(スレッドやゴルーチンなど)。
- ワーカープール内の各ワーカーは、タスクキューからタスクを取得し、そのタスクを処理する。
-
タスク(ジョブ):
- 処理される作業の単位。データの変換、データベースへの問い合わせ、ファイルの読み書きなどの形をとる。
-
タスクキュー:
- 処理されるタスクやジョブを保持するキュー。
- ワーカーはこのキューからタスクを取り出して処理する。
-
ワーカープール:
- 事前に定義された数のワーカーを持つプール。
- このプールを通じて、タスクの処理が並行して行われ、リソースの利用効率と処理速度が最適化される。
利点
- 効率的なリソース利用:ワーカーの数を制御することで、システムリソースの過剰な消費を防ぐ。
- スケーラビリティ:アプリケーションの負荷に応じて、ワーカープールのサイズを調整することで対応できる。
- 処理速度の向上:タスクを並行して処理することで、全体の処理時間を短縮できる。
- 柔軟性:タスクの種類や処理の複雑さに関わらず、一貫した方法でタスクを処理できる。
コード
Go言語でワーカープールを用いて、5つのジョブを3つのワーカーで並行処理し、結果を出力するコード例
main.go
package main
import (
"fmt"
"sync"
"time"
)
// ワーカー関数
// - ワーカーが行うタスクの処理
// - ジョブキュー (jobs) からタスクを受け取り、それを処理して、結果を結果チャネル (results) に送信する
func worker(i int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("Worker", i, "Started Job", j)
time.Sleep(time.Second)
fmt.Println("Worker", i, "Finished Job", j)
results <- j * 2
}
}
func main() {
// タスク準備
// - `numJobs`で指定した数だけのタスクを処理するため、ジョブキュー(`jobs`)と結果を格納するチャネル(`results`)を準備する
// - タスクの数に合わせてバッファサイズを設定する
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// ワーカープール作成
// - `numWorkers`で指定した数のワーカーを生成する。各ワーカーは`worker`関数を実行するゴルーチンとして起動される
// - 各ワーカーには一意のID(i)を与え、`jobs`チャネルからタスクを受け取って処理し、その結果を`results`チャネルに送信する
const numWorkers = 3
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
worker(i, jobs, results)
}(i)
}
// タスク割り当て
// - タスク(1から numJobs までの数値)を jobs チャネルに送信し、ワーカーに処理させる
// - 全てのタスクが`jobs`チャネルに送信された後、`close(jobs)`によりチャネルをクローズする※これにより、追加のタスクがないことがワーカーに通知される
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// ワーカー完了待機
// - `sync.WaitGroup`を使用して、全てのワーカーの処理が完了するのを待ちます。各ワーカーが完了すると`wg.Done()`が呼び出され、全てのワーカーが完了すると待機が解除される
// - 全てのワーカーが完了し、全てのタスクの処理が終わった後、`results`チャネルをクローズする
wg.Wait()
close(results)
// 結果の出力
// - `results`チャネルからタスクの処理結果を受け取り、出力する。
for i := 1; i <= numJobs; i++ {
<-results
}
}
処理シーケンス
動作確認
go run .\main.go
Worker 3 Started Job 1
Worker 2 Started Job 3
Worker 1 Started Job 2
Worker 1 Finished Job 2
Worker 1 Started Job 4
Worker 2 Finished Job 3
Worker 2 Started Job 5
Worker 3 Finished Job 1
Worker 2 Finished Job 5
Worker 1 Finished Job 4