Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
10
Help us understand the problem. What is going on with this article?
@hiroykam

Golangでゴルーチンにより再帰関数を並列処理

More than 1 year has passed since last update.

ゴルーチンで再帰関数を並列処理する

フィボナッチ数列など再帰関数をゴルーチンを使って並列処理について調査したときの備忘録。
ただし、パフォーマンスの観点は考慮していない。

フィボナッチ数列

まずはGoでフィボナッチ数列のサンプル。
負の数については今回は対象外とした。

sample1.go
package main

import (
    "fmt"
    "os"
)

func fibonacci(n int64) int64 {
    switch {
    case n == 0:
        return 0
    case n == 1:
        return 1
    case n < 0:
        fmt.Println("Parameter should be positive integers")
        os.Exit(1)
    }
    return fibonacci(n - 2) + fibonacci(n - 1)
}

func main() {
    fmt.Println(fibonacci(20))
}

上記のサンプルの結果はこちら。

出力結果
$ go run main.go
6765

ゴルーチンを使って再帰関数を並行処理

フィボナッチ数列のサンプルをゴルーチンで並行処理をする。
値はチャンネル経由で受信するようにした。

sample2.go
package main

import (
    "fmt"
    "os"
)

func fibonacci(n int64) int64 {
    switch {
    case n == 0:
        return 0
    case n == 1:
        return 1
    case n < 0:
        fmt.Println("Parameter should be positive integers")
        os.Exit(1)
    }

    ch1 := make(chan int64)
    go func() {
        defer close(ch1)
        ch1 <- fibonacci(n-2)
    }()
    ch2 := make(chan int64)
    go func() {
        defer close(ch2)
        ch2 <- fibonacci(n-1)
    }()
    return <-ch1 + <-ch2
}

func main() {
    ch := make(chan int64)
    go func() {
        defer close(ch)
        ch <- fibonacci(20)
    }()

    fmt.Println(<-ch)
}

同じ結果が出力された。

出力結果
$ go run main.go
6765

再帰関数の並行処理時のゴルーチンの同時並行最大数

WaitGroupを使ったカウンター

まずは正確にカウントできるかどうかサンプルで確認する。
ひたすらインクリメントするゴルーチンに対してWaitGroupを使った集計方法を導入したサンプル。

counter.go
package main

import (
    "fmt"
    "sync"
)

type MyWaitGroup struct {
    Wg sync.WaitGroup
    Count int64
    Ch1 chan int64
    Ch2 chan int64
}

func (g *MyWaitGroup) Add() {
    g.Wg.Add(1)
    g.Count++
}

func (g *MyWaitGroup) Done() {
    g.Wg.Done()
}

func (g *MyWaitGroup) Wait() {
    g.Wg.Wait()
}

func main() {
    counter := 0
    g := &MyWaitGroup{ Count:0 }
    for i := 0; i < 1000; i++ {
        g.Add()
        go func() {
            defer g.Done()
            counter++
        }()
    }
    g.Wait()

    fmt.Printf("%d, %d\n", counter, g.Count)
}

結果は以下。
ひたすらインクリメントするゴルーチンは計算処理中にmainが終了するため、期待するカウント値は取得できないのに対してWaitGroupを使った集計では取得ができていることを確認。

結果
$ go run main.go
951, 1000

ゴルーチンの同時並行最大数を集計

先程のサンプルを拡張し、フィボナッチ数列のサンプルを並行処理をしたときのゴルーチンの同時並行最大数を集計。

sample3.go
package main

import (
    "fmt"
    "os"
    "sync"
)

type MyWaitGroup struct {
    Wg sync.WaitGroup
    Count int64
    MaxCount int64
}

func (g *MyWaitGroup) Add() {
    g.Wg.Add(1)
    g.Count++
    if g.MaxCount < g.Count { g.MaxCount = g.Count }
}

func (g *MyWaitGroup) Done() {
    g.Wg.Done()
    g.Count--
}

func (g *MyWaitGroup) Wait() {
    g.Wg.Wait()
    fmt.Printf("Max:%d\n", g.MaxCount)
}

func (g *MyWaitGroup) fibonacci(n int64) int64 {
    switch {
    case n == 0:
        return 0
    case n == 1:
        return 1
    case n < 0:
        fmt.Println("Parameter should be positive integers")
        os.Exit(1)
    }

    ch1 := make(chan int64)
    g.Add()
    go func() {
        defer close(ch1)
        defer g.Done()
        ch1 <- g.fibonacci(n-2)
    }()

    ch2 := make(chan int64)
    g.Add()
    go func() {
        defer close(ch2)
        defer g.Done()
        ch2 <- g.fibonacci(n-1)
    }()
    return <-ch1 + <-ch2
}

