注意
掲載したコードを実行したとき,デッドロックが発生する可能性があります.
目標
以下の関数longProcess
の結果をスライスに格納する.
func longProcess(n int) string {
rndValue := rand.Intn(1000)
sleepTime := time.Millisecond * time.Duration(rndValue)
time.Sleep(sleepTime)
msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
fmt.Println(msg)
return msg
}
longProcessの概要
-
longProcess
関数の処理は2乗の値の計算.戻り値として計算式と処理時間を文字列として返す. -
randValue
として0~999の乱数を取得し,randValue
[ms]の間sleepすることでランダムな実行時間にする.
例えばこのコードで10回実行して実行順に格納するとき,結果例は以下のようになる.
10回実行するコード例
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
size := 10
problem := make([]int, size)
for i := 0; i < size; i++ {
problem[i] = rand.Intn(100)
}
answer := make([]string, size)
for i, v := range problem {
answer[i] = longProcess(v)
}
for i, v := range answer {
fmt.Printf("%d: %s\n", i, v)
}
}
func longProcess(n int) string {
rndValue := rand.Intn(1000)
sleepTime := time.Millisecond * time.Duration(rndValue)
time.Sleep(sleepTime)
msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
fmt.Println(msg)
return msg
}
0: 5^2=25 [842ms]
1: 68^2=4624 [542ms]
2: 89^2=7921 [383ms]
3: 75^2=5625 [871ms]
4: 28^2=784 [129ms]
5: 57^2=3249 [128ms]
6: 91^2=8281 [370ms]
7: 37^2=1369 [540ms]
8: 74^2=5476 [177ms]
9: 24^2=576 [92ms]
この例ではproblem
にランダムな問題として0〜99の乱数を格納しておき,problem
の順にlongProcess
を実行し,その結果をスライスanswer
に格納する.
実行時間は問題数size
とランダムな処理時間に依存する.実行時間の期待値は
$$\sum_{k=0}^{size-1}\frac{1}{1000}\sum_{l=0}^{1000-1}l ;[ms] \fallingdotseq 500size[ms]$$
なので,size=10
だと約5秒,size=100
だと約50秒になり,かなり遅くなる.そこで,並列処理で高速化する.
並列化
goroutineの使用
並列化のためにgoroutine (ゴルーチン)と呼ばれる軽量なスレッドをつくる.実は普段実行しているプログラムもgoroutineの上で動作している(メイン goroutineと呼ばれる).
新しいgoroutineは関数呼び出しの頭にgoをつけるだけで作ることができる.
for i, v := range problem {
go longProcess(v)
}
Channel
goroutineは戻り値をもたないのでans
への代入はできない.そこでgoroutine間の通信にはChannel(チャネル)型を使う.Channnelはスライスのようにして生成する.
ch := make(chan 型)
今回は複数(size)個のデータを通信するので,バッファサイズも指定する.
ch := make(chan string, size)
送り手はこれ以上の送信する値がないことを示すためチャネルをcloseできる.
close(ch)
戻り値をChannelで渡せるようにするためにlongProcess
を書き換える.引数にch chan string
を追加する.戻り値の型は削除してvoid型とし,かわりにch
に代入する.
func longProcess(n int, ch chan string) {
rndValue := rand.Intn(1000)
sleepTime := time.Millisecond * time.Duration(rndValue)
time.Sleep(sleepTime)
msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
fmt.Println(msg)
ch <- msg
}
longProcess
関数の変更に合わせてmain
関数も書き換える.
func main() {
size := 100
problem := make([]int, size)
ch := make(chan string, size)
for i := 0; i < size; i++ {
problem[i] = rand.Intn(100)
}
for _, v := range problem {
go longProcess(v, ch)
}
close(ch)
for v := range ch {
fmt.Printf("%s\n", v)
}
}
この状態でコードを実行すると,何も表示されずにプログラムが終了する.これは,メインgoroutineが終了したとき,そこから呼び出されたgoroutineも終了するためだ.
Channelで値を渡すことができるコード
package main
import (
"fmt"
"math/rand"
"time"
)
type Result struct {
index int
value string
}
func main() {
size := 100
problem := make([]int, size)
ch := make(chan string, size)
for i := 0; i < size; i++ {
problem[i] = rand.Intn(100)
}
for _, v := range problem {
go longProcess(v, ch)
}
close(ch)
for v := range ch {
fmt.Printf("%s\n", v)
}
}
func longProcess(n int, ch chan string) {
rndValue := rand.Intn(1000)
sleepTime := time.Millisecond * time.Duration(rndValue)
time.Sleep(sleepTime)
msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
fmt.Println(msg)
ch <- msg
}
WaitGroup
longProcess
が途中終了しないようにするためには,longProcess
の終了までメインgoroutineを待機させる.処理を待機させるためにsync.WaitGroup
を使う.
var wg sync.WaitGroup
WaitGroupの内部実装は以下のようになっている.
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64
sema uint32
}
sema
がカウンタになっており,待機する処理を表す.カウンタはAdd
で更新する.
wg.Add(size)
処理が1つ終わった場合はDone
によりカウンタを減らす.
wg.Done()
longProcess
でWaitGroupを操作できるようにする.引数としてWaitGroupの参照を受け取り,処理の終了後にDone()
を呼び出す.
func longProcess(n int, ch chan string, wg *sync.WaitGroup) {
rndValue := rand.Intn(1000)
sleepTime := time.Millisecond * time.Duration(rndValue)
time.Sleep(sleepTime)
msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
fmt.Println(msg)
ch <- msg
wg.Done()
}
longProcess
の呼び出しではwg
の参照を渡す.
for _, v := range problem {
go longProcess(v, ch, &wg)
}
メインgoroutineでは,他のgoroutineを待つためにWait()
を呼び出す.
wg.Wait()
ここで,Wait
の呼び出しはclose
より前にする.そうしないと,メインgoroutineは待機前にChannelをcloseしてしまい,panicが起きる.
for _, v := range problem {
go longProcess(v, ch, &wg)
}
wg.Wait()
close(ch)
これでlongProcess
を並列呼び出しできた.トータルの実行時間は大幅に短縮された.
66^2=4356 [1ms]
89^2=7921 [3ms]
58^2=3364 [7ms]
33^2=1089 [9ms]
1^2=1 [28ms]
︙
メインgoroutineを待機させることができるコード
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Result struct {
index int
value string
}
func main() {
size := 100
problem := make([]int, size)
ch := make(chan string, size)
var wg sync.WaitGroup
wg.Add(size)
for i := 0; i < size; i++ {
problem[i] = rand.Intn(100)
}
for _, v := range problem {
go longProcess(v, ch, &wg)
}
wg.Wait()
close(ch)
for v := range ch {
fmt.Printf("%s\n", v)
}
}
func longProcess(n int, ch chan string, wg *sync.WaitGroup) {
rndValue := rand.Intn(1000)
sleepTime := time.Millisecond * time.Duration(rndValue)
time.Sleep(sleepTime)
msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
fmt.Println(msg)
ch <- msg
wg.Done()
}
並び替え
プログラムの実行結果右側の処理時間(sleep時間)を見ればわかるが,結果の表示は処理の終了順になっている.本来はproblem
のインデックスと対応した並びになってほしい.処理時間に関係なく順番を維持するために,longProcess
にスライスを直接渡してその要素を書き換える方法が考えられるが,安全性を考えるとMutexを使う必要がある.そこで,処理後の値を並び替える方法をとる.
インデックスと出力の文字列を保持するために,構造体Result
を定義する.
type Result struct {
index int
value string
}
これに合わせてChannnelの宣言を変更する.
ch := make(chan Result, size)
longProcess
の引数も変更する.また,インデックスの値も受け取ることができるようにする.そして,ch
にResult
型を生成して渡す.
func longProcess(n int, index int, ch chan Result, wg *sync.WaitGroup) {
rndValue := rand.Intn(1000)
sleepTime := time.Millisecond * time.Duration(rndValue)
time.Sleep(sleepTime)
msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
fmt.Println(msg)
ch <- Result{index: index, value: msg}
wg.Done()
}
longProcess
関数の定義変更に合わせて呼び出し側も変更する.
for i, v := range problem {
go longProcess(v, i, ch, &wg)
}
また,結果の出力をstring
から構造体に変更する.
for v := range ch {
fmt.Printf("%v\n", v)
}
プログラムを変更して実行すると以下のような出力が得られる.
{6 53^2=2809 [14ms]}
{76 28^2=784 [16ms]}
{78 36^2=1296 [41ms]}
{9 30^2=900 [50ms]}
{7 35^2=1225 [90ms]}
︙
最後にResult
型のindex
の値を用いて,文字列をanswer
スライスの対応するインデックスに代入する.
answer := make([]string, size)
for v := range ch {
answer[v.index] = v.value
}
answer
の出力は以下のようにすればできる.
for i, v := range answer {
fmt.Printf("%d: %s\n", i, v)
}
最終的なコード
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Result struct {
index int
value string
}
func main() {
size := 100
problem := make([]int, size)
ch := make(chan Result, size)
var wg sync.WaitGroup
wg.Add(size)
for i := 0; i < size; i++ {
problem[i] = rand.Intn(100)
}
for i, v := range problem {
go longProcess(v, i, ch, &wg)
}
wg.Wait()
close(ch)
answer := make([]string, size)
for v := range ch {
answer[v.index] = v.value
}
for i, v := range answer {
fmt.Printf("%d: %s\n", i, v)
}
}
func longProcess(n int, index int, ch chan Result, wg *sync.WaitGroup) {
rndValue := rand.Intn(1000)
sleepTime := time.Millisecond * time.Duration(rndValue)
time.Sleep(sleepTime)
msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
fmt.Println(msg)
ch <- Result{index: index, value: msg}
wg.Done()
}