こちらの記事は、Joseph Livni氏により2018年10月に公開された『 Write a Go Worker Pool in 15 minutes 』の和訳です。
本記事は原著者から許可を得た上で記事を公開しています。
私は多くのユーザーリクエストを高速に処理するGoのサービスを構築していました。Goroutineのプールを使ってカプセル化することにより、パフォーマンスは20倍になりました。本記事では独自のワーカープールを作る方法を説明します。1
最終的な結果を知りたい場合は こちら からダウンロードしてください。2
本記事は以下の順番で説明します。
- モックとなるジョブの作成
- ワーカープールの作成
- ベンチマークテストの実施
ファイル構造は以下です。
/go_worker_pool
/work
work.go
/pool
worker.go
dispatcher.go
bench_test.go
main.go
上記を構築するディレクトリパスは以下のとおりです。
go/src/github.com/Lebonesco/go_worker_pool
最終的には以下のようになります。
$ go run main.go
2018/10/06 15:53:43 starting application...
2018/10/06 15:53:43 starting worker: 1
2018/10/06 15:53:43 starting worker: 2
2018/10/06 15:53:43 starting worker: 3
2018/10/06 15:53:43 starting worker: 4
2018/10/06 15:53:43 starting worker: 5
2018/10/06 15:53:43 creating jobs...
worker [2] - created hash [2376065843] from word [iCMRAjWw]
worker [4] - created hash [121297580] from word [xhxKQFDa]
worker [1] - created hash [3193224551] from word [XVlBzgba]
worker [3] - created hash [1481401259] from word [hTHctcuA]
worker [5] - created hash [166906897] from word [FpLSjFbc]
worker [5] - created hash [1752784812] from word [QYhYzRyW]
...
モックとなるジョブの作成
完了するまでに時間がかかる処理をシミュレートするために、ランダムな文字列を使って、たくさんのジョブを作成します。ジョブは何らかの処理を実行します。今回は文字列のハッシュを生成します。
package job
import (
"fmt"
"hash/fnv"
"time"
"math/rand"
"os"
)
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
// create random string
func RandStringRunes(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
// create list of jobs
func CreateJobs(amount int) []string {
var jobs []string
for i := 0; i < amount; i++ {
jobs = append(jobs, RandStringRunes(8))
}
return jobs
}
// mimics any type of job that can be run concurrently
func DoWork(word string, id int) {
h := fnv.New32a()
h.Write([]byte(word))
time.Sleep(time.Second)
if os.Getenv("DEBUG") == "true" {
fmt.Printf("worker [%d] - created hash [%d] from word [%s]\n", id, h.Sum32(), word)
}
}
DoWork() は string と worker id を受け取って、文字列のハッシュを計算します。結果を出力する前に1秒間sleepします。ジョブの終了時に結果を表示するには、以下をセットします。
$ export DEBUG="true"
後ほどベンチマークテストをするときには、ハッシュの結果を表示することは不要であることと、テスト結果が見にくくなることから、制御できるようにしています。
ワーカープールの作成
次は、実際にワーカープールを作成していきましょう。ワーカープールは3つの要素で構成されます。dispatcher (ワーカーをインスタンス化し、ワーカーとワーカープールを接続します)と workers (処理を待っているジョブを受け取り、処理をします)と collector (ジョブの到着を待って、workers に割り当てます)です。
以下のコードで注意すべき重要な内容は WorkerChannel と Worker構造体の Start() メソッドです。
Goでよく知られているフレーズに “Do not communicate by sharing memory; instead, share memory by communicating.” 3 があります。これが WorkerChannel の目的であり、ワーカーとワーカープールを通信する方法です。WorkerChannel は利用できるすべてのワーカーのチャネルを保持します。ジョブが到着すると collector は WorkerChannel からワーカーチャネルを取得し、ジョブをそのチャネルに渡します。ワーカーがそのジョブを受け取ります。逆に、ワーカーは処理が終了するとそのチャネルを WorkerChannel に戻し、次のジョブを待ちます。
package pool
import (
"log"
work "github.com/Lebonesco/go_worker_pool/work"
)
type Work struct {
ID int
Job string
}
type Worker struct {
ID int
WorkerChannel chan chan Work // used to communicate between dispatcher and workers
Channel chan Work
End chan bool
}
// start worker
func (w *Worker) Start() {
go func() {
for {
w.WorkerChannel <-w.Channel // when the worker is available place channel in queue
select {
case job := <-w.Channel: // worker has received job
work.DoWork(job.Job, w.ID) // do work
case <-w.End:
return
}
}
}()
}
// end worker
func (w *Worker) Stop() {
log.Printf("worker [%d] is stopping", w.ID)
w.End <- true
}
すばらしい!終わりに近づいています。ワーカープールを仕上げるために dispatcher と collector を完成させましょう。その前に、要素を組み立てるという点では、ワーカープールを設計する方法はたくさんあることを知っておいてください。
コードの可読性や実装方法に応じて、よりうまくいく方法があります。workers, dispatcher, collector の要素を用いて、あなたのプロジェクトに応用できるでしょう。たとえば collector を dispatcher から分ける人もいます。私の場合 dispatcher から Collector 構造体を返します。独自のワーカープールを実装したい人にとって可読性が良いと考えているためです。4
package pool
import (
"log"
)
var WorkerChannel = make(chan chan Work)
type Collector struct {
Work chan Work // receives jobs to send to workers
End chan bool // when receives bool stops workers
}
func StartDispatcher(workerCount int) Collector {
var i int
var workers []Worker
input := make(chan Work) // channel to recieve work
end := make(chan bool) // channel to spin down workers
collector := Collector{Work: input, End: end}
for i < workerCount {
i++
log.Println("starting worker: ", i)
worker := Worker{
ID: i,
Channel: make(chan Work),
WorkerChannel: WorkerChannel,
End: make(chan bool)}
worker.Start()
workers = append(workers, worker) // stores worker
}
// start collector
go func() {
for {
select {
case <-end:
for _, w := range workers {
w.Stop() // stop worker
}
return
case work := <-input:
worker := <-WorkerChannel // wait for available channel
worker <-work // dispatch work to worker
}
}
}()
return collector
}
最後に、アプリケーションを動かすドライバーを作りましょう。main() 関数がワーカープールをインスタンス化し、ジョブを作成します。
package main
import (
"log"
"github.com/Lebonesco/go_worker_pool/pool"
work "github.com/Lebonesco/go_worker_pool/work"
)
const WORKER_COUNT = 5
const JOB_COUNT = 100
func main() {
log.Println("starting application...")
collector := pool.StartDispatcher(WORKER_COUNT) // start up worker pool
for i, job := range work.CreateJobs(JOB_COUNT) {
collector.Work <-pool.Work{Job: job, ID: i}
}
}
ベンチマークテストの実施
このワーカープールが実際に機能することを確認するために、簡単なベンチマークテストを動かしてみましょう。Goはテストフレームワークが組み込まれているため、とても簡単にできます。
package main
import (
"testing"
"github.com/Lebonesco/go_worker_pool/pool"
work "github.com/Lebonesco/go_worker_pool/work"
)
func BenchmarkConcurrent(b *testing.B) {
collector := pool.StartDispatcher(WORKER_COUNT) // start up worker pool
for n := 0; n < b.N; n++ {
for i, job := range work.CreateJobs(20) {
collector.Work <-pool.Work{Job: job, ID: i}
}
}
}
func BenchmarkNonconcurrent(b *testing.B) {
for n := 0; n < b.N; n++ {
for _, job := range work.CreateJobs(20) {
work.DoWork(job, 1)
}
}
}
それでは動かします。
$ go test -bench=.
starting worker: 1
starting worker: 2
starting worker: 3
starting worker: 4
starting worker: 5
goos: windows
goarch: amd64
pkg: tutorials/concurrent-limiter
BenchmarkConcurrent-4 1 3001744600 ns/op
BenchmarkNonConcurrent-4 1 20006911100 ns/op
PASS
ok tutorials/concurrent-limiter 23.291s
結果はこのとおりです。ワーカープールによりパフォーマンスが大幅に良くなります。さらに、個々のジョブの所要時間が長くなり、動いているワーカー数が増えるほど、パフォーマンスがより良くなることがわかるでしょう。
私の記事を読んでくれてありがとう。
この記事がお役にたちましたら、教えてください👏👏👏
さらに読みたい場合は、下の「フォロー」ボタンをクリックしてください。
-
訳注: ライブラリとして提供されているワーカープールとしては gammazero/workerpool などがあります ↩
-
訳注: done channelを用いてgoroutineを終了するようになっていますが、Go1.7からcontextが導入されたのでcontext.Done()を用いることもできます ↩
-
訳注: その他の応用として、例えば Collector にバッファ付きチャネルを加えて、キューイングする処理を追加することもできます ↩