func main() {
    g := &MyWaitGroup{ Count:0, MaxCount:0 }
    ch := make(chan int64)
    g.Add()
    go func() {
        defer close(ch)
        defer g.Done()
        ch <- g.fibonacci(20)
    }()

    fmt.Println(<-ch)
    g.Wait()
}

結果はこちら。
実行するごとに最大値は異なるが、結構な数のゴルーチンが生成されていることがわかる。

結果
$ go run main.go
6765
Max:4729

ゴルーチンのトータル発生数について

計算方法

sample3.goにおけるゴルーチンのトータル発生数を計算する。
n01の時はゴルーチンのトータル発生数は12のときは3となる。
n3のときは54のときは95のときは15・・・となるので、ゴルーチンのトータル発生数は以下の漸化式となる。

f_c(0)=f_c(1)=1\\\
f_c(n)=f_c(n-2)+f_c(n-1)+1

再び再帰関数が生まれたので、サンプルを作成。

sample4.go
package main

import (
    "fmt"
    "os"
)

func fc(n int64) int64 {
    switch {
    case n == 0:
        return 1
    case n == 1:
        return 1
    case n < 0:
        fmt.Println("Parameter should be positive integers")
        os.Exit(1)
    }
    return fc(n - 2) + fc(n - 1) + 1
}

func main() {
    fmt.Println(fc(20))
}

出力結果は以下となり、21891のゴルーチンが累計で発生することになる。
そのため、sample3.goの場合、sync.Mutex等でゴルーチンの同時並行処理数を21891以下にするとデッドロックが発生する可能性がある。

出力結果
$ go run main.go
21891

(補足)再帰関数からループに変更した場合

sample1.goを以下のように再帰関数を使わずにループ処理に変更すると直前のループ処理の結果が必ず必要になるため、ゴルーチンを使っても並行処理をする場合、ループを分けるなどの工夫が必要となる。

sample1a.go
package main

import (
    "fmt"
)

func main() {
    pv := [2]int64{1, 1}
    var fib int64
    for i := 2; i < 10; i++ {
        fib = pv[0] + pv[1]
        pv[0] = pv[1]
        pv[1] = fib
    }
    fmt.Println(fib)
}

できるだけゴルーチンの最大並行実行数を制限する

チャンネルのcapacityを制限させることで(参考)、最大並行実行数を制限する。
fatal error: all goroutines are asleep - deadlock!が発生しないようにすることが前提。
以下のようにサンプルを変更する。チャンネルのcapacityをデフォルト(0)にした。

sample5.go
package main

import (
    "fmt"
    "os"
    "sync"
)

type MyWaitGroup struct {
    Wg sync.WaitGroup
    Count int64
    MaxCount int64
    Ch chan int64
}

func (g *MyWaitGroup) Add() {
    g.Wg.Add(1)
    g.Count++
    if g.MaxCount < g.Count { g.MaxCount = g.Count }
}

func (g *MyWaitGroup) Done() {
    g.Wg.Done()
    g.Count--
}

func (g *MyWaitGroup) Wait(cs chan int64) {
    g.Wg.Wait()
    close(g.Ch)
    cs <- g.MaxCount
}

func(g *MyWaitGroup) fibonacci(n int64) {
    switch {
    case n == 0:
        break
    case n == 1:
        g.Ch <- 1
    case n < 0:
        fmt.Println("Parameter should be positive integers")
        os.Exit(1)
    default:
        g.Add()
        go func() {
            defer g.Done()
            g.fibonacci(n - 2)
        }()
        g.Add()
        go func() {
            defer g.Done()
            g.fibonacci(n - 1)
        }()
    }
}

func printResult(cs <-chan int64, done chan<- bool) {
    var sum int64
    for s := range cs {
        sum += s
    }
    fmt.Println(sum)

    done <- true
}

func printMax(cs <-chan int64, done chan <- bool) {
    fmt.Printf("Max:%d\n", <-cs)
    done <- true
}

