初めに
A Tour of Goの「Concurrency」項目を終了したので、簡単にGoの並行処理に関してまとめました。
今回は簡単な挙動をまとめたものになるので、詳細に理解したい場合はGo言語による並行処理等の書籍を読むのも良いと思いました。
基本的な挙動
Goの並行処理は1つの処理を複数のコンポーネントに分けて、コンポーネント間で安全にデータを共有しながら計算をすることです。
Goはgoroutineという軽量のスレッドを作成して並行処理を行います。
goroutineとは
goroutineとは、Goのランタイムによって管理される軽量のスレッドです。
goroutineは以下のメリットがあります。
- goroutineはOSレベルでスレッドを生成するのではないため通常のスレッド生成よりも速い。
- goroutineのスタックサイズはスレッドのサイズよりも小さく始まり、必要に応じて大きくなるのでメモリを効率的に利用できる。
- スイッチングが速い
上記のような長所があるので、多くのgoroutineを同時に動かすことができる。
goroutineは下記のコードで作成できます。
go 関数呼び出し
Goは通常の処理にもgoroutineを使用します。
下記だと、最初にmainのgoroutineが作成され、go say("new goroutine")
のコードで新しいgoroutineが作成されます。
そのため、下記ではmainのgoroutineと新しいgoroutineの2つが動作していることになります。
func say(s string) {
for i := 0; i < 3; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
fmt.Println("start") // mainのgoroutine
go say("new") // 新しいgoroutine
say("main") // mainのgoroutine
fmt.Println("end") // mainのgoroutine
}
出力結果
start
new
main
main
new
main
end
図
main
は3回出力されますが、new
は2回しか出力されていません。
Goの並行処理はmianのgoroutineが終了すると他のgoroutineも終了する性質を持っています。
そのため、3回目のnew
が出力される前にmainのgoroutineが終了してnew
は2回しか出力されませんでした。
また、処理の順番は必ず今回のようにはならずその際の環境によって変わります。
実際に並行処理を使用する場合に、処理の流れが読めないと運用が難しいので、mainのgoroutineとその他のgoroutineで待ち合わせすることができます。
WaitGroup
WaitGroupを使用するとgoroutineの待ち合わせをすることができます。
使い方
基本的な使用方法は下記です。
Add
メソッドは数値を指定することはできますがDone
メソッドは数値を指定することができず、1ずつしかカウントが減らない様です。
import "sync" // syncパッケージを読み込む
var wg sync.WaitGroup // WaitGroupを宣言
wg.Add(1) // カウントを1つ増やす
wg.Add(2) // カウントを2つ増やす
wg.Done() // カウントを1つ減らす
wg.Wait() // カウントが0になるまで待つ
Add
を使用してカウントを増やし、Done
を使用してカウントを減らします。
Wait
はカウントが0になるまで待機するので、Add
とDone
を使用してカウントを調整し、Wait
で処理を待つ(同期を取る)使い方になります。
では、WaitGroupを使用して実際にgoroutineの待ち合わせをしてみます。
下記だと、mainのgoroutineが終了してsay関数のPrintが出力されません。
func say(s string) {
time.Sleep(100 * time.Millisecond)
fmt.Println("Hello", s)
}
func main() {
go say("new")
fmt.Println("main")
}
出力結果
main
Program exited.
図
say関数でHello new
を出力する前に1秒待機しているため、その間にmainのgoroutineが終了して、Hello new
が出力されませんでした。
Hello new
を出力するため、WaitGroupを使用して待ち合わせを行います。
func say(s string) {
time.Sleep(100 * time.Millisecond)
fmt.Println("Hello", s)
}
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
// say関数が終了するとカウントを1つ減らす
defer wg.Done()
say("new")
}()
fmt.Println("main")
// defer wg.Done()があるまで待機する
wg.Wait()
}
出力結果
main
Hello new
図
WaitGroupを使用してsay関数の実行が終わるまでmainのgoroutineを待機させることができました。
チャネル
チャネルを使用することで、goroutineで値の送受信をすることができます。
また、チャネルは送信されてから受信するまで処理を待つ性質があるので、goroutineの待ち合わせにも使用できます。
また、チャンネルは参照型であり、関数にチャネルを渡すときは実際にはチャネルのポインタを渡しています。
チャネルのデフォルト値はnilです。
使い方
基本的な構文は下記です。
make(chan チャネルの型) // チャネルを生成
ch <- 値 // チャネルへ送信する
変数 := <-ch // チャネルを変数へ割り当てる
では、実際にチャネルを使用してみます。
下記は、say関数にチャネルを渡して3秒待機します。
その後チャネルを送信して、main関数でs := <-c
でチャネルを受信後にfmt.Println("wait for", s)
を実行しています。
func say(c chan string) {
fmt.Println("called say func")
// 3秒待つ
time.Sleep(300 * time.Millisecond)
// "channel"という文字列を送信
c <- "channel"
}
func main() {
// string型のチャネルを生成
var c = make(chan string)
// チャネルをsay関数に渡す
go say(c)
// チャネルを受信するまで処理を待つ
s := <-c
fmt.Println("wait for", s)
}
出力結果
called say func
wait for channel
図
チャネルのブロック機能を使用することで、値を送受信しながらgoroutineの待ち合わせをすることができました。
複数のチャネル
チャネルは値を複数回送受信することも可能です。
先ほどと同様の関数でチャネルを2つ送受信しています。
func say(c chan string) {
fmt.Println("called say func")
// 3秒待つ
time.Sleep(300 * time.Millisecond)
c <- "channel"
c <- "channel2"
}
func main() {
// string型のチャネルを生成
var c = make(chan string)
// チャネルをsay関数に渡す
go say(c)
// チャネルを受信するまで処理を待つ
s := <-c
ss := <-c
fmt.Println(s, ss)
}
出力結果
called say func
channel channel2
バッファ
チャネルにバッファを設けることが可能です。
バッファを使用することでチャネルが受入れらるキャパシティを設定することができます。
// バッファがあるチャネルの生成
make(chan チャネルの型 バッファ数)
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
// ch <- 3 // バッファが2より少ないとエラーになる: fatal error: all goroutines are asleep - deadlock!
fmt.Println(<-ch)
fmt.Println(<-ch)
}
出力結果
1
2
Range and Close
range
を使用することで、<-ch
の様なチャネルの受信をせずとも、自動的にチャネルを受け入れてくれます。
また、チャネルの送り手側はチャネルをこれ以上送らない場合はcloseする必要があります。
closeのユースケースはrangeやfor文を使用する場合であり、通常は使用する必要はありません。
使い方
for i := range チャネル {}
下記のようなコードでチャネルを受け取りながらrangeで回すことができます。
func loop(c chan int) {
for i := 0; i < 5; i++ {
fmt.Println(i, "回目のfor文")
c <- i
}
// for文を回した後にチャネルをcloseする
// closeしないとエラーになる Error: fatal error: all goroutines are asleep - deadlock!
close(c)
}
func main() {
c := make(chan int, 5)
go loop(c)
// チャネルを逐一受け取る
for i := range c {
fmt.Println(i)
}
}
出力結果
0 回目のfor文
1 回目のfor文
2 回目のfor文
3 回目のfor文
4 回目のfor文
0
1
2
3
4
for i := range c
は、チャネルが閉じられるまで、チャネルから値を繰り返し受信し続けます。
そのため、closeがないとまだチャネルが送られると思ってエラーになってしまいます。
for文
先ほどのrange
を使用したコーディング以外だと、for文を使用してチャネルを受け取ることができます。
チャネルを受け取る際に第2引数を設定すると、チャネルがcloseされているかどうかを確認することができます。
受信する値がない、かつチャネルがcloseしている場合に第2引数にfalseを受け取ります。
v, ok := <-ch
では、for文を使用してチャネルを受け取ってみます。
func loop(c chan int) {
for i := 0; i < 5; i++ {
c <- i
}
close(c)
}
func main() {
c := make(chan int, 5)
go loop(c)
for i := 0; i < 6; i++ {
v, ok := <-c
// チャネルがcleseしたらfor文を終了する
if ok != true {
break
}
fmt.Println(v, ok)
}
}
出力結果
0 true
1 true
2 true
3 true
4 true
for文を使用してチャネルを受け取ることができました。
select
selectは複数のcaseを用意して、準備ができらたそのcaseを実行します。
caseは準備ができるまで処理をブロックします。
下記だと、5回case c <- x
を実行して、quitが送信されたタイミングでcase <-quit
が実行されます。
func add(c, quit chan int) {
x := 1
// 無限ループの構文だが、quitを受け取ると終了する
for {
select {
// チャネルを送信する
case c <- x:
x++
// quitを受け取ると終了する
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 5; i++ {
// チャネルを受け取って出力する
fmt.Println(<-c)
}
// チャネルを送信する
quit <- 0
}()
// add関数を呼び出す
add(c, quit)
}
出力結果
1
2
3
4
5
quit
処理をブロックしたくない場合は、defaultを用いるとcaseの準備ができていない場合にdefaultが実行される様になります。
下記だと、数字の出力の間にdefault
が複数出力されます。
caseの準備ができるまでfor文で回し続けていることが分かります。
func add(c, quit chan int) {
x := 1
// 無限ループの構文だが、quitを受け取ると終了する
for {
select {
// チャネルを送信する
case c <- x:
x++
// quitを受け取ると終了する
case <-quit:
fmt.Println("quit")
return
default:
fmt.Println("default")
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 5; i++ {
// チャネルを受け取って出力する
fmt.Println(<-c)
}
// チャネルを送信する
quit <- 0
}()
// add関数を呼び出す
add(c, quit)
}
エラーになるケース
package main
import "fmt"
func main() { //liststart
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
v := 1
ch1 <- v // ch1に書き込めない限りここで待たされる
v2 := <-ch2
fmt.Println(v, v2)
}()
v := 2
ch2 <- v // ch2に書き込めない限りここで待たされる
v2 := <-ch1
fmt.Println(v, v2)
}
バッファを設けていないチャネルの場合、書き込みした後は受信するまで待機状態になる。
そのため、ch1
とch2
が待機状態となりデットロックしてエラーになる。
selectは、caseのいずれかが実行可能である場合は、そのcaseを実行するため以下の挙動になる。
package main
import "fmt"
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
v := 1
ch1 <- v // 1が ch1に書かれる
v2 := <-ch2
fmt.Print("無名関数内: ", v, " ", v2, "\n")
}()
v := 2
var v2 int
select { // チャネルでのやり取りをselectで囲む
case ch2 <- v: // こちらはまだ書き込めない
case v2 = <-ch1: // 1がch1に入ればこれが実行される。v2は1になる
}
fmt.Print("mainの最後: ", v, " ", v2, "\n")
}
// 結果
mainの最後: 2 1
クロージャーが原因で意図しない挙動になる場合
下記の場合は全て40が出力される。
package main
import "fmt"
func main() {
a := []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20} //liststart
ch := make(chan int, len(a))
for _, v := range a {
go func() {
ch <- v * 2
}()
}
for i := 0; i < len(a); i++ {
fmt.Print(<-ch, " ")
}
fmt.Println()
}
// 結果
// ./prog.go:10:10: loop variable v captured by func literal
// Go vet failed.
上記では、下記関数がgorutineとして実行されるより早くfor _, v := range a
が終了するので変数a
の最後の値の20がvになっている。
go func() {
ch <- v * 2
}()
その場合は下記のように、毎回関数にvを引数として渡すと解決される。
package main
import "fmt"
func main() {
a := []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}
ch := make(chan int, len(a))
for _, v := range a {
go func(val int) { // ループ変数を引数として受け取る
ch <- val * 2
}(v) // ループ変数を匿名関数に渡す
}
for i := 0; i < len(a); i++ {
fmt.Print(<-ch, " ")
}
fmt.Println()
}
結果
40 16 8 12 28 20 24 32 36 4
ゴルーチンリーク
下記だと、countTo関数のfor文が終了する前に受けて側のfor-range文が終了するためcountToで生成されたgorutineが使用され続ける。
そのため、メモリを必要以上に消費するなどのデメリットがあるので注意が必要である。
package main
import (
"fmt"
)
func countTo(max int) <-chan int {
ch := make(chan int)
go func() {
for i := 0; i < max; i++ {
fmt.Println("countTo: ", i)
ch <- i
}
close(ch)
}()
return ch
}
func main() {
for i := range countTo(10) {
fmt.Println("main: ", i)
if i > 5 {
break
}
}
}
結果
countTo: 0
countTo: 1
main: 0
main: 1
countTo: 2
countTo: 3
main: 2
main: 3
countTo: 4
countTo: 5
main: 4
main: 5
countTo: 6
countTo: 7
main: 6
並行処理を使用するタイミング
並行処理をする際はオーバーヘッドーが発生するので、処理が短い場合などは並行処理を使うべきではありません。
また、並行処理を用いるとデバッグが大変になったり、コードの可読性の低下に繋がる場合もあります。
並行処理はネットワークリクエストやディスクへの読み書きなどが起こる場合など、ここぞという時に用いるべきでしょう。
以上、Goの並行処理に関して簡単にまとめてみました。
まだ浅い理解なので今後機会があれば深く勉強してみたいと思います。
参考にしたサイト・動画
https://zenn.dev/hsaki/books/golang-concurrency/viewer/basicusage
https://www.youtube.com/watch?v=_PpJjo2iAZ0&t=688s
https://www.youtube.com/watch?v=buYA-4_JQVQ