強化月間なので、今回もGoの記事を上げていきます。
はじめに
アプリケーションで長時間の処理を実行する際、以下のような処理を挟むことがあると思います。
- メイン処理途中に同時並行で定期的に流す処理(例:処理中ログの表示)
- メイン処理完了時に流す処理(例:完了ログの表示)
定期実行処理はメインの処理の裏で同時に流したいので別のゴルーチン内に実装しますが、気を付けないと完了時処理より後に定期実行処理が流れてしまいます。ハマるとたまにしか再現せず厄介なので、備忘録としてまとめました。
定期実行処理でハマったこと
以下の3つの処理を流すことを考えます。
- メインとなる処理(長時間かかる)
- メイン処理中に流す定期実行処理(処理中ログの表示)
- メイン処理完了時処理(完了ログの表示)
定期実行処理はメイン処理をブロックしてほしくないのでゴルーチンに追い出します。
func longProcess() {
// 実際には何か長い処理が走る
time.Sleep(5 * time.Second)
}
func Process() {
ticker := time.NewTicker(1 * time.Second)
done := make(chan struct{})
// バックグラウンドで定期実行処理
// tickerで1秒おきに実行
go func() {
for {
select {
case <-ticker.C:
time.Sleep(100 * time.Millisecond)
log.Println("processing now...")
// doneを受け取ったら定期実行終了
case <-done:
return
}
}
}()
longProcess()
// メイン処理が終わったら定期実行処理ループを終了させる
close(done)
}
完了時処理はメイン処理でエラーやpanicが起きた場合も流したいので、defer
に書いておきましょう。
func Process() {
ticker := time.NewTicker(1 * time.Second)
done := make(chan struct{})
// 完了時処理
defer func() {
log.Println("process finished")
}()
go func() {
for {
select {
case <-ticker.C:
time.Sleep(100 * time.Millisecond)
log.Println("processing now...")
case <-done:
return
}
}
}()
longProcess()
close(done)
}
2009/11/10 23:00:01 processing now...
2009/11/10 23:00:02 processing now...
2009/11/10 23:00:03 processing now...
2009/11/10 23:00:04 processing now...
2009/11/10 23:00:05 processing now...
2009/11/10 23:00:05 process finished
Program exited.
一見実行結果もよさそうですが...
問題発生
2009/11/10 23:00:01 processing now...
2009/11/10 23:00:02 processing now...
2009/11/10 23:00:03 processing now...
2009/11/10 23:00:04 processing now...
2009/11/10 23:00:05 process finished
2009/11/10 23:00:05 processing now...
実は完了時処理と定期実行処理は並行処理になっていて、完了時処理の後に定期実行処理が流れてしまう可能性があります。
試しに定期実行処理にsleepを入れてみると、完了時処理の後で最後の定期実行処理が走ります。
func Process() {
// ...
go func() {
for {
select {
case <-ticker.C:
// sleepで定期実行処理の遅延を再現
time.Sleep(100 * time.Millisecond)
log.Println("processing now...")
// doneを受け取ったら定期実行終了
case <-done:
return
}
}
}()
// ...
}
func main() {
Process()
time.Sleep(1 * time.Second) // アプリ自体が終了しないように待機
}
「main
で処理後にsleepしなければアプリごと終了して問題ないのでは?」とお思いの方もいるかもしれませんが、webサーバー等処理後も起動し続けるアプリの場合は同じことが発生してしまいます。
実際の順序はそれぞれの関数の負荷次第でランダムなので、忘れたころにテストが落ちる(もしくは本番環境で謎の不整合 ) でしょう。
原因
ゴルーチンが終了する前にdeferが評価され得るのが原因です。
チャネルを close()
することでselect文の case <-done
が受け取れるようになります1。この際、close()
自体はすぐに完了しメインゴルーチンでは return
, そして defer
が開始されます。
一方、ゴルーチン内で現在のselect文の評価(=今回の定期実行処理)が打ち切られるわけではないため、定期実行処理と完了時処理が並行処理で流れてしまいます。
(定期実行処理のループを抜けるのは、現在のselect文の評価が終わり、次のループで done
チャネルから受信したタイミングです)
対策
定期実行処理が終わるまで完了時処理を開始しないようにします。3通りの書き方を思いつきました。
1. 定期実行処理終了の通知を待機する
ゴルーチンを2つ利用して、定期実行のゴルーチンに定期実行完了を返答させます。
メインゴルーチンは、返答があるまでブロックします。
func Process() {
ticker := time.NewTicker(1 * time.Second)
// これまで通り、メインゴルーチンから定期実行ゴルーチンへ完了を伝える
done := make(chan struct{})
// さらに、定期実行ゴルーチンが完了したことをメインゴルーチンへ返答する
doneReply := make(chan struct{})
// 完了時処理
defer func() {
log.Println("process finished")
}()
// バックグラウンドで定期実行処理
// tickerで1秒おきに実行
go func() {
for {
select {
case <-ticker.C:
time.Sleep(100 * time.Millisecond)
log.Println("processing now...")
// doneを受け取ったら定期実行終了
case <-done:
close(doneReply)
return
}
}
}()
longProcess()
// メイン処理が終わったら定期実行処理ループを終了させる
close(done)
// 定期実行が完了するまで待機
<-doneReply
}
チャネルの受け渡しがガチャガチャしているのが少し見づらいですね...
2. メイン処理をゴルーチンに入れる
メインゴルーチンと定期実行ゴルーチンを入れ替えます。
これにより、定期実行処理が完了するまでreturnせず(=deferが開始せず)、完了処理も開始しません。
func Process() {
ticker := time.NewTicker(1 * time.Second)
done := make(chan struct{})
// 完了時処理
defer func() {
log.Println("process finished")
}()
// メイン処理を go文の中へ
go func() {
longProcess()
// メイン処理が終わったら定期実行処理ループを終了させる
close(done)
}()
// 定期実行処理をメインゴルーチンへ記載
for {
select {
case <-ticker.C:
time.Sleep(100 * time.Millisecond)
log.Println("processing now...")
// doneを受け取ったら定期実行終了
case <-done:
return
}
}
}
チャネルは1つで済みますが、将来「なぜメイン処理をgo
に押し込めているの?」という疑問は沸きそうです。
3. doneチャネルをcloseする代わりにチャネルへ値を送る
最後にdoneチャネルをcloseする代わりに、定期実行ゴルーチンへ値を送る方法です。
close()
と違い 値の送信は相手が受け取るまでブロックされる ため、defer開始時にゴルーチン内が case <- done
のブロックに入っている(=定期実行完了している)ことが保証されます。
func Process() {
ticker := time.NewTicker(1 * time.Second)
done := make(chan struct{})
// 完了時処理
defer func() {
log.Println("process finished")
}()
// バックグラウンドで定期実行処理
// tickerで1秒おきに実行
go func() {
for {
select {
case <-ticker.C:
time.Sleep(100 * time.Millisecond)
log.Println("processing now...")
// doneを受け取ったら定期実行終了
case <-done:
return
}
}
}()
longProcess()
// closeではなく値を送ることで、相手(定期実行ゴルーチン)が受け取るまでブロックされる
// (ゴルーチンがdoneを受け取った時点で定期実行は完了している)
done <- struct{}{}
}
使い終わったチャネルをcloseしていないのが気持ち悪いですが、元コードとの差分は一番少ないです。
おわりに
以上、定期実行処理でハマったことと解決策の紹介でした。並行処理は気を抜くとバグりがちなので注意して実装したいですね...
正直1~3のどれも分かりやすさは一長一短(全部コメントが無いと意図が分からない)ので、もっと良い方法があればコメントいただけるとありがたいです!
-
閉じられたチャネルから値を受信すると即時ゼロ値が返ってきます https://go.dev/ref/spec#Close。 ↩