13
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

channelとsync.Poolを使ってgoroutine内の処理の同時実行数を制御する

Last updated at Posted at 2017-12-22

元ネタ

どちらも、固定長のバッファを持ったチャネルを共有リソースとすることでgoroutineの同時実行数を制御するパターンの例として紹介されています。

sync.Poolを使うともうすこし可読性があがると思ったので、サンプルコードを書いてみました。
今回ベースにしたのはmotemenさんの方で、実行数の制御とワーカーリソースのプールを主題としています。

NOTE:

元ネタのBig Sky :: 簡単に goroutine の実行個数を制限する方法にも追記がありますが、goroutine自体の同時実行数は抑制できていません。ここで紹介するのはgoroutine内の処理実態を抑制する手法です。goroutineが大量に起動する可能性がある場合は事前にgoroutineを起動しておくパターンを検討してください。

sync.Pool

使い方ですが、sync.Pool内の生成関数Newにほしいリソース取得処理を書きます。
※生成関数が返すのは変数のポインタです。

pool := sync.Pool{New: func() interface{} {
    t := time.Now()
    return &t
}}

取り出したものは型キャストして使い、必要なくなったらプールに戻します。

t1 := pool.Get().(*time.Time)
time.Sleep(100 * time.Millisecond)

t2 := pool.Get().(*time.Time)
fmt.Println(t1 == t2) // output: false

pool.Put(t1)

t3 := pool.Get().(*time.Time)
fmt.Println(t3)
fmt.Println(t1 == t3) // output: true

残念ながら sync.Pool に数の制限のしくみはないので独自のプールが必要になります。
今回はworkerリソース制御のしくみをつくりました。

ワーカープールを作る

同時実行数制御のlimitとワーカープールを内包したstruct workers をつくります。

type workers struct {
	limit chan struct{}
	pool  sync.Pool
}

func newWorkers(n int) *workers {
	ws := workers{}
	ws.limit = make(chan struct{}, n)
	ws.pool = sync.Pool{New: func() interface{} {
		return &worker{}
	}}
	return &ws
}

プールからのGetはlimitチャネルの空き状況によって制限されます(待たされます)。

func (ws *workers) Get() *worker {
	ws.limit <- struct{}{} // 空くまで待つ
	return ws.pool.Get().(*worker)
}

戻すときは同時にlimitチャネルから解放します。

func (ws *workers) Put(w *worker) {
	ws.pool.Put(w)
	<-ws.limit // 解放
}

このworkersを使ったmain関数はこうなります。
goroutineからfor/selectが消えてすっきりしました。

func main() {
	ws := newWorkers(5)
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			w := ws.Get()
			log.Printf("[worker %v] --> goroutine %d", w, i)
			w.work()
			log.Printf("[worker %v] <-- goroutine %d", w, i)
			ws.Put(w)
		}(i)
	}
	wg.Wait()
}

全体像はこんな感じ。

今回はcancel処理を省略しましたが、実際はworkerPoolにキャンセル用の仕組みを組み込む必要が有ってもうすこし複雑になりそうです。

mattnさんのコードをもとにしたものはこんな感じです。

13
8
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?