元ネタ
どちらも、固定長のバッファを持ったチャネルを共有リソースとすることでgoroutineの同時実行数を制御するパターンの例として紹介されています。
sync.Pool
を使うともうすこし可読性があがると思ったので、サンプルコードを書いてみました。
今回ベースにしたのはmotemenさんの方で、実行数の制御とワーカーリソースのプールを主題としています。
NOTE:
元ネタのBig Sky :: 簡単に goroutine の実行個数を制限する方法にも追記がありますが、goroutine自体の同時実行数は抑制できていません。ここで紹介するのはgoroutine内の処理実態を抑制する手法です。goroutineが大量に起動する可能性がある場合は事前にgoroutineを起動しておくパターンを検討してください。
sync.Pool
使い方ですが、sync.Pool内の生成関数New
にほしいリソース取得処理を書きます。
※生成関数が返すのは変数のポインタです。
pool := sync.Pool{New: func() interface{} {
t := time.Now()
return &t
}}
取り出したものは型キャストして使い、必要なくなったらプールに戻します。
t1 := pool.Get().(*time.Time)
time.Sleep(100 * time.Millisecond)
t2 := pool.Get().(*time.Time)
fmt.Println(t1 == t2) // output: false
pool.Put(t1)
t3 := pool.Get().(*time.Time)
fmt.Println(t3)
fmt.Println(t1 == t3) // output: true
残念ながら sync.Pool
に数の制限のしくみはないので独自のプールが必要になります。
今回はworkerリソース制御のしくみをつくりました。
ワーカープールを作る
同時実行数制御のlimitとワーカープールを内包したstruct workers
をつくります。
type workers struct {
limit chan struct{}
pool sync.Pool
}
func newWorkers(n int) *workers {
ws := workers{}
ws.limit = make(chan struct{}, n)
ws.pool = sync.Pool{New: func() interface{} {
return &worker{}
}}
return &ws
}
プールからのGetはlimitチャネルの空き状況によって制限されます(待たされます)。
func (ws *workers) Get() *worker {
ws.limit <- struct{}{} // 空くまで待つ
return ws.pool.Get().(*worker)
}
戻すときは同時にlimitチャネルから解放します。
func (ws *workers) Put(w *worker) {
ws.pool.Put(w)
<-ws.limit // 解放
}
このworkersを使ったmain関数はこうなります。
goroutineからfor/selectが消えてすっきりしました。
func main() {
ws := newWorkers(5)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
w := ws.Get()
log.Printf("[worker %v] --> goroutine %d", w, i)
w.work()
log.Printf("[worker %v] <-- goroutine %d", w, i)
ws.Put(w)
}(i)
}
wg.Wait()
}
全体像はこんな感じ。
今回はcancel処理を省略しましたが、実際はworkerPoolにキャンセル用の仕組みを組み込む必要が有ってもうすこし複雑になりそうです。
mattnさんのコードをもとにしたものはこんな感じです。