0
0

Goのgoroutineで並列処理した結果を順番を維持したまま配列(スライス)に格納する

Last updated at Posted at 2024-08-23

注意

掲載したコードを実行したとき,デッドロックが発生する可能性があります.

目標

以下の関数longProcessの結果をスライスに格納する.

func longProcess(n int) string {
	rndValue := rand.Intn(1000)
	sleepTime := time.Millisecond * time.Duration(rndValue)
	time.Sleep(sleepTime)
	msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
	fmt.Println(msg)
	return msg
}

process.png

longProcessの概要

  • longProcess関数の処理は2乗の値の計算.戻り値として計算式と処理時間を文字列として返す.
  • randValueとして0~999の乱数を取得し,randValue[ms]の間sleepすることでランダムな実行時間にする.

例えばこのコードで10回実行して実行順に格納するとき,結果例は以下のようになる.

10回実行するコード例
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	size := 10
	problem := make([]int, size)
	for i := 0; i < size; i++ {
		problem[i] = rand.Intn(100)
	}

	answer := make([]string, size)
	for i, v := range problem {
		answer[i] = longProcess(v)
	}

	for i, v := range answer {
		fmt.Printf("%d: %s\n", i, v)
	}
}

func longProcess(n int) string {
	rndValue := rand.Intn(1000)
	sleepTime := time.Millisecond * time.Duration(rndValue)
	time.Sleep(sleepTime)
	msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
	fmt.Println(msg)
	return msg
}
0: 5^2=25 [842ms]
1: 68^2=4624 [542ms]
2: 89^2=7921 [383ms]
3: 75^2=5625 [871ms]
4: 28^2=784 [129ms]
5: 57^2=3249 [128ms]
6: 91^2=8281 [370ms]
7: 37^2=1369 [540ms]
8: 74^2=5476 [177ms]
9: 24^2=576 [92ms]

この例ではproblemにランダムな問題として0〜99の乱数を格納しておき,problemの順にlongProcessを実行し,その結果をスライスanswerに格納する.
実行時間は問題数sizeとランダムな処理時間に依存する.実行時間の期待値は
$$\sum_{k=0}^{size-1}\frac{1}{1000}\sum_{l=0}^{1000-1}l ;[ms] \fallingdotseq 500size[ms]$$
なので,size=10だと約5秒,size=100だと約50秒になり,かなり遅くなる.そこで,並列処理で高速化する.

並列化

goroutineの使用

並列化のためにgoroutine (ゴルーチン)と呼ばれる軽量なスレッドをつくる.実は普段実行しているプログラムもgoroutineの上で動作している(メイン goroutineと呼ばれる).
新しいgoroutineは関数呼び出しの頭にgoをつけるだけで作ることができる.

for i, v := range problem {
    go longProcess(v)
}

Channel

goroutineは戻り値をもたないのでansへの代入はできない.そこでgoroutine間の通信にはChannel(チャネル)型を使う.Channnelはスライスのようにして生成する.

ch := make(chan )

今回は複数(size)個のデータを通信するので,バッファサイズも指定する.

ch := make(chan string, size)

送り手はこれ以上の送信する値がないことを示すためチャネルをcloseできる.

close(ch)

戻り値をChannelで渡せるようにするためにlongProcessを書き換える.引数にch chan stringを追加する.戻り値の型は削除してvoid型とし,かわりにchに代入する.

func longProcess(n int, ch chan string) {
	rndValue := rand.Intn(1000)
	sleepTime := time.Millisecond * time.Duration(rndValue)
	time.Sleep(sleepTime)
	msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
	fmt.Println(msg)
	ch <- msg
}

longProcess関数の変更に合わせてmain関数も書き換える.

func main() {
	size := 100
	problem := make([]int, size)
	ch := make(chan string, size)
	for i := 0; i < size; i++ {
		problem[i] = rand.Intn(100)
	}

	for _, v := range problem {
		go longProcess(v, ch)
	}
	close(ch)

	for v := range ch {
		fmt.Printf("%s\n", v)
	}
}

この状態でコードを実行すると,何も表示されずにプログラムが終了する.これは,メインgoroutineが終了したとき,そこから呼び出されたgoroutineも終了するためだ.

wait.png

Channelで値を渡すことができるコード
package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Result struct {
	index int
	value string
}

func main() {
	size := 100
	problem := make([]int, size)
	ch := make(chan string, size)
	for i := 0; i < size; i++ {
		problem[i] = rand.Intn(100)
	}

	for _, v := range problem {
		go longProcess(v, ch)
	}
	close(ch)

	for v := range ch {
		fmt.Printf("%s\n", v)
	}
}

func longProcess(n int, ch chan string) {
	rndValue := rand.Intn(1000)
	sleepTime := time.Millisecond * time.Duration(rndValue)
	time.Sleep(sleepTime)
	msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
	fmt.Println(msg)
	ch <- msg
}

WaitGroup

longProcessが途中終了しないようにするためには,longProcessの終了までメインgoroutineを待機させる.処理を待機させるためにsync.WaitGroupを使う.

var wg sync.WaitGroup

WaitGroupの内部実装は以下のようになっている.

type WaitGroup struct {
	noCopy noCopy
	state atomic.Uint64
	sema  uint32
}

