1. はじめに
Goのゴルーチン(Goroutine)はだいたいわかったはずなのに、doneチャネルパターンがよくわからない……。(´・ω・`)
数学の問題の解説文がわからなかったときの感覚と似ている……。
こういう時は確か、前提知識となる部分がどこかで抜け落ちているから、理解を阻む壁にぶつかっているのだと思う。
φ(・ω・ )フムフム……(参考書の読み直し中)
あぁ……、そっかぁ~~
(;´д`)=3
となったので、その過程や、どうdoneチャネルパターンを理解したかを忘備録がてら共有してみようと思い、この記事を書きました。
目次
2. doneチャネルパターンとは
doneチャネルパターンとは、ゴルーチンリークを防ぐために実装されるパターンです。
ガベージコレクションの対象になる変数と違ってGoは全く使われていないゴルーチンを感知できず、ゴルーチンが終了しない場合スケジューラは定期的に未完了のゴルーチンにたいして時間を割り当てます。ゴルーチンが適切に終了しないために空白の時間が生じ、これが全体の動作を遅くすることをゴルーチンリークと言います。
この事態を防ぐために、もうそのゴルーチンを使わなくなった場合に終了シグナルを送ることで、確実にゴルーチンを停止させることができます。
このようなゴルーチンを停止させるシグナルを実装するパターンが、doneチャネルパターンです。
ゴルーチンリークはどんな時に起きるのか、その例をお示ししようと思います。
まず、普通にゴルーチンが終わってくれるケースです。
以下のコードでは、チャネルの挙動を確認するために意図的に処理を中断させたり(time.Sleep関数)値を出力させたり(fmt.Println関数)するようにしています。
package main
import (
"fmt"
"time"
)
func looper(max int) <- chan int {
ch := make(chan int)
go func(){
for i := 0; i < max; i++{
fmt.Println("Running looper.")
ch <- i
}
fmt.Println("Finish",time.Now())
close(ch)
}()
return ch
}
func parentalFunc(){
max := 10
for v := range looper(max){
time.Sleep(1 * time.Second)
fmt.Println("Running for range loop.")
fmt.Println(v)
}
}
func main(){
parentalFunc()
}
【チャネルの挙動】
チャネルはオープンな状態でバッファリングされていなければ、書き込みはチャネルが空の時に可能で、読み込みはチャネルが何らかの値を持っているときに可能で、その他の場合はブロック(関数処理のポーズ)します。
looper関数の中ではfor文でチャネルに書き込んでおり、looper(max)の返り値をfor range文で読み取っているので、この「処理の中断」という話に慣れていないと、あたかも一気に書き込んで一気に読み込んでいるように見えるかもしれません。
しかし、実際はまずlooper関数のfor文の中でi=0をchに書き込み、ループしているので続いてi=1を書き込もうとしますが、parentalFunc関数の方でまだチャネルの読み込み処理が進んでいないので、ブロックされます。parentalFunc関数にて一秒後にチャネルが読み込まれると、即座にlooper関数のforループの中断が解かれ、チャネルにi=1が書き込まれます。またループして書き込もうとしても、すぐには読みだされないからブロックされて……という形で進んでいきます。
実際、実行結果は以下のようになります。
Running looper.
Running looper.
Running for range loop.
0
Running looper.
Running for range loop.
1
Running looper.
Running for range loop.
2
最初に"Running looper."が二連続しているのが、先ほど説明した通りなので想定通りの挙動ですね。
大事なのは、「チャネルの読み込みや書き込みができるかどうかは、チャネルの状態による」ということです。
doneチャネルパターンにとっては、「書き込まれていないチャネルを読み込もうとするとブロックされ、読み込みは成功しない」という事実が大事でしょう。
【close関数の挙動】
チャネルをクローズすると、それ以上チャネルに対して書き込みができなくなります。
実際、looper関数の中でcloseした後に ch <- 5 などとするとパニックになります。
面白いのは、closeされた後でも値の読み込み自体は常に成功するということです。もうチャネルに値がない場合でも読み込もうとしたときは、初期値が返ってきます。
チャネルのクローズは、チャネルがクローズするのを待っているゴルーチンがあるときのみ必須です。今回の場合で言えば、parentalFuncにおけるfor rangeループのことです。
for rangeループは通常、スライスやマップなどの各要素を一つずつ取得して処理するのですが、チャネルは変数としては一つです。この場合、チャネルがクローズされるまで、あるいはbreak文やreturn文に出会うまでループします。
ここで大切なのは、「closeされたチャネルにおいては、読み込み処理が常に成功する」という事実です。これがdoneチャネルパターンの肝になってきます。
次に、ゴルーチンリークが起きるときの状況をお示しします。
pakcage main
import (
"fmt"
"time"
)
func looper(max int) <- chan int {
ch := make(chan int)
go func(){
for i := 0; i < max; i++{
ch <- i
}
close(ch)
}()
return ch
}
func leakGoroutine(){
max := 10
for v := range looper(max){
time.Sleep(1 * time.Second)
fmt.Println(v)
if v > 5{
break
}
}
func doHeavyTasks()
}
func main(){
leakGoroutine()
}
looper関数からの値が5より大きくなった時、leakGoroutine関数におけるforループは終了します。
しかし、looper関数においてはi=6がleakGoroutine関数で読み込まれないために詰まってしまい、i=7がチャネルに書き込めずにブロックしてしまいます。
looper関数のゴルーチンの無名関数はちゃんと終了できていないため、定期的にこのゴルーチンに時間が割り当てられます。何もやることがないのに、です。
これがゴルーチンリークの具体的な状況です。
お待たせしました。
ようやく、doneチャネルパターンです。
package main
import (
"fmt"
"time"
)
func looper(max int, done <- chan struct{}) <- chan int {
ch := make(chan int)
go func(){
for i := 0; i < max; i++{
select{
case <- done:
fmt.Println("Received done signal.")
return
case ch <- i:
fmt.Println("Running looper.")
}
}
}()
return ch
}
func LeakGoroutine(){
done := make(chan struct{})
max := 10
for v := range looper(max, done){
fmt.Println(v)
if v > 5{
break
}
}
close(done)
func doHeavyTasks()
}
func main(){
LeakGoroutine()
}
【selectの挙動】
select文は、switch文と似ています。
select文がswitch文と違うのは、caseを上から評価するのではなく、チャネルの読み書きの準備が整っているかどうかを評価して、準備が整っているcaseに関してランダムに実行するという点です。
case <- done:
の部分が、doneチャネルパターンの肝です。
この部分は、doneチャネルの読み込みのみを行っており、doneチャネルから取得できた値は無視するという意味のコードです。ただし、LeakGoroutine()関数の中ではdoneチャネルにたいして書き込み処理を行ってはいません。【チャネルの挙動】で言ったように、チャネルは書き込まれているものしか読み込むことはできないわけですから、このcaseはまずもって実行されないことになります。
ただし、【closeの挙動】で言ったように、チャネルがcloseされるとその読み取りは常に成功します。
このため、LeakGoroutine関数にてdoneチャネルをクローズすればlooper関数の中でcase <- done:
が成功し、return されることによって、looper関数内のゴルーチンの無名関数は無事に終了することになります。
ちょっとした応用というか、ここまでの話が分かっていれば、「doneチャネルをクローズしなくても、何らかの値を書き込んであげればいいんじゃないの?」という話もさほど苦労せずに理解できると思います。
func LeakGoroutine(){
done := make(chan struct{})
max := 10
for v := range looper(max, done){
fmt.Println(v)
if v > 5{
break
}
}
done <- struct{}{}
func doHeavyTasks()
}
上の場合でも、doneチャネルが読み込めるようになり、looper関数のselect文での、case <- done:
が成功することによりreturn文が実行され、無事に無名関数が終了します。
この違いは、closeする方が終了シグナルを送るという意図が明確になる点、バッファがないのでdoneへの書き込みを必ず一度きりすることに気を払わないといけない点などがあげられるので、特にこだわりがなければcloseすることをお勧めします。
3. selectの応用
selectはdoneチャネルパターンにも用いることができますし、タイムアウトを実装したりもできます。
doneチャネルを変形して関数の呼び出し側がdoneチャネルを管理しなくていいようにしたり、チャネルの読み込み時特有のイディオムを使ってcaseを無効化したりなどもできます。
ちょっとしたtipsですが、折角なので紹介しておこうと思います。
3-1. キャンセレーション関数
2章の例では、LeakGoroutine関数のように、子ゴルーチン関数を呼び出す側、親の側の関数がdoneチャネルを作って子ゴルーチンに渡し、doneチャネルをcloseしていました。
しかし、子ゴルーチンがdoneチャネルをつくり、それをラップしたcancel関数を返してあげることで、呼び出す側ではdoneチャネルパターンを意識せずに関数の確実な終了を実装することができます。
package main
import (
"fmt"
"time"
)
func looper(max int) (<- chan int, func() ){
ch := make(chan int)
done := make(chan struct{})
cancelFunc := func(){
close(done)
}
go func(){
for i := 0; i < max; i++{
select{
case <- done:
fmt.Println("Received done signal.")
return
case ch <- i:
fmt.Println("Running looper.")
}
}
}()
return ch, cancelFunc
}
func OrdinaryGoroutine(){
max := 10
res, cancelFunc := looper(max)
defer cancelFunc()
for v := range res{
fmt.Println(v)
if v > 5{
break
}
}
func doHeavyTasks()
}
func main(){
OrdinaryGoroutine()
}
cancelFunc関数はlooperのクロージャであり、即ちcancelFuncという変数に代入されてある無名関数です。
クロージャ内からはlooperで宣言されていたdoneチャネルを操作することが可能で、cancelFuncを実行することによってゴルーチンにたいして終了シグナルを送ることができます。
3-2. タイムアウト
タイムアウトの実装は、doneチャネルパターンやキャンセレーション関数を実装したselect文のcaseを増やして、ある一定の時間が立ったら処理が終わるようにコードを付け加えるだけで大丈夫です。
例えば、以下のコードがそうです。
package main
import (
"fmt"
"math/rand"
"time"
)
func doTask() (int, error) {
var sleepTime int = rand.Intn(5)
time.Sleep(time.Duration(sleepTime) * time.Second)
return sleepTime, nil
}
func Limit(){
done := make(chan struct{})
var result int
var err error
go func(){
result, err = doTask()
close(done)
}()
select{
case <- done:
fmt.Println(result, err)
return
case <- time.After(2*time.Second):
fmt.Println("time out")
return
}
}
func main(){
Limit()
}
doTask関数は、sleepの時間が4秒以内のランダムな時間になります。
doTaskを呼び出す側は、doneチャネルと時間制限の両方を監視することで、2秒経ったならば case <- time.After(2*time.Second):
が実行され、処理が終わるようになります。
time.After関数の返り値はチャネルであるので、select文のcase句に使うことができるわけですね。
3-3. caseの無効化
closeしたチャネルから読み込んだ時、カンマokイディオムによって、その読み込んだ値が閉じたチャネルからなのか開いたチャネルからなのかを判別できます。
case句でのチャネルの読み込み処理自体は、チャネルが閉じていようが開いていようが常に成功してしまうので、同じ処理が重複してしまいます。
これを回避するためには、もし閉じたチャネルから読み込んだ場合は、チャネルにnilを代入することにより、そのcaseを無効にすることができます。
nilチャネルからの読み込みは値を返さないためです。
package main
import (
"time"
"fmt"
)
func doHeavyTask(waitTime int, ch chan int){
time.Sleep(time.Duration(waitTime) * time.Second)
ch <- 10
}
func IneffectiveCase(){
ch1 := make(chan int)
ch2 := make(chan int)
go func(){
doHeavyTask(4, ch1)
close(ch1)
}()
go func(){
doHeavyTask(3, ch2)
close(ch2)
}()
for{
select{
case v, ok := <- ch1:
fmt.Println("ch1", v, ok)
if !ok {
ch1 = nil
continue
}
case v,ok := <- ch2:
fmt.Println("ch2", v, ok)
if !ok {
ch2 = nil
continue
}
case <- time.After(6 * time.Second):
fmt.Println("time out")
return
}
if ch1 == nil && ch2 == nil{
return
}
}
}
これにより、一旦nilが代入されたcaseは再び実行されることはなく、終了させることができます。
上のコードの場合は、go <無名関数>のなかでチャネルにnilを代入させることもできますが、イディオム的ではないので、特に拘りがなければcase 句以下の処理の中でnilを代入することをお勧めします。
4. おわりに
今回は紹介できませんでしたが、ここで触れられていなかったゴルーチンのトピックとして、バッファリングされたチャネル、sync.WaitGroupメソッドによる複数ゴルーチンの実行と待ち合わせなどがあります。
このトピックに関して言えば、以下の記事が大変参考になります。ぜひご参照ください。
また、今回実装したdoneチャネルパターンやキャンセレーション関数、タイムアウトなどはGoの「context」を用いても実行できます。この詳しい内容は別の方が詳しく解説されてありますので、ぜひご参照ください。
contextのValueメソッド以外は、だいたいこの記事で紹介されてあることが理解できていればすっと頭に入ってくると思います。
5. 参考
非常に参考にさせていただきました。