LoginSignup
21
15

More than 3 years have passed since last update.

Goでワーカープールを15分で実装する方法

Posted at

こちらの記事は、Joseph Livni氏により2018年10月に公開された『 Write a Go Worker Pool in 15 minutes 』の和訳です。
本記事は原著者から許可を得た上で記事を公開しています。


1_ugshDOhXfC287WWhG4IfSA.jpeg

私は多くのユーザーリクエストを高速に処理するGoのサービスを構築していました。Goroutineのプールを使ってカプセル化することにより、パフォーマンスは20倍になりました。本記事では独自のワーカープールを作る方法を説明します。1

最終的な結果を知りたい場合は こちら からダウンロードしてください。2

本記事は以下の順番で説明します。

  • モックとなるジョブの作成
  • ワーカープールの作成
  • ベンチマークテストの実施

ファイル構造は以下です。

/go_worker_pool
    /work
        work.go
    /pool
        worker.go
        dispatcher.go
    bench_test.go
    main.go

上記を構築するディレクトリパスは以下のとおりです。

go/src/github.com/Lebonesco/go_worker_pool

最終的には以下のようになります。

$ go run main.go
2018/10/06 15:53:43 starting application...
2018/10/06 15:53:43 starting worker:  1
2018/10/06 15:53:43 starting worker:  2
2018/10/06 15:53:43 starting worker:  3
2018/10/06 15:53:43 starting worker:  4
2018/10/06 15:53:43 starting worker:  5
2018/10/06 15:53:43 creating jobs...
worker [2] - created hash [2376065843] from word [iCMRAjWw]
worker [4] - created hash [121297580] from word [xhxKQFDa]
worker [1] - created hash [3193224551] from word [XVlBzgba]
worker [3] - created hash [1481401259] from word [hTHctcuA]
worker [5] - created hash [166906897] from word [FpLSjFbc]
worker [5] - created hash [1752784812] from word [QYhYzRyW]
...

モックとなるジョブの作成

完了するまでに時間がかかる処理をシミュレートするために、ランダムな文字列を使って、たくさんのジョブを作成します。ジョブは何らかの処理を実行します。今回は文字列のハッシュを生成します。

work.go
package job

import (
    "fmt"
    "hash/fnv"
    "time"
    "math/rand"
    "os"
)

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// create random string
func RandStringRunes(n int) string {
    b := make([]rune, n)
    for i := range b {
        b[i] = letterRunes[rand.Intn(len(letterRunes))]
    }
    return string(b)
}

// create list of jobs
func CreateJobs(amount int) []string {
    var jobs []string

    for i := 0; i < amount; i++ {
        jobs = append(jobs, RandStringRunes(8))
    }
    return jobs
}

// mimics any type of job that can be run concurrently
func DoWork(word string, id int) {
    h := fnv.New32a()
    h.Write([]byte(word))
    time.Sleep(time.Second)
    if os.Getenv("DEBUG") == "true" {
        fmt.Printf("worker [%d] - created hash [%d] from word [%s]\n", id, h.Sum32(), word)
    }
}

DoWork()stringworker id を受け取って、文字列のハッシュを計算します。結果を出力する前に1秒間sleepします。ジョブの終了時に結果を表示するには、以下をセットします。

$ export DEBUG="true"

後ほどベンチマークテストをするときには、ハッシュの結果を表示することは不要であることと、テスト結果が見にくくなることから、制御できるようにしています。

ワーカープールの作成

次は、実際にワーカープールを作成していきましょう。ワーカープールは3つの要素で構成されます。dispatcher (ワーカーをインスタンス化し、ワーカーとワーカープールを接続します)と workers (処理を待っているジョブを受け取り、処理をします)と collector (ジョブの到着を待って、workers に割り当てます)です。

以下のコードで注意すべき重要な内容は WorkerChannel と Worker構造体の Start() メソッドです。

Goでよく知られているフレーズに “Do not communicate by sharing memory; instead, share memory by communicating.3 があります。これが WorkerChannel の目的であり、ワーカーとワーカープールを通信する方法です。WorkerChannel は利用できるすべてのワーカーのチャネルを保持します。ジョブが到着すると collectorWorkerChannel からワーカーチャネルを取得し、ジョブをそのチャネルに渡します。ワーカーがそのジョブを受け取ります。逆に、ワーカーは処理が終了するとそのチャネルを WorkerChannel に戻し、次のジョブを待ちます。

worker.go
package pool

import (
    "log"
    work "github.com/Lebonesco/go_worker_pool/work"
)

type Work struct {
    ID  int
    Job string
}

type Worker struct {
    ID int
    WorkerChannel chan chan Work // used to communicate between dispatcher and workers
    Channel chan Work
    End chan bool
}