semaがカウンタになっており,待機する処理を表す.カウンタはAddで更新する.

wg.Add(size)

処理が1つ終わった場合はDoneによりカウンタを減らす.

wg.Done()

longProcessでWaitGroupを操作できるようにする.引数としてWaitGroupの参照を受け取り,処理の終了後にDone()を呼び出す.

func longProcess(n int, ch chan string, wg *sync.WaitGroup) {
	rndValue := rand.Intn(1000)
	sleepTime := time.Millisecond * time.Duration(rndValue)
	time.Sleep(sleepTime)
	msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
	fmt.Println(msg)
	ch <- msg
	wg.Done()
}

longProcessの呼び出しではwgの参照を渡す.

for _, v := range problem {
    go longProcess(v, ch, &wg)
}

メインgoroutineでは,他のgoroutineを待つためにWait()を呼び出す.

wg.Wait()

ここで,Waitの呼び出しはcloseより前にする.そうしないと,メインgoroutineは待機前にChannelをcloseしてしまい,panicが起きる.

for _, v := range problem {
    go longProcess(v, ch, &wg)
}
wg.Wait()
close(ch)

これでlongProcessを並列呼び出しできた.トータルの実行時間は大幅に短縮された.

66^2=4356 [1ms]
89^2=7921 [3ms]
58^2=3364 [7ms]
33^2=1089 [9ms]
1^2=1 [28ms]
︙
メインgoroutineを待機させることができるコード
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Result struct {
	index int
	value string
}

func main() {
	size := 100
	problem := make([]int, size)
	ch := make(chan string, size)
	var wg sync.WaitGroup
	wg.Add(size)
	for i := 0; i < size; i++ {
		problem[i] = rand.Intn(100)
	}

	for _, v := range problem {
		go longProcess(v, ch, &wg)
	}
	wg.Wait()
	close(ch)

	for v := range ch {
		fmt.Printf("%s\n", v)
	}
}

func longProcess(n int, ch chan string, wg *sync.WaitGroup) {
	rndValue := rand.Intn(1000)
	sleepTime := time.Millisecond * time.Duration(rndValue)
	time.Sleep(sleepTime)
	msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
	fmt.Println(msg)
	ch <- msg
	wg.Done()
}

並び替え

プログラムの実行結果右側の処理時間(sleep時間)を見ればわかるが,結果の表示は処理の終了順になっている.本来はproblemのインデックスと対応した並びになってほしい.処理時間に関係なく順番を維持するために,longProcessにスライスを直接渡してその要素を書き換える方法が考えられるが,安全性を考えるとMutexを使う必要がある.そこで,処理後の値を並び替える方法をとる.

インデックスと出力の文字列を保持するために,構造体Resultを定義する.

type Result struct {
	index int
	value string
}

これに合わせてChannnelの宣言を変更する.

ch := make(chan Result, size)

longProcessの引数も変更する.また,インデックスの値も受け取ることができるようにする.そして,chResult型を生成して渡す.

func longProcess(n int, index int, ch chan Result, wg *sync.WaitGroup) {
	rndValue := rand.Intn(1000)
	sleepTime := time.Millisecond * time.Duration(rndValue)
	time.Sleep(sleepTime)
	msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
	fmt.Println(msg)
	ch <- Result{index: index, value: msg}
	wg.Done()
}

longProcess関数の定義変更に合わせて呼び出し側も変更する.

for i, v := range problem {
    go longProcess(v, i, ch, &wg)
}

また,結果の出力をstringから構造体に変更する.

for v := range ch {
    fmt.Printf("%v\n", v)
}

プログラムを変更して実行すると以下のような出力が得られる.

{6 53^2=2809 [14ms]}
{76 28^2=784 [16ms]}
{78 36^2=1296 [41ms]}
{9 30^2=900 [50ms]}
{7 35^2=1225 [90ms]}
︙

最後にResult型のindexの値を用いて,文字列をanswerスライスの対応するインデックスに代入する.

answer := make([]string, size)
for v := range ch {
    answer[v.index] = v.value
}

answerの出力は以下のようにすればできる.

for i, v := range answer {
    fmt.Printf("%d: %s\n", i, v)
}
最終的なコード
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Result struct {
	index int
	value string
}

func main() {
	size := 100
	problem := make([]int, size)
	ch := make(chan Result, size)
	var wg sync.WaitGroup
	wg.Add(size)
	for i := 0; i < size; i++ {
		problem[i] = rand.Intn(100)
	}

	for i, v := range problem {
		go longProcess(v, i, ch, &wg)
	}
	wg.Wait()
	close(ch)

	answer := make([]string, size)
	for v := range ch {
		answer[v.index] = v.value
	}
	for i, v := range answer {
		fmt.Printf("%d: %s\n", i, v)
	}
}

func longProcess(n int, index int, ch chan Result, wg *sync.WaitGroup) {
	rndValue := rand.Intn(1000)
	sleepTime := time.Millisecond * time.Duration(rndValue)
	time.Sleep(sleepTime)
	msg := fmt.Sprintf("%d^2=%d [%dms]", n, n*n, rndValue)
	fmt.Println(msg)
	ch <- Result{index: index, value: msg}
	wg.Done()
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0