LoginSignup
2
0

[Go言語]ワーカープールパターン並列処理 メモ

Posted at
  • Go言語におけるワーカープールパターン並列処理についてメモする。

ワーカープールパターンとは

  • 複数のタスクやジョブを効率的に処理するためのデザインパターン。
  • 特定の数のワーカー(ゴルーチンなど)を用意し、これらのワーカーがタスクキューからタスクを取り出して並行処理することで、リソースの使用効率を最適化し、処理速度を向上させる。

概念

  • ワーカー:
    • 同時に実行されるタスクやジョブを処理する単位(スレッドやゴルーチンなど)。
    • ワーカープール内の各ワーカーは、タスクキューからタスクを取得し、そのタスクを処理する。
  • タスク(ジョブ):
    • 処理される作業の単位。データの変換、データベースへの問い合わせ、ファイルの読み書きなどの形をとる。
  • タスクキュー:
    • 処理されるタスクやジョブを保持するキュー。
    • ワーカーはこのキューからタスクを取り出して処理する。
  • ワーカープール:
    • 事前に定義された数のワーカーを持つプール。
    • このプールを通じて、タスクの処理が並行して行われ、リソースの利用効率と処理速度が最適化される。

利点

  • 効率的なリソース利用:ワーカーの数を制御することで、システムリソースの過剰な消費を防ぐ。
  • スケーラビリティ:アプリケーションの負荷に応じて、ワーカープールのサイズを調整することで対応できる。
  • 処理速度の向上:タスクを並行して処理することで、全体の処理時間を短縮できる。
  • 柔軟性:タスクの種類や処理の複雑さに関わらず、一貫した方法でタスクを処理できる。

コード

Go言語でワーカープールを用いて、5つのジョブを3つのワーカーで並行処理し、結果を出力するコード例

main.go

package main

import (
	"fmt"
	"sync"
	"time"
)

// ワーカー関数
// - ワーカーが行うタスクの処理
// - ジョブキュー (jobs) からタスクを受け取り、それを処理して、結果を結果チャネル (results) に送信する
func worker(i int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Println("Worker", i, "Started  Job", j)
		time.Sleep(time.Second)
		fmt.Println("Worker", i, "Finished Job", j)
		results <- j * 2
	}
}

func main() {
	// タスク準備
	// - `numJobs`で指定した数だけのタスクを処理するため、ジョブキュー(`jobs`)と結果を格納するチャネル(`results`)を準備する
	// - タスクの数に合わせてバッファサイズを設定する
	const numJobs = 5
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	// ワーカープール作成
	// - `numWorkers`で指定した数のワーカーを生成する。各ワーカーは`worker`関数を実行するゴルーチンとして起動される
	// - 各ワーカーには一意のID(i)を与え、`jobs`チャネルからタスクを受け取って処理し、その結果を`results`チャネルに送信する
	const numWorkers = 3
	var wg sync.WaitGroup
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			worker(i, jobs, results)
		}(i)
	}

	// タスク割り当て
	// - タスク(1から numJobs までの数値)を jobs チャネルに送信し、ワーカーに処理させる
	// - 全てのタスクが`jobs`チャネルに送信された後、`close(jobs)`によりチャネルをクローズする※これにより、追加のタスクがないことがワーカーに通知される
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	// ワーカー完了待機
	// - `sync.WaitGroup`を使用して、全てのワーカーの処理が完了するのを待ちます。各ワーカーが完了すると`wg.Done()`が呼び出され、全てのワーカーが完了すると待機が解除される
	// - 全てのワーカーが完了し、全てのタスクの処理が終わった後、`results`チャネルをクローズする
	wg.Wait()
	close(results)

	// 結果の出力
	// - `results`チャネルからタスクの処理結果を受け取り、出力する。
	for i := 1; i <= numJobs; i++ {
		<-results
	}
}

処理シーケンス

動作確認

go run .\main.go
Worker 3 Started  Job 1
Worker 2 Started  Job 3
Worker 1 Started  Job 2
Worker 1 Finished Job 2
Worker 1 Started  Job 4
Worker 2 Finished Job 3
Worker 2 Started  Job 5
Worker 3 Finished Job 1
Worker 2 Finished Job 5
Worker 1 Finished Job 4

参考情報

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0