お題
例えば、以下みたいな要件をgoで実装する時にこんな書き方にするかなと。
DBに書き込まれた「注文」情報を cron あたりで定期的に拾って、外部APIコールで「注文」をリクエスト。
ただし、外部APIは同期でレスポンス返すのに平均 10 秒くらいかかる。
「注文」情報は随時DBに書き込まれ、なるべく早く外部APIコールで処理したい。
goで作るアプリは cron で毎分ないし毎 3 分ぐらいな間隔で起動するとして、
1回の起動で(できれば、そのときDBに存在する全ての「注文」をさばきたいけど、仮に100万「注文」あったらマシン落ちるとかなると困るし)並行処理させる goroutine 数は制限する必要がある。
これは channel 使ったセマフォ制御がよく使われる。(使い方自体は別に大したことはなくて、いつも問題になるのは同時並行 goroutine 数をいくつにするか・・・。)
結局このあたりは、毎分ないし毎 3 分で実行する場合、本番の流量でどれくらいDBに「注文」情報が溜まっているかなど計測しながら随時調整していくのだろうけど。。。
(そして、アプリを載せてるサーバ(今ならコンテナ、k8sクラスタ)のスペックとの相談にもなる?)
とりあえず今回は本番用のアプリを作るというのではなく単に以下が確認したいだけなので、同時並行 goroutine 数は適当に「3」としておく。
『errgroupを使うと並行起動した goroutine 全ての終了を待ち受けてアプリを終了できる。また、1つの goroutine でエラーが起きたら後続の goroutine の実行も止めることができる。』
開発環境
# OS - Linux(Ubuntu)
$ cat /etc/os-release
NAME="Ubuntu"
VERSION="18.04.4 LTS (Bionic Beaver)"
# バックエンド
# 言語 - Golang
$ go version
go version go1.14.2 linux/amd64
IDE - Goland
GoLand 2020.1.2
Build #GO-201.7223.97, built on May 1, 2020
全ソース
実践
並行起動した全 goroutine の終了を待ち受ける
処理概要
5件の「注文」がDBにある状態で同時並行3で処理をしたケース。
まず、DBから5件の「注文」情報を拾う。
次に、拾った「注文」をループ処理。
ループ1件ごとに、goroutine 起動し、その中で以下の処理を行う。
①DBに今からリクエスト投げる旨を書き込み(ログ「[Write ]
」で表現)
②APIコールで「注文」をリクエスト(ログ「[Request]
」で表現)※平均 10 秒かかる想定)
③DBに「注文」結果を書き込み(ログ「[Save ]
」で表現)
④「注文」結果をメール送信(ログ「[Send ]
」で表現)※平均 3 秒かかる想定)
5件をループで回して処理するので、1件が平均 13 秒以上かかるので、直列なら全て処理しきるのに1分以上かかる見込み。
実際には、以下の「挙動(ログ)」の通り、30 秒ほどで終わっている。これは、同時に3件まで平行処理をしているため。
挙動(ログ)
START: 2020-06-07T00:01:59+09:00
loop-index: 0 [ID:id001][Name:注文1]
loop-index: 1 [ID:id002][Name:注文2]
loop-index: 2 [ID:id003][Name:注文3]
[Write ] id002
[Write ] id001
[Write ] id003
[Request] id001
[Save ] id001
[Request] id003
[Save ] id003
[Request] id002
[Save ] id002
[Send ] id001
loop-index: 3 [ID:id004][Name:注文4]
[Send ] id003
loop-index: 4 [ID:id005][Name:注文5]
[Write ] id004
[Write ] id005
[Send ] id002
[Request] id004
[Save ] id004
[Send ] id004
[Request] id005
[Save ] id005
[Send ] id005
END : 2020-06-07T00:02:29+09:00
まず、以下の部分。
loop-index: 0 [ID:id001][Name:注文1]
loop-index: 1 [ID:id002][Name:注文2]
loop-index: 2 [ID:id003][Name:注文3]
注文は全部で5件あるので同時並行数の制御を入れていなければ、ここに5件分のログが出る。
3件しか出ていないのは、同時並行数「3」で実装しているため。
このログのあと、id001
〜id003
までのそれぞれの処理が同時並行に動き、最初に id001
の処理が終わった([Send ] id001
のログ)ため、その時点で同時並行数「2」となり、1つ余裕が出来た。
というわけで即座に次の id004
の処理が開始される。(loop-index: 3 [ID:id004][Name:注文4]
のログ)
で、その次に、先行して動いていた id003
の処理も終わって([Send ] id003
のログ)、また1つ起動可能になったので、即座に次の id005
の処理が開始される。(loop-index: 4 [ID:id005][Name:注文5]
のログ)
ソース解説
main.go
main.go
の中で、別途定義したサービスクラスを生成(NewService(~)
)し実行(.Exec()
)後、エラーハンドリングするだけ。
func main() {
if err := svc.NewService(client.NewDBClient()).Exec(); err != nil {
fmt.Printf("failed to exec service: %+v", err)
}
}
上記で呼んでいるNewService(~)
は以下に由来。
service.go(NewService())
type Service interface {
Exec() error
}
type service struct {
dbClient client.DBClient
}
func NewService(dbClient client.DBClient) Service {
return &service{dbClient: dbClient}
}
service.go(Exec())
当アプリの本体ロジックと言えるのが以下のExec()
の中身。
重要な部分のみ抜粋。(以下だけではプログラムは動かないので注意)
func (s *service) Exec() error {
// 5件の「注文」をDBから取得
orders, err := s.dbClient.CollectOrders()
// ★ 同時実行 goroutine 数の制御のためにチャネル用意
semaphore := make(chan struct{}, 3)
// ★ エラーグループ生成
errGrp, eCtx := errgroup.WithContext(context.Background())
for idx, order := range orders {
sr := subroutine{}
// [参照] https://golang.org/doc/faq#closures_and_goroutines
o := order
// 3個までは詰められる(でも、それ以上はチャネルが空くまで詰められずにここで待機状態となる)
semaphore <- struct{}{}
errGrp.Go(func() error {
select {
case <-eCtx.Done():
return xerrors.Errorf("canceled: %#v", o)
default:
return sr.exec(eCtx, o, semaphore)
}
})
}
if err := errGrp.Wait(); err != nil {
return xerrors.Errorf("failed to exec goroutine: %w", err)
}
return nil
}
semaphore
という変数に3つまでしか値が詰められないチャネルをセット。
これにより、forループ内で semaphore <- struct{}{}
というように値を詰めても、4つ目以降をつめようとするところで止まる。(つまり後続の errGrp.Go(func() error {
に到達しなくなる。)
で、semaphore
は errGrp.Go(~~)
の中で実行する関数(sr.exec(~, semaphore)
)に渡しており、関数内で処理を終えたら詰めた値を1つ除くという処理(<-semaphore
)をしている。
errgroup
は forループ外で生成後、forループ内で errGrp.Go(func() error { ~~~~ })
という形式で goroutine 起動している。
goroutine 起動すると、それだけでは main.go から始まった処理を止めることはないので、goroutine 内の処理が継続していようと main.go の処理は進み、その処理が終われば、アプリ自体は停止する。
これを止める(つまり、起動した goroutine 内の処理がすべて終わるまでメインの処理も止めないように制御する)のが errGrp.Wait()
での待機部分。
これがあると、errGrp
という変数で起動した全ての goroutine の処理が終わる(★もしくは error を返す)まではメインの処理をここで止めておくことができる。
残るは以下の部分。
select {
case <-eCtx.Done():
return xerrors.Errorf("canceled: %#v", o)
default:
return sr.exec(eCtx, o, semaphore)
}
これは、通常は sr.exec(~~)
によってそれぞれの goroutine の処理を実行するけど、その中で error が返されたら、eCtx
という名のコンテキストはもう終了扱いになり、後続の goroutine を生成しないよう errgroup がハンドリングしてくれる。
この部分の挙動については後述の『1 goroutine でエラー発生後、後続の goroutine 実行を止める』の説明で。
service.go(subroutine#exec())
続いて、1 goroutine での処理ロジックの部分。
まあ、コメントの通り。
type subroutine struct {
orderAPIClient client.OrderAPIClient
dbClient client.DBClient
mailClient client.MailClient
}
func (sr subroutine) exec(eCtx context.Context, order client.Order, semaphore chan struct{}) error {
defer func() {
<-semaphore // 処理後にチャネルから値を抜き出さないと、次の goroutine が起動できない
}()
// 「注文」情報リクエスト中であることを記録
if err := sr.dbClient.WriteRequesting(eCtx, order); err != nil {
return xerrors.Errorf("failed to write order requesting: %w", err)
}
// 外部APIを使って「注文」リクエストを飛ばす
if err := sr.orderAPIClient.Request(eCtx, order); err != nil {
return xerrors.Errorf("failed to request order: %w", err)
}
// 「注文」結果をDBに保存
if err := sr.dbClient.SaveOrderRequestStatus(eCtx, order); err != nil {
return xerrors.Errorf("failed to save order status: %w", err)
}
// 「注文」結果をメール送信
if err := sr.mailClient.Send(eCtx, order); err != nil {
return xerrors.Errorf("failed to send mail: %w", err)
}
return nil
}
上記処理の中は、すべてダミー実装なので特に重要ではない。
唯一、後述の解説のため、以下だけ載せておく。
order.go
type orderAPIClient struct {
}
// MEMO: 「注文」情報を外部APIに渡す機能(平均 10 秒くらい時間がかかる想定)のダミー実装
func (c *orderAPIClient) Request(ctx context.Context, o Order) error {
switch o.ID {
case "id001":
time.Sleep(9 * time.Second)
fmt.Println("[Request] id001")
case "id002":
time.Sleep(12 * time.Second)
fmt.Println("[Request] id002")
case "id003":
//return xerrors.New("failed to Request")
time.Sleep(10 * time.Second)
fmt.Println("[Request] id003")
case "id004":
time.Sleep(8 * time.Second)
fmt.Println("[Request] id004")
case "id005":
time.Sleep(11 * time.Second)
fmt.Println("[Request] id005")
}
return nil
}
適当にIDごとに異なる処理時間となるようスリープをかけている。
1 goroutine でエラー発生後、後続の goroutine 実行を止める
ソース解説
今回は先にソース解説を。基本的にソースは『並行起動した全 goroutine の終了を待ち受ける』で説明したものと同じ。
唯一、違う部分だけ抜粋。
// MEMO: 「注文」情報を外部APIに渡す機能(平均 10 秒くらい時間がかかる想定)のダミー実装
func (c *orderAPIClient) Request(ctx context.Context, o Order) error {
switch o.ID {
〜〜〜〜
case "id003":
return xerrors.New("failed to Request")
case "id004":
〜〜〜〜
return nil
}
id003
のときだけは、エラーを返すようにした。
挙動(ログ)
START: 2020-06-07T01:06:09+09:00
loop-index: 0 [ID:id001][Name:注文1]
loop-index: 1 [ID:id002][Name:注文2]
loop-index: 2 [ID:id003][Name:注文3]
[Write ] id002
[Write ] id001
[Write ] id003
loop-index: 3 [ID:id004][Name:注文4]
[Request] id001
[Save ] id001
[Request] id002
[Save ] id002
[Send ] id001
loop-index: 4 [ID:id005][Name:注文5]
[Send ] id002
failed to exec service: failed to exec goroutine:
github.com/sky0621/tips-go/try/errgroup.(*service).Exec
/home/sky0621/work/src/github.com/sky0621/tips-go/try/errgroup/service.go:64
- failed to request order:
github.com/sky0621/tips-go/try/errgroup.subroutine.exec
/home/sky0621/work/src/github.com/sky0621/tips-go/try/errgroup/service.go:89
- failed to Request:
github.com/sky0621/tips-go/try/errgroup/client.(*orderAPIClient).Request
/home/sky0621/work/src/github.com/sky0621/tips-go/try/errgroup/client/order.go:32
Process finished with exit code 0
同じタイミングで同時並行起動した id001
と id002
の方は最後の処理まで終了したものの、その後に処理されるはずだった id004
と id005
は処理されずに終わっている。
そう、そうなんだけど、これ、実は単にそうなるよう仕向けただけ。
以下のような実装であれば、結局、後続の goroutine での処理もすべて処理されてしまうんだよね。
再度ソース修正
// MEMO: 「注文」情報を外部APIに渡す機能(平均 10 秒くらい時間がかかる想定)のダミー実装
func (c *orderAPIClient) Request(ctx context.Context, o Order) error {
switch o.ID {
〜〜〜〜
case "id003":
time.Sleep(15 * time.Second)
return xerrors.New("failed to Request")
case "id004":
〜〜〜〜
return nil
}
エラーが起きる処理が、ひたすら時間がかかった挙句に(タイムアウト等で?)エラーを返した場合、その頃には既に後続の goroutine も起動されちゃっているので、結局、処理対象すべて起動した挙句に、id003
の分だけエラーだったねとなる。(まあ、forループで「注文」を上から順に1つずつ処理しているわけではないので当然だけど。)
再度ソース修正後の挙動(ログ)
START: 2020-06-07T01:09:19+09:00
loop-index: 0 [ID:id001][Name:注文1]
loop-index: 1 [ID:id002][Name:注文2]
loop-index: 2 [ID:id003][Name:注文3]
[Write ] id002
[Write ] id001
[Write ] id003
[Request] id001
[Save ] id001
[Request] id002
[Save ] id002
[Send ] id001
loop-index: 3 [ID:id004][Name:注文4]
[Write ] id004
[Send ] id002
loop-index: 4 [ID:id005][Name:注文5]
[Write ] id005
[Request] id004
[Save ] id004
[Send ] id004
[Request] id005
[Save ] id005
[Send ] id005
failed to exec service: failed to exec goroutine:
github.com/sky0621/tips-go/try/errgroup.(*service).Exec
/home/sky0621/work/src/github.com/sky0621/tips-go/try/errgroup/service.go:64
- failed to request order:
github.com/sky0621/tips-go/try/errgroup.subroutine.exec
/home/sky0621/work/src/github.com/sky0621/tips-go/try/errgroup/service.go:89
- failed to Request:
github.com/sky0621/tips-go/try/errgroup/client.(*orderAPIClient).Request
/home/sky0621/work/src/github.com/sky0621/tips-go/try/errgroup/client/order.go:33
Process finished with exit code 0
id003
はエラーが起きたから処理が止まっているけど、後続のはずだったid004
とid005
の処理は(id003
のエラー発生の影響を受けることなく)正常に終わっている。
まとめ
実際には、1 goroutine でエラーが起きたからといって、必ずしも後続の goroutine を止めるべきとは限らないので、1 goroutine 内で起きたエラーの内容によってエラーを返すか否か処理を分けることになると思う。
例えば、RESTのAPIコールしていて、500系のエラーステータスを返されたら、おそらくはこの goroutine だけでなく他の goroutine もAPIコール失敗するような事態が(APIコール先の方で)起きているんだろうと判断できる(かもしれない)。
それが、400系のエラーステータスであれば、きっとこの goroutine で渡したリクエストの不備だろうから、他の goroutine を止めるのは、やり過ぎだろう。
ともかく、goroutine を要する実装をしようと思うと、いろいろ考えることが増える・・・。