1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

動的にGoroutinesのワーカーを増減させる

Posted at

Goはゴルーチンを使うことで簡単に並行処理を行うことができます。
また、ゴルーチンはかなり軽量で大量に生成することができるとされています。

とはいえ上限はあるわけで、短時間であまりに大量にゴルーチンを作成した場合はパフォーマンスが悪化したり、リソースを確保しすぎてエラーが発生することもある(と思います)

そのためよく行われるのは、以下の2つかと思います(少なくとも自分は)

  1. あらかじめ規定数の worker を作成しておく
  2. セマフォを使って同時起動数を制限する

個人的にはどんどんゴルーチンを作るのがあまり好きではないので、あらかじめ規定数のworkerを作成することが多いです。

ただ設定ファイルなどで worker 数を設定していて、設定をリロードさせた場合に worker 数を反映させる際など動的に worker 数を変更したい場合に困ります。
セマフォを使っている場合であれば、セマフォの作りを工夫すれば動的に変更することは比較的簡単です。

ただ、随時ゴルーチンを起動するパターンだと、動作しているworker数をメトリクスとして取得しようとした場合に増減が激しいのが気に食わなかったので、workerをあらかじめ起動したうえでワーカー数を動的に変更する方法を考えてみました。

コード例

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"
)

func orDone[T any](done <-chan struct{}, c <-chan T) <-chan T {
	valStream := make(chan T)
	go func() {
		defer close(valStream)
		for {
			select {
			case <-done:
				return
			case val, ok := <-c:
				if !ok {
					return
				}
				// メッセージをロストしないために、ここでは done チャネルをチェックしない
				// 代わりに下流が詰まっている場合は終了に時間がかかる
				valStream <- val
			}
		}
	}()
	return valStream
}

func producer(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)

		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			default:
				out <- i
			}
		}
	}()
	return out
}

func consumer(wg *sync.WaitGroup, id int, in <-chan int, quit <-chan struct{}) {
	defer func() {
		log.Printf("=== worker %d has finished ===\n", id)
		wg.Done()
	}()

	log.Printf("=== worker %d has started ===\n", id)
	// quit からメッセージを受信したら終了
	for v := range orDone(quit, in) {
		time.Sleep(500 * time.Millisecond)
		log.Printf("%s(%d:%d)\n", strings.Repeat("\t", id%20), id, v)
	}
}

func main() {
	log.SetFlags(log.Ltime | log.Lmicroseconds)
	worker := 4

	wg := new(sync.WaitGroup)
	// worker追加命令用
	ctrlAdd := make(chan struct{})
	// worker削除命令用
	ctrlSub := make(chan struct{})

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// producer, consumer の worker起動
	in := producer(ctx)
	for i := 0; i < worker; i++ {
		wg.Add(1)
		go consumer(wg, i, in, ctrlSub)
		time.Sleep(100 * time.Millisecond)
	}

	// worker追加のハンドラ
	go func() {
		for range ctrlAdd {
			wg.Add(1)
			go consumer(wg, worker, in, ctrlSub)
			worker++
		}
	}()

	usrSigs := make(chan os.Signal, 1)
	signal.Notify(usrSigs, syscall.SIGUSR1, syscall.SIGUSR2)
	go func() {
		for {
			select {
			case sig := <-usrSigs:
				switch sig {
				case syscall.SIGUSR1:
					ctrlAdd <- struct{}{}
				case syscall.SIGUSR2:
					ctrlSub <- struct{}{}
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	wg.Wait()
}

worker追加用と、worker削除用のチャンネルを用意しておき、追加用のチャネルにメッセージが追加されたら、新しくworkerを起動し、削除用のチャネルにメッセージが追加されたら、workerが1つ終了するようにしています。

workerを複数減らす操作がgraceful shutdownと被ると、削除用のメッセージを受信するゴルーチンが無くなることがあります。
その場合削除用のメッセージを送信する箇所でブロックする可能性があるので、実際のコードで使用する場合はチャネルのclose順序や排他制御を工夫する必要があります。

1. 規定数のworkerを起動しておくコード例

worker.go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func producer(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)

		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			default:
				time.Sleep(100 * time.Millisecond)
				out <- i
			}
		}
	}()
	return out
}

func main() {
	worker := 4
	wg := new(sync.WaitGroup)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := producer(ctx)
	for i := 0; i < worker; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for v := range in {
				fmt.Printf("%d: %d\n", id, v)
			}
		}(i)
	}

	wg.Wait()
}
実行例
$ go run worker.go
3: 0
0: 1
1: 2
2: 3
3: 4
0: 5
1: 6
2: 7
3: 8
0: 9
1: 10
2: 11
3: 12

複数のゴルーチンが終了するまで sync.WaitGroupWait() しています。

wg.Add(1) をゴルーチンの中に書いてしまうのはよくある失敗なので要注意

2. セマフォを使って同時起動数を制限するコード例

semaphore.go
package main

import (
	"context"
	"fmt"
	"time"
)

func producer(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)

		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			default:
				out <- i
			}
		}
	}()
	return out
}

func main() {
	worker := 4

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := producer(ctx)
	sem := make(chan struct{}, worker)
	for v := range in {
		sem <- struct{}{}
		go func(v int) {
			defer func() { <-sem }()
			fmt.Printf("%d\n", v)
			time.Sleep(100 * time.Millisecond)
		}(v)
	}
}
実行例
$ go run sem.go
1
3
0
2
4
5
7
6
8
9
10
11

簡易的なセマフォのために、バッファ付きキューを使用しています。

セマフォで動的にworker数を変更する

dynamic_semaphore.go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
 )

type DynamicSemaphore struct {
	capacity uint
	count    uint

	mu   sync.RWMutex
	cond *sync.Cond
}

func NewDynamicSemaphore(initialCapacity uint) *DynamicSemaphore {
	ds := &DynamicSemaphore{
		capacity: initialCapacity,
	}
	ds.cond = sync.NewCond(&ds.mu)
	return ds
}

func (ds *DynamicSemaphore) Acquire() {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	for ds.count >= ds.capacity {
		ds.cond.Wait()
	}
	ds.count++
}

func (ds *DynamicSemaphore) Release() {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	if ds.count > 0 {
		ds.count--
		ds.cond.Signal()
	}
}

func (ds *DynamicSemaphore) Set(capacity uint) {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	ds.capacity = capacity
	ds.cond.Broadcast()
}

func (ds *DynamicSemaphore) Capacity() uint {
	ds.mu.RLock()
	defer ds.mu.RUnlock()

	return ds.capacity
}

func (ds *DynamicSemaphore) Count() uint {
	ds.mu.RLock()
	defer ds.mu.RUnlock()

	return ds.count
}

func producer(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)

		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			default:
				out <- i
			}
		}
	}()
	return out
}

func main() {
	worker := 4

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := producer(ctx)
	sem := NewDynamicSemaphore(uint(worker))

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGUSR2)
	go func() {
		for sig := range sigs {
			switch sig {
			case syscall.SIGUSR1:
				worker++
				sem.Set(uint(worker))
			case syscall.SIGUSR2:
				worker--
				if worker < 1 {
					worker = 1
				}
				sem.Set(uint(worker))
			}
		}
	}()

	for v := range in {
		sem.Acquire()
		go func(v int) {
			defer sem.Release()
			fmt.Printf("%d\n", v)
			time.Sleep(500 * time.Millisecond)
		}(v)
	}
}

SIGUSR1と、SIGUSR2を受け取った場合に、それぞれworkerを増減させています。

動的なセマフォでは、キャパシティを変更させ sync.CondVar を使用することで Acquire() に通知させています。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?