はじめに
こんにちは、Retail AI の松尾 @rrrtaku です。
先月、初めての釣り堀で真鯛が4匹も釣れて家に持ち帰ったものの、三日三晩部屋中が磯臭い生活を送れました。今年一番の思い出になりそうです。
さて、こちらは、TRIAL&RetailAI Advent Calendar 2024 15日目の記事です。
昨日は @ikeda_takato さんの『CVE-2024-10979をローカルで再現してみる』という記事でした。
この記事では、sync/errgroup
の上位互換?とも言われる sourcegraph/conc
について、少し紹介してみたいと思います。
このパッケージは、日本のGo言語界隈で有名な tenntennさんが以前に説明なされている「tenntenn Conference 2024」や「手を動かしながら並行処理を学ぼう - Enablement Workshop for Gophers」の動画をきっかけに知りました。
sourcegraph/conc
とは?
sourcegraph/conc
は、sync.WaitGroup
や sync/errgroup
などの(準)標準ライブラリに代わる、より使いやすい並行処理のためのパッケージとして、注目を集めています。
-
sync/WaitGroup
は、Add
、Done
、Wait
の3つのメソッドを使って、goroutineの完了を待つことができますが、エラー処理が独自で行う必要があります。 -
sync/errgroup
は、errgroup.Group
を使ってgoroutineの完了を待ち、エラー処理を簡単に行うことができます。- また、
WithCancel
を使ってキャンセル処理も行ったり、SetLimit
を使って同時実行数を制限することもできます。
- また、
-
ただし、
sync/errgroup
は、エラーが発生した場合、最初に発生したエラーのみを返すため、複数のエラーを受け取ることができなかったり、goroutine内で発生したpanicをキャッチする処理が独自で必要になります。
そこで、sourcegraph/conc
は、errgroup.Group
の多くの機能を継承しつつ、これらの問題を上手く解決してくれています。
使い方 - メソッドチェーンによるpool
の生成
まず、簡単な説明として、sourcegraph/conc/pool
では、pool
を生成する際に、様々なメソッドをチェーンすることで、挙動を設定するようにできています。
以下が最終的に生成できるpool
の構造体の種類です。
-
Pool
- 基本的なpool
で、以下の派生型の基底となる-
ErrorPool
- エラーを受け取る -
ContextPool
- エラーを受け取れるし、キャンセルの伝播もできる
-
-
ResultPool
- スライスの任意の値を受け取れるpool
で、以下の派生型の基底となる-
ResultErrorPool
- スライスの任意の値とエラーを受け取る -
ResultContextPool
- スライスの任意の値とエラーを受け取れるし、キャンセルの伝播もできる
-
最終的に生成されたpool
に共通したメソッドとして、Go
やWait
メソッドがある、といった感じです。
具体例
1, pool.WithErrors
- 複数エラーを受け取る
pool.WithErrors
は、複数のエラーを受け取ることができます。
func WithErrors() {
ctx := context.Background()
p := pool.New().WithErrors().WithContext(ctx)
for i := 0; i < 10; i++ {
i := i
p.Go(func(ctx context.Context) error {
fmt.Println("Started:", i)
if i%2 == 0 {
return fmt.Errorf("error: %d", i)
}
time.Sleep(1 * time.Second)
fmt.Println("Done:", i)
return nil
})
}
if err := p.Wait(); err != nil {
fmt.Println(err)
}
}
/* Output
Started: 0
Started: 1
Started: 2
Started: 3
Started: 5
Started: 6
Started: 4
Started: 8
Started: 7
Started: 9
Done: 3
Done: 9
Done: 7
Done: 1
Done: 5
error: 0 // 以下複数のエラーをJoinして返す
error: 2
error: 6
error: 4
error: 8
*/
内部で、errors.Join
を使って複数のエラーを改行区切りで結合したエラーを返しています。
やっぱり最初のエラーだけ返してくれればいい場合は、WithFirstError
を使うこともできます。
p := pool.New().WithContext(ctx).WithFirstError()
...
/* Output
Started: 0
Started: 1
Started: 2
Started: 3
Started: 7
Started: 6
Started: 8
Started: 9
Started: 4
Started: 5
Done: 5
Done: 1
Done: 7
Done: 9
Done: 3
error: 0 // 1つ目のエラーのみ返す
*/
2, pool.ResultContextPool
- 任意の値も受け取る
pool.ResultContextPool
は、任意の値も受け取ることができる、一番設定が盛り沢山のpool
の構造体です。
(ここでは、context.Context
を使ったキャンセルの伝播の説明は、割愛します 🙇♂️)
func ResultContextPool() {
type result struct{ v int }
ctx := context.Background()
p := pool.NewWithResults[result]().WithContext(ctx)
for i := 0; i < 10; i++ {
i := i
p.Go(func(context.Context) (result, error) {
fmt.Println("Started:", i)
if i%2 == 0 {
return result{}, fmt.Errorf("error: %d", i)
}
time.Sleep(1 * time.Second)
fmt.Println("Done:", i)
return result{v: i}, nil
})
}
results, err := p.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("len(results):", len(results))
for _, r := range results {
fmt.Println(r)
}
}
/* Output
Started: 0
Started: 2
Started: 3
Started: 1
Started: 5
Started: 6
Started: 8
Started: 7
Started: 9
Started: 4
Done: 1
Done: 9
Done: 5
Done: 7
Done: 3
error: 0
error: 2
error: 6
error: 8
error: 4
len(results): 5 // 以下成功したgoroutineの結果を返している
{1}
{5}
{9}
{7}
{3}
*/
pool.ResultPool
とその派生型を使うことで、型パラメータで指定した任意の値をスライスで受け取ることができます。
上記の例では、成功したgoroutineの結果のみを返していますが、WithCollectErrored
を使うことで、エラーが発生したgoroutineの結果も受け取ることができます。
p := pool.NewWithResults[result]().WithContext(ctx).WithCollectErrored()
...
/* Output
Started: 6
Started: 8
Started: 9
Started: 5
Started: 7
Started: 1
Started: 2
Started: 3
Done: 3
Done: 9
Done: 5
Done: 1
Done: 7
error: 0
error: 4
error: 6
error: 8
error: 2
len(results): 10 // 以下全てのgoroutineの結果を返している
{2}
{6}
{0}
{4}
{0}
{0}
{0}
{0}
{0}
{3}
{9}
{5}
{1}
{7}
*/
3, panics.Catcher
- goroutineのpanicのリカバリー
プログラムの用途がHTTPサーバの場合、各リクエストを処理するgoroutine内で万が一panicが発生することに備えて、recover
を使ってpanicをキャッチし、影響をそのgoroutine内に留める必要があります。そうしないと、プログラム全体が停止してしまう可能性があります。
(標準のhttp.Server
の場合、http.Handle
で登録したハンドラ内でpanicが発生しても、標準サーバがリカバリーしてくれますが(公式の説明)、そのハンドラ内でさらに自分で作成したgoroutine内でもしpanicが発生すると、リカバリー処理がないとプログラム全体が停止してしまいます)
sourcegraph/conc
では、panics.Catcher
を使って、実行した各goroutine内で発生したpanicをキャッチ・リカバリーし、エラーとして返すことができます。
func PanicCatcher() {
ctx := context.Background()
// use WithFirstError to prevent too many panics messing up the output...
p := pool.New().WithContext(ctx).WithFirstError()
for i := 0; i < 10; i++ {
i := i
p.Go(func(ctx context.Context) (err error) {
var catcher panics.Catcher
defer func() {
if paniced := catcher.Recovered(); paniced != nil {
err = paniced.AsError() // convert panic to error
}
}()
catcher.Try(func() { // catch panics from this function and recover them
fmt.Println("Started:", i)
if i%2 == 0 {
panic(fmt.Errorf("panic: %d", i))
}
time.Sleep(1 * time.Second)
fmt.Println("Done:", i)
err = nil
})
if err != nil {
return err
}
return nil
})
}
if err := p.Wait(); err != nil {
fmt.Println(err)
}
}
/*
Started: 0
Started: 1
Started: 2
Started: 4
Started: 6
Started: 5
Started: 8
Started: 7
Started: 3
Started: 9
Done: 9
Done: 5
Done: 3
Done: 1
Done: 7
panic: panic: 2 // 発生したpanicをerrorとして出力している
stacktrace:
goroutine 33 [running]:
runtime/debug.Stack()
/usr/local/go/src/runtime/debug/stack.go:24 +0x64
github.com/sourcegraph/conc/panics.NewRecovered(0x1, {0x100ac4700, 0x14000184010})
/Users/user/go/pkg/mod/github.com/sourcegraph/conc@v0.3.0/panics/panics.go:59 +0xbc
github.com/sourcegraph/conc/panics.(*Catcher).tryRecover(0x1400019a000)
/Users/user/go/pkg/mod/github.com/sourcegraph/conc@v0.3.0/panics/panics.go:28 +0x5c
panic({0x100ac4700?, 0x14000184010?})
...
*/
4, TryGo
は無いみたい
sourcegraph/conc
が提供するpool
やその他の構造体では、Go
メソッドを使ってgoroutineを実行する際に、WithMaxGoroutines
を使って同時実行数を制限することができます。
実行中のgoroutine数が制限を超えた場合、Go
メソッドを呼び出すと、実行中のgoroutineが終了して空きができるまでブロックされます。
func BlockingGoWithLimit() {
ctx := context.Background()
limit := 10
p := pool.New().WithContext(ctx).WithMaxGoroutines(limit) // limit the number of goroutines
sleepAndLog := func(i int) error {
log.Println("Started:", i)
time.Sleep(time.Duration(1+i) * time.Second) // sleep for i seconds (starting from 1, 2, 3, ...)
log.Println("Done:", i)
return nil
}
for i := 0; i < limit; i++ {
i := i
p.Go(func(ctx context.Context) error {
return sleepAndLog(i)
})
}
for i := limit; i < 15; i++ {
i := i
log.Println("Submitting...", i) // log before submitting
// this will block until a goroutine finishes
p.Go(func(ctx context.Context) error {
return sleepAndLog(i)
})
log.Println("Submitted:", i)
}
if err := p.Wait(); err != nil {
fmt.Println(err)
}
}
/* Output
2024/12/13 20:44:33 Started: 8
2024/12/13 20:44:33 Started: 6
2024/12/13 20:44:33 Started: 7
2024/12/13 20:44:33 Started: 9
2024/12/13 20:44:33 Submitting... 10
2024/12/13 20:44:33 Started: 1
2024/12/13 20:44:33 Started: 0
2024/12/13 20:44:33 Started: 2
2024/12/13 20:44:33 Started: 3
2024/12/13 20:44:33 Started: 4
2024/12/13 20:44:33 Started: 5
2024/12/13 20:44:34 Done: 0
2024/12/13 20:44:34 Started: 10
2024/12/13 20:44:34 Submitted: 10 // 0のgoroutineが終了したので、10のgoroutineがsubmitされた
2024/12/13 20:44:34 Submitting... 11
2024/12/13 20:44:35 Done: 1
2024/12/13 20:44:35 Submitted: 11 // 1のgoroutineが終了したので、11のgoroutineがsubmitされた
2024/12/13 20:44:35 Submitting... 12
2024/12/13 20:44:35 Started: 11
2024/12/13 20:44:36 Done: 2
2024/12/13 20:44:36 Started: 12
2024/12/13 20:44:36 Submitted: 12 // 2のgoroutineが終了したので、12のgoroutineがsubmitされた
2024/12/13 20:44:36 Submitting... 13
2024/12/13 20:44:37 Done: 3
2024/12/13 20:44:37 Started: 13
2024/12/13 20:44:37 Submitted: 13 // 3のgoroutineが終了したので、13のgoroutineがsubmitされた
2024/12/13 20:44:37 Submitting... 14
2024/12/13 20:44:38 Done: 4
2024/12/13 20:44:38 Submitted: 14 // 4のgoroutineが終了したので、14のgoroutineがsubmitされた
2024/12/13 20:44:38 Started: 14
2024/12/13 20:44:39 Done: 5
2024/12/13 20:44:40 Done: 6
2024/12/13 20:44:41 Done: 7
2024/12/13 20:44:42 Done: 8
2024/12/13 20:44:43 Done: 9
2024/12/13 20:44:45 Done: 10
2024/12/13 20:44:47 Done: 11
2024/12/13 20:44:49 Done: 12
2024/12/13 20:44:51 Done: 13
2024/12/13 20:44:53 Done: 14
*/
一方で、sync/errgroup
には、同時実行数の空きがない場合は即座にfalse
(= goroutineが新たに開始されなかったことを意味)を返すTryGo
メソッドがあります。
TryGo
メソッドの挙動の詳細については、@kolukuさんの『errorgroup.SetLimitとTryGoでgoroutineの同時実行数を制御する』で解説されています。
コメント欄でのGo
メソッドとの使い分けに関する議論も合わせて参照すると良さそうです。
5, その他のパッケージ - iter
,stream
本題のやりたいこととは少し逸れますが、sourcegraph/conc
では、タスクをsubmitした(Stream.Go
メソッドを呼び出した)順序を維持したまま並行処理を実行するstream
や、スライスの各要素に対して安全に並行処理を実行するiter
も提供されています。
こちらは正直まだ触ったことが無いので、gackyさんの『Goの並行処理をスッキリ安全に書ける「sourcegraph/conc」ええやんって話』をご参照ください。
他にも紹介していないチェーンメソッド等があるので、公式のドキュメントを参照してみてください。
おわりに
ここまで読んでいただきありがとうございました。
Goは並行処理が書きやすいと言われるものの、並行処理のプログラムを意図した通りに書くのは変わらず難しいな、と思います。
自分で直接チャネル、goキーワードを使って書けても、コードが複雑になりがちでチームでのメンテが大変になりそうなので、sourcegraph/conc
のように簡潔に書けて信頼の置けそうなパッケージに頼るのはありかな、と個人的に思っています。
明日のアドベントカレンダーは、
@mrsd さんの『PydanticのLLMエージェントフレームワーク「Open WebUIにSwarmを組み込む」』です。
お楽しみに〜
RetailAIとTRIALではエンジニアを募集しています。
もしよかったら覗いてみてください。