// start worker
func (w *Worker) Start() {
    go func() {
        for {
            w.WorkerChannel <-w.Channel // when the worker is available place channel in queue
            select {
            case job := <-w.Channel: // worker has received job
                work.DoWork(job.Job, w.ID) // do work
            case <-w.End:
                return 
            }
        }
    }()
}

// end worker
func (w *Worker) Stop() {
    log.Printf("worker [%d] is stopping", w.ID)
    w.End <- true
}

すばらしい!終わりに近づいています。ワーカープールを仕上げるために dispatchercollector を完成させましょう。その前に、要素を組み立てるという点では、ワーカープールを設計する方法はたくさんあることを知っておいてください。

コードの可読性や実装方法に応じて、よりうまくいく方法があります。workers, dispatcher, collector の要素を用いて、あなたのプロジェクトに応用できるでしょう。たとえば collectordispatcher から分ける人もいます。私の場合 dispatcher から Collector 構造体を返します。独自のワーカープールを実装したい人にとって可読性が良いと考えているためです。4

dispatcher.go
package pool

import (
    "log"
)

var WorkerChannel = make(chan chan Work)

type Collector struct {
    Work chan Work // receives jobs to send to workers
    End chan bool // when receives bool stops workers
}

func StartDispatcher(workerCount int) Collector {
    var i int
    var workers []Worker
    input := make(chan Work) // channel to recieve work
    end := make(chan bool) // channel to spin down workers
    collector := Collector{Work: input, End: end}

    for i < workerCount {
        i++
        log.Println("starting worker: ", i)
        worker := Worker{
                ID: i,
                Channel: make(chan Work),
                WorkerChannel: WorkerChannel,
                End: make(chan bool)}
        worker.Start()
        workers = append(workers, worker) // stores worker
    }

    // start collector
    go func() {
        for {
            select {
            case <-end:
                for _, w := range workers {
                    w.Stop() // stop worker
                }
                return
            case work := <-input:
                worker := <-WorkerChannel // wait for available channel
                worker <-work // dispatch work to worker
            }
        }
    }()

    return collector
}

最後に、アプリケーションを動かすドライバーを作りましょう。main() 関数がワーカープールをインスタンス化し、ジョブを作成します。

main.go
package main 

import (
    "log"
    "github.com/Lebonesco/go_worker_pool/pool"
    work "github.com/Lebonesco/go_worker_pool/work"
)

const WORKER_COUNT = 5
const JOB_COUNT = 100

func main() {
    log.Println("starting application...")
    collector := pool.StartDispatcher(WORKER_COUNT) // start up worker pool

    for i, job := range work.CreateJobs(JOB_COUNT) {
        collector.Work <-pool.Work{Job: job, ID: i}
    }
}

ベンチマークテストの実施

このワーカープールが実際に機能することを確認するために、簡単なベンチマークテストを動かしてみましょう。Goはテストフレームワークが組み込まれているため、とても簡単にできます。

bench_test.go
package main 

import (
    "testing"
    "github.com/Lebonesco/go_worker_pool/pool"
    work "github.com/Lebonesco/go_worker_pool/work"
)
func BenchmarkConcurrent(b *testing.B) {
    collector := pool.StartDispatcher(WORKER_COUNT) // start up worker pool

    for n := 0; n < b.N; n++ {
        for i, job := range work.CreateJobs(20) {
            collector.Work <-pool.Work{Job: job, ID: i}
        }
    }
}

func BenchmarkNonconcurrent(b *testing.B) {
    for n := 0; n < b.N; n++ {
        for _, job := range work.CreateJobs(20) {
            work.DoWork(job, 1)
        }
    }
}

それでは動かします。

$ go test -bench=.
starting worker:  1
starting worker:  2
starting worker:  3
starting worker:  4
starting worker:  5
goos: windows
goarch: amd64
pkg: tutorials/concurrent-limiter
BenchmarkConcurrent-4              1        3001744600 ns/op
BenchmarkNonConcurrent-4           1        20006911100 ns/op
PASS
ok      tutorials/concurrent-limiter    23.291s

結果はこのとおりです。ワーカープールによりパフォーマンスが大幅に良くなります。さらに、個々のジョブの所要時間が長くなり、動いているワーカー数が増えるほど、パフォーマンスがより良くなることがわかるでしょう。

私の記事を読んでくれてありがとう。

この記事がお役にたちましたら、教えてください👏👏👏

さらに読みたい場合は、下の「フォロー」ボタンをクリックしてください。


  1. 訳注: ライブラリとして提供されているワーカープールとしては gammazero/workerpool などがあります 

  2. 訳注: done channelを用いてgoroutineを終了するようになっていますが、Go1.7からcontextが導入されたのでcontext.Done()を用いることもできます 

  3. https://blog.golang.org/share-memory-by-communicating 

  4. 訳注: その他の応用として、例えば Collector にバッファ付きチャネルを加えて、キューイングする処理を追加することもできます 

21
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
21
15