#ゴルーチンで再帰関数を並列処理する
フィボナッチ数列など再帰関数をゴルーチンを使って並列処理について調査したときの備忘録。
ただし、パフォーマンスの観点は考慮していない。
##フィボナッチ数列
まずはGo
でフィボナッチ数列のサンプル。
負の数については今回は対象外とした。
package main
import (
"fmt"
"os"
)
func fibonacci(n int64) int64 {
switch {
case n == 0:
return 0
case n == 1:
return 1
case n < 0:
fmt.Println("Parameter should be positive integers")
os.Exit(1)
}
return fibonacci(n - 2) + fibonacci(n - 1)
}
func main() {
fmt.Println(fibonacci(20))
}
上記のサンプルの結果はこちら。
$ go run main.go
6765
##ゴルーチンを使って再帰関数を並行処理
フィボナッチ数列のサンプルをゴルーチンで並行処理をする。
値はチャンネル経由で受信するようにした。
package main
import (
"fmt"
"os"
)
func fibonacci(n int64) int64 {
switch {
case n == 0:
return 0
case n == 1:
return 1
case n < 0:
fmt.Println("Parameter should be positive integers")
os.Exit(1)
}
ch1 := make(chan int64)
go func() {
defer close(ch1)
ch1 <- fibonacci(n-2)
}()
ch2 := make(chan int64)
go func() {
defer close(ch2)
ch2 <- fibonacci(n-1)
}()
return <-ch1 + <-ch2
}
func main() {
ch := make(chan int64)
go func() {
defer close(ch)
ch <- fibonacci(20)
}()
fmt.Println(<-ch)
}
同じ結果が出力された。
$ go run main.go
6765
##再帰関数の並行処理時のゴルーチンの同時並行最大数
###WaitGroup
を使ったカウンター
まずは正確にカウントできるかどうかサンプルで確認する。
ひたすらインクリメントするゴルーチンに対してWaitGroup
を使った集計方法を導入したサンプル。
package main
import (
"fmt"
"sync"
)
type MyWaitGroup struct {
Wg sync.WaitGroup
Count int64
Ch1 chan int64
Ch2 chan int64
}
func (g *MyWaitGroup) Add() {
g.Wg.Add(1)
g.Count++
}
func (g *MyWaitGroup) Done() {
g.Wg.Done()
}
func (g *MyWaitGroup) Wait() {
g.Wg.Wait()
}
func main() {
counter := 0
g := &MyWaitGroup{ Count:0 }
for i := 0; i < 1000; i++ {
g.Add()
go func() {
defer g.Done()
counter++
}()
}
g.Wait()
fmt.Printf("%d, %d\n", counter, g.Count)
}
結果は以下。
ひたすらインクリメントするゴルーチンは計算処理中にmain
が終了するため、期待するカウント値は取得できないのに対してWaitGroup
を使った集計では取得ができていることを確認。
$ go run main.go
951, 1000
###ゴルーチンの同時並行最大数を集計
先程のサンプルを拡張し、フィボナッチ数列のサンプルを並行処理をしたときのゴルーチンの同時並行最大数を集計。
package main
import (
"fmt"
"os"
"sync"
)
type MyWaitGroup struct {
Wg sync.WaitGroup
Count int64
MaxCount int64
}
func (g *MyWaitGroup) Add() {
g.Wg.Add(1)
g.Count++
if g.MaxCount < g.Count { g.MaxCount = g.Count }
}
func (g *MyWaitGroup) Done() {
g.Wg.Done()
g.Count--
}
func (g *MyWaitGroup) Wait() {
g.Wg.Wait()
fmt.Printf("Max:%d\n", g.MaxCount)
}
func (g *MyWaitGroup) fibonacci(n int64) int64 {
switch {
case n == 0:
return 0
case n == 1:
return 1
case n < 0:
fmt.Println("Parameter should be positive integers")
os.Exit(1)
}
ch1 := make(chan int64)
g.Add()
go func() {
defer close(ch1)
defer g.Done()
ch1 <- g.fibonacci(n-2)
}()
ch2 := make(chan int64)
g.Add()
go func() {
defer close(ch2)
defer g.Done()
ch2 <- g.fibonacci(n-1)
}()
return <-ch1 + <-ch2
}
func main() {
g := &MyWaitGroup{ Count:0, MaxCount:0 }
ch := make(chan int64)
g.Add()
go func() {
defer close(ch)
defer g.Done()
ch <- g.fibonacci(20)
}()
fmt.Println(<-ch)
g.Wait()
}
結果はこちら。
実行するごとに最大値は異なるが、結構な数のゴルーチンが生成されていることがわかる。
$ go run main.go
6765
Max:4729
##ゴルーチンのトータル発生数について
###計算方法
sample3.go
におけるゴルーチンのトータル発生数を計算する。
n
が0
と1
の時はゴルーチンのトータル発生数は1
、2
のときは3
となる。
n
が3
のときは5
、4
のときは9
、5
のときは15
・・・となるので、ゴルーチンのトータル発生数は以下の漸化式となる。
f_c(0)=f_c(1)=1\\\
f_c(n)=f_c(n-2)+f_c(n-1)+1
再び再帰関数が生まれたので、サンプルを作成。
package main
import (
"fmt"
"os"
)
func fc(n int64) int64 {
switch {
case n == 0:
return 1
case n == 1:
return 1
case n < 0:
fmt.Println("Parameter should be positive integers")
os.Exit(1)
}
return fc(n - 2) + fc(n - 1) + 1
}
func main() {
fmt.Println(fc(20))
}
出力結果は以下となり、21891
のゴルーチンが累計で発生することになる。
そのため、sample3.go
の場合、sync.Mutex
等でゴルーチンの同時並行処理数を21891
以下にするとデッドロックが発生する可能性がある。
$ go run main.go
21891
###(補足)再帰関数からループに変更した場合
sample1.go
を以下のように再帰関数を使わずにループ処理に変更すると直前のループ処理の結果が必ず必要になるため、ゴルーチンを使っても並行処理をする場合、ループを分けるなどの工夫が必要となる。
package main
import (
"fmt"
)
func main() {
pv := [2]int64{1, 1}
var fib int64
for i := 2; i < 10; i++ {
fib = pv[0] + pv[1]
pv[0] = pv[1]
pv[1] = fib
}
fmt.Println(fib)
}
##できるだけゴルーチンの最大並行実行数を制限する
チャンネルのcapacity
を制限させることで(参考)、最大並行実行数を制限する。
fatal error: all goroutines are asleep - deadlock!
が発生しないようにすることが前提。
以下のようにサンプルを変更する。チャンネルのcapacity
をデフォルト(0
)にした。
package main
import (
"fmt"
"os"
"sync"
)
type MyWaitGroup struct {
Wg sync.WaitGroup
Count int64
MaxCount int64
Ch chan int64
}
func (g *MyWaitGroup) Add() {
g.Wg.Add(1)
g.Count++
if g.MaxCount < g.Count { g.MaxCount = g.Count }
}
func (g *MyWaitGroup) Done() {
g.Wg.Done()
g.Count--
}
func (g *MyWaitGroup) Wait(cs chan int64) {
g.Wg.Wait()
close(g.Ch)
cs <- g.MaxCount
}
func(g *MyWaitGroup) fibonacci(n int64) {
switch {
case n == 0:
break
case n == 1:
g.Ch <- 1
case n < 0:
fmt.Println("Parameter should be positive integers")
os.Exit(1)
default:
g.Add()
go func() {
defer g.Done()
g.fibonacci(n - 2)
}()
g.Add()
go func() {
defer g.Done()
g.fibonacci(n - 1)
}()
}
}
func printResult(cs <-chan int64, done chan<- bool) {
var sum int64
for s := range cs {
sum += s
}
fmt.Println(sum)
done <- true
}
func printMax(cs <-chan int64, done chan <- bool) {
fmt.Printf("Max:%d\n", <-cs)
done <- true
}
func main() {
g := &MyWaitGroup{ Count:0, MaxCount:0, Ch:make(chan int64) }
g.Add()
go func() {
defer func() {
g.Done()
}()
g.fibonacci(20)
}()
done1 := make(chan int64)
go g.Wait(done1)
done2 := make(chan bool)
go printResult(g.Ch, done2)
<-done2
done3 := make(chan bool)
go printMax(done1, done3)
<-done3
}
出力結果は以下のようにゴルーチンの同時並行最大数を制限できていることを確認。
$ go run main.go
6765
Max:1792
sample3.go
よりもsample5.go
の方が同時並行実行されるゴルーチンの最大値よりも少なくなる可能性が高くなる。
フィボナッチ数列の計算量が増えれば増えるほど(サンプルでは20
だが100
等に増やした場合)、その可能性は高くなっていく。
##(補足)ゴルーチンの処理が完了しないケース
###sync.WaitGroup
を使った同期
以下のようなサンプルの場合、ゴルーチンの処理が完了する前にmain
が終了するため、正確なカウントができない。
package main
import (
"fmt"
)
func main() {
counter := 0
for i := 0; i <= 100; i++ {
go func(t int) {
counter += i
} (i)
}
fmt.Printf("%d\n", counter)
}
$ go run main.go
4794
sync.WaitGroup
でワーカーを用意し、同期させることでゴルーチンを正確にカウントする(計算を完了させる)ことができる。
package main
import (
"sync"
"fmt"
)
func main() {
var wg sync.WaitGroup
ch := make(chan int, 10)
done := make(chan bool, 1)
for i := 0; i <= 100; i++ {
wg.Add(1)
go func(t int) {
defer wg.Done()
ch <- t
}(i)
}
s := 0
go func () {
for {
select {
case i := <- ch:
s += i
case <- done:
return
}
}
}()
wg.Wait()
fmt.Println(s)
}
$ go run main.go
5050
###その他
以下の方法でもゴルーチンを正確を正確にカウントする(計算を完了させる)ことができる。
一旦数値をチャンネルに渡す、すべてのゴルーチンを起動した直後に、Wait
しているので、WaitGroup
カウンターが0
になるのを待ち、チャンネルをクローズするゴルーチンを起動している。そうすることで、range
でチャンネルの受信待ちをしている箇所でgoroutine 1 [chan receive]
といったデッドロックのエラーを回避している。
なお、チャンネルのキャパシティをデフォルトの0
ではなく、make(chan int, 100)
と100
等にすると処理速度の向上が期待できる。
package main
import (
"sync"
"fmt"
)
func main() {
var wg sync.WaitGroup
ch := make(chan int)
for i := 0; i <= 100; i++ {
wg.Add(1)
go func(t int) {
defer wg.Done()
ch <- t
}(i)
}
go func() {
wg.Wait()
close(ch)
}()
sum := 0
for v := range ch {
sum += v
}
fmt.Println(sum)
}
$ go run main.go
5050
以下のように、チャンネルのキャパシティを10
などにして、先にWaitGroup
カウンターが0になるのを待ち、チャンネルをクローズするゴルーチンを起動すると、数値をチャンネルに渡すゴルーチンの処理状況においては、すべてのゴルーチンが起動する前にWaitGroup
カウンターが0
になる場合があるので、panic: send on closed channel
といったエラーが発生する。
package main
import (
"sync"
"fmt"
"time"
)
func main() {
var wg sync.WaitGroup
ch := make(chan int, 10)
go func() {
wg.Wait()
close(ch)
}()
for i := 0; i <= 10; i++ {
wg.Add(1)
go func(t int) {
defer wg.Done()
ch <- t
}(i)
time.Sleep(time.Second)
}
sum := 0
for v := range ch {
sum += v
}
fmt.Println(sum)
}
$ go run main.go
panic: send on closed channel
goroutine 17 [running]:
main.main.func2(0xc420014090, 0xc420086000, 0x1)
/Users/hiroyuki/Documents/work/qiita/golang/fibonacci/main.go:22 +0x65
created by main.main
/Users/hiroyuki/Documents/work/qiita/golang/fibonacci/main.go:20 +0xdb
exit status 2