Goはゴルーチンを使うことで簡単に並行処理を行うことができます。
また、ゴルーチンはかなり軽量で大量に生成することができるとされています。
とはいえ上限はあるわけで、短時間であまりに大量にゴルーチンを作成した場合はパフォーマンスが悪化したり、リソースを確保しすぎてエラーが発生することもある(と思います)
そのためよく行われるのは、以下の2つかと思います(少なくとも自分は)
- あらかじめ規定数の worker を作成しておく
- セマフォを使って同時起動数を制限する
個人的にはどんどんゴルーチンを作るのがあまり好きではないので、あらかじめ規定数のworkerを作成することが多いです。
ただ設定ファイルなどで worker 数を設定していて、設定をリロードさせた場合に worker 数を反映させる際など動的に worker 数を変更したい場合に困ります。
セマフォを使っている場合であれば、セマフォの作りを工夫すれば動的に変更することは比較的簡単です。
ただ、随時ゴルーチンを起動するパターンだと、動作しているworker数をメトリクスとして取得しようとした場合に増減が激しいのが気に食わなかったので、workerをあらかじめ起動したうえでワーカー数を動的に変更する方法を考えてみました。
コード例
package main
import (
"context"
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
)
func orDone[T any](done <-chan struct{}, c <-chan T) <-chan T {
valStream := make(chan T)
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case val, ok := <-c:
if !ok {
return
}
// メッセージをロストしないために、ここでは done チャネルをチェックしない
// 代わりに下流が詰まっている場合は終了に時間がかかる
valStream <- val
}
}
}()
return valStream
}
func producer(ctx context.Context) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; ; i++ {
select {
case <-ctx.Done():
return
default:
out <- i
}
}
}()
return out
}
func consumer(wg *sync.WaitGroup, id int, in <-chan int, quit <-chan struct{}) {
defer func() {
log.Printf("=== worker %d has finished ===\n", id)
wg.Done()
}()
log.Printf("=== worker %d has started ===\n", id)
// quit からメッセージを受信したら終了
for v := range orDone(quit, in) {
time.Sleep(500 * time.Millisecond)
log.Printf("%s(%d:%d)\n", strings.Repeat("\t", id%20), id, v)
}
}
func main() {
log.SetFlags(log.Ltime | log.Lmicroseconds)
worker := 4
wg := new(sync.WaitGroup)
// worker追加命令用
ctrlAdd := make(chan struct{})
// worker削除命令用
ctrlSub := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// producer, consumer の worker起動
in := producer(ctx)
for i := 0; i < worker; i++ {
wg.Add(1)
go consumer(wg, i, in, ctrlSub)
time.Sleep(100 * time.Millisecond)
}
// worker追加のハンドラ
go func() {
for range ctrlAdd {
wg.Add(1)
go consumer(wg, worker, in, ctrlSub)
worker++
}
}()
usrSigs := make(chan os.Signal, 1)
signal.Notify(usrSigs, syscall.SIGUSR1, syscall.SIGUSR2)
go func() {
for {
select {
case sig := <-usrSigs:
switch sig {
case syscall.SIGUSR1:
ctrlAdd <- struct{}{}
case syscall.SIGUSR2:
ctrlSub <- struct{}{}
}
case <-ctx.Done():
return
}
}
}()
wg.Wait()
}
worker追加用と、worker削除用のチャンネルを用意しておき、追加用のチャネルにメッセージが追加されたら、新しくworkerを起動し、削除用のチャネルにメッセージが追加されたら、workerが1つ終了するようにしています。
workerを複数減らす操作がgraceful shutdownと被ると、削除用のメッセージを受信するゴルーチンが無くなることがあります。
その場合削除用のメッセージを送信する箇所でブロックする可能性があるので、実際のコードで使用する場合はチャネルのclose順序や排他制御を工夫する必要があります。
1. 規定数のworkerを起動しておくコード例
package main
import (
"context"
"fmt"
"sync"
"time"
)
func producer(ctx context.Context) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; ; i++ {
select {
case <-ctx.Done():
return
default:
time.Sleep(100 * time.Millisecond)
out <- i
}
}
}()
return out
}
func main() {
worker := 4
wg := new(sync.WaitGroup)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
in := producer(ctx)
for i := 0; i < worker; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for v := range in {
fmt.Printf("%d: %d\n", id, v)
}
}(i)
}
wg.Wait()
}
$ go run worker.go
3: 0
0: 1
1: 2
2: 3
3: 4
0: 5
1: 6
2: 7
3: 8
0: 9
1: 10
2: 11
3: 12
複数のゴルーチンが終了するまで sync.WaitGroup
で Wait()
しています。
wg.Add(1)
をゴルーチンの中に書いてしまうのはよくある失敗なので要注意
2. セマフォを使って同時起動数を制限するコード例
package main
import (
"context"
"fmt"
"time"
)
func producer(ctx context.Context) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; ; i++ {
select {
case <-ctx.Done():
return
default:
out <- i
}
}
}()
return out
}
func main() {
worker := 4
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
in := producer(ctx)
sem := make(chan struct{}, worker)
for v := range in {
sem <- struct{}{}
go func(v int) {
defer func() { <-sem }()
fmt.Printf("%d\n", v)
time.Sleep(100 * time.Millisecond)
}(v)
}
}
$ go run sem.go
1
3
0
2
4
5
7
6
8
9
10
11
簡易的なセマフォのために、バッファ付きキューを使用しています。
セマフォで動的にworker数を変更する
package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
type DynamicSemaphore struct {
capacity uint
count uint
mu sync.RWMutex
cond *sync.Cond
}
func NewDynamicSemaphore(initialCapacity uint) *DynamicSemaphore {
ds := &DynamicSemaphore{
capacity: initialCapacity,
}
ds.cond = sync.NewCond(&ds.mu)
return ds
}
func (ds *DynamicSemaphore) Acquire() {
ds.mu.Lock()
defer ds.mu.Unlock()
for ds.count >= ds.capacity {
ds.cond.Wait()
}
ds.count++
}
func (ds *DynamicSemaphore) Release() {
ds.mu.Lock()
defer ds.mu.Unlock()
if ds.count > 0 {
ds.count--
ds.cond.Signal()
}
}
func (ds *DynamicSemaphore) Set(capacity uint) {
ds.mu.Lock()
defer ds.mu.Unlock()
ds.capacity = capacity
ds.cond.Broadcast()
}
func (ds *DynamicSemaphore) Capacity() uint {
ds.mu.RLock()
defer ds.mu.RUnlock()
return ds.capacity
}
func (ds *DynamicSemaphore) Count() uint {
ds.mu.RLock()
defer ds.mu.RUnlock()
return ds.count
}
func producer(ctx context.Context) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; ; i++ {
select {
case <-ctx.Done():
return
default:
out <- i
}
}
}()
return out
}
func main() {
worker := 4
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
in := producer(ctx)
sem := NewDynamicSemaphore(uint(worker))
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGUSR2)
go func() {
for sig := range sigs {
switch sig {
case syscall.SIGUSR1:
worker++
sem.Set(uint(worker))
case syscall.SIGUSR2:
worker--
if worker < 1 {
worker = 1
}
sem.Set(uint(worker))
}
}
}()
for v := range in {
sem.Acquire()
go func(v int) {
defer sem.Release()
fmt.Printf("%d\n", v)
time.Sleep(500 * time.Millisecond)
}(v)
}
}
SIGUSR1と、SIGUSR2を受け取った場合に、それぞれworkerを増減させています。
動的なセマフォでは、キャパシティを変更させ sync.CondVar
を使用することで Acquire()
に通知させています。