Go

golang.org/x/sync/semaphoreを使ってゴルーチンの同時実行数を制御する

特定の処理をゴルーチンで並列実行したいけれど、サーバの負荷等を考慮して、同時実行数の上限を設定したい話です。元ネタの記事では、チャネルやsync.Poolを使って実現していて、すでに十分シンプルなのですが、x/sync/semaphoreを使う方法も便利だったので紹介します。

見た目はほぼ、チャネルを使った実装と同じですが、s.Acquire(ctx, n)nの値で重みをつけることができます。なので、Aという処理が動いているときは他の処理を行わない、けれどBなら3個まで同時に動いても良い、といった対応をチャネルで行うと面倒ですが、semaphore.Weightedなら重みを変更するだけで実現できるので便利だと思いました。

元ネタ

実装例

以下の例は、同時実行数が3つに制限された状態でdoSomething(u)を並列実行します。サンプルコード自体はmattnさんのものをほぼそのまま流用しました。

全ての処理完了を待つためにsync.WaitGroupを使っていますが、semaphore.Weightedには全く関係ありません。

package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/semaphore"
)

func doSomething(u string) {
    fmt.Println(u)
    time.Sleep(2 * time.Second)
}

const (
    Limit  = 3 // 同時実行数の上限
    Weight = 1 // 1処理あたりの実行コスト
)

func main() {
    urls := []string{
        "http://www.example.com",
        "http://www.example.net",
        "http://www.example.net/foo",
        "http://www.example.net/bar",
        "http://www.example.net/baz",
    }
    s := semaphore.NewWeighted(Limit)
    var w sync.WaitGroup
    for _, u := range urls {
        w.Add(1)
        s.Acquire(context.Background(), Weight)
        go func(u string) {
            doSomething(u)
            s.Release(Weight)
            w.Done()
        }(u)
    }
    w.Wait()
}

説明

まずは、semaphore.NewWeighted(lim)lim個のリソースをもつsemaphore.Weightedを作成します。s.Acquire(ctx, n)は、全体のリソース(lim)からn個消費しますが、Weightedのリソースが足りない場合は、s.Acquire(ctx, n)の呼び出しは他のゴルーチンからs.Release(n)されるまでブロックします。そのため、同時にlim個以上の処理が動くことはありません。

処理が終わった後s.Release(n)を使うと、n個のリソースをWeightedへ戻します。ブロックしていたs.Acquire(ctx, n)があれば、ブロックが解除されて続きの処理を行います。

また、s.TryAcquire(n)というメソッドも用意されていて、こちらはブロックしません。代わりに、リソースが取得できたらtrueを返します。