func main() {
    g := &MyWaitGroup{ Count:0, MaxCount:0, Ch:make(chan int64) }
    g.Add()
    go func() {
        defer func() {
            g.Done()
        }()
        g.fibonacci(20)
    }()

    done1 := make(chan int64)
    go g.Wait(done1)

    done2 := make(chan bool)
    go printResult(g.Ch, done2)
    <-done2

    done3 := make(chan bool)
    go printMax(done1, done3)
    <-done3
}

出力結果は以下のようにゴルーチンの同時並行最大数を制限できていることを確認。

出力結果
$ go run main.go
6765
Max:1792

sample3.goよりもsample5.goの方が同時並行実行されるゴルーチンの最大値よりも少なくなる可能性が高くなる。
フィボナッチ数列の計算量が増えれば増えるほど(サンプルでは20だが100等に増やした場合)、その可能性は高くなっていく。

(補足)ゴルーチンの処理が完了しないケース

sync.WaitGroupを使った同期

以下のようなサンプルの場合、ゴルーチンの処理が完了する前にmainが終了するため、正確なカウントができない。

calc1.go
package main

import (
    "fmt"
)

func main() {
    counter := 0
    for i := 0; i <= 100; i++ {
        go func(t int) {
            counter += i
        } (i)
    }
    fmt.Printf("%d\n", counter)
}
出力結果
$ go run main.go
4794

sync.WaitGroupでワーカーを用意し、同期させることでゴルーチンを正確にカウントする(計算を完了させる)ことができる。

calc2.go
package main

import (
    "sync"
    "fmt"
)

func main() {
    var wg sync.WaitGroup
    ch := make(chan int, 10)
    done := make(chan bool, 1)
    for i := 0; i <= 100; i++ {
        wg.Add(1)
        go func(t int) {
            defer wg.Done()
            ch <- t
        }(i)
    }
    s := 0
    go func () {
        for {
            select {
            case i := <- ch:
                s += i
            case <- done:
                return
            }
        }
    }()

    wg.Wait()
    fmt.Println(s)
}
出力結果
$ go run main.go
5050

その他

以下の方法でもゴルーチンを正確を正確にカウントする(計算を完了させる)ことができる。
一旦数値をチャンネルに渡す、すべてのゴルーチンを起動した直後に、Waitしているので、WaitGroupカウンターが0になるのを待ち、チャンネルをクローズするゴルーチンを起動している。そうすることで、rangeでチャンネルの受信待ちをしている箇所でgoroutine 1 [chan receive]といったデッドロックのエラーを回避している。
なお、チャンネルのキャパシティをデフォルトの0ではなく、make(chan int, 100)100等にすると処理速度の向上が期待できる。

calc3.go
package main

import (
    "sync"
    "fmt"
)

func main() {
    var wg sync.WaitGroup
    ch := make(chan int)
    for i := 0; i <= 100; i++ {
        wg.Add(1)
        go func(t int) {
            defer wg.Done()
            ch <- t
        }(i)
    }

    go func() {
        wg.Wait()
        close(ch)
    }()

    sum := 0
    for v := range ch {
        sum += v
    }
    fmt.Println(sum)
}
出力結果
$ go run main.go
5050

以下のように、チャンネルのキャパシティを10などにして、先にWaitGroupカウンターが0になるのを待ち、チャンネルをクローズするゴルーチンを起動すると、数値をチャンネルに渡すゴルーチンの処理状況においては、すべてのゴルーチンが起動する前にWaitGroupカウンターが0になる場合があるので、panic: send on closed channelといったエラーが発生する。

calc4.go
package main

import (
    "sync"
    "fmt"
    "time"
)

func main() {
    var wg sync.WaitGroup
    ch := make(chan int, 10)

    go func() {
        wg.Wait()
        close(ch)
    }()

    for i := 0; i <= 10; i++ {
        wg.Add(1)
        go func(t int) {
            defer wg.Done()
            ch <- t
        }(i)
        time.Sleep(time.Second)
    }

    sum := 0
    for v := range ch {
        sum += v
    }
    fmt.Println(sum)
}
出力結果
$ go run main.go
panic: send on closed channel

goroutine 17 [running]:
main.main.func2(0xc420014090, 0xc420086000, 0x1)
    /Users/hiroyuki/Documents/work/qiita/golang/fibonacci/main.go:22 +0x65
created by main.main
    /Users/hiroyuki/Documents/work/qiita/golang/fibonacci/main.go:20 +0xdb
exit status 2
10
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
hiroykam
45歳、独身。 ぶらぶらしつつ、お仕事を探しています。 暇なときにQiitaに投稿します。

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
10
Help us understand the problem. What is going on with this article?