0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Goの並行処理に関して簡単にまとめてみた

Last updated at Posted at 2023-01-09

初めに

A Tour of Goの「Concurrency」項目を終了したので、簡単にGoの並行処理に関してまとめました。
今回は簡単な挙動をまとめたものになるので、詳細に理解したい場合はGo言語による並行処理等の書籍を読むのも良いと思いました。

基本的な挙動

Goの並行処理は1つの処理を複数のコンポーネントに分けて、コンポーネント間で安全にデータを共有しながら計算をすることです。
Goはgoroutineという軽量のスレッドを作成して並行処理を行います。

goroutineとは

goroutineとは、Goのランタイムによって管理される軽量のスレッドです。
goroutineは以下のメリットがあります。

  • goroutineはOSレベルでスレッドを生成するのではないため通常のスレッド生成よりも速い。
  • goroutineのスタックサイズはスレッドのサイズよりも小さく始まり、必要に応じて大きくなるのでメモリを効率的に利用できる。
  • スイッチングが速い
    上記のような長所があるので、多くのgoroutineを同時に動かすことができる。

goroutineは下記のコードで作成できます。

go 関数呼び出し

Goは通常の処理にもgoroutineを使用します。
下記だと、最初にmainのgoroutineが作成され、go say("new goroutine")のコードで新しいgoroutineが作成されます。
そのため、下記ではmainのgoroutineと新しいgoroutineの2つが動作していることになります。

func say(s string) {
	for i := 0; i < 3; i++ {
		time.Sleep(100 * time.Millisecond)
		fmt.Println(s)
	}
}

func main() {
	fmt.Println("start") // mainのgoroutine
	go say("new")        // 新しいgoroutine
	say("main")          // mainのgoroutine
	fmt.Println("end")   // mainのgoroutine
}
出力結果
start
new
main
main
new
main
end

スクリーンショット 2023-01-07 14.12.18.png

mainは3回出力されますが、newは2回しか出力されていません。
Goの並行処理はmianのgoroutineが終了すると他のgoroutineも終了する性質を持っています。
そのため、3回目のnewが出力される前にmainのgoroutineが終了してnewは2回しか出力されませんでした。

また、処理の順番は必ず今回のようにはならずその際の環境によって変わります。
実際に並行処理を使用する場合に、処理の流れが読めないと運用が難しいので、mainのgoroutineとその他のgoroutineで待ち合わせすることができます。

WaitGroup

WaitGroupを使用するとgoroutineの待ち合わせをすることができます。

使い方

基本的な使用方法は下記です。

Addメソッドは数値を指定することはできますがDoneメソッドは数値を指定することができず、1ずつしかカウントが減らない様です。

import "sync" // syncパッケージを読み込む

var wg sync.WaitGroup // WaitGroupを宣言


wg.Add(1) // カウントを1つ増やす
wg.Add(2) // カウントを2つ増やす

wg.Done() // カウントを1つ減らす

wg.Wait() // カウントが0になるまで待つ

Addを使用してカウントを増やし、Doneを使用してカウントを減らします。
Waitはカウントが0になるまで待機するので、AddDoneを使用してカウントを調整し、Waitで処理を待つ(同期を取る)使い方になります。

では、WaitGroupを使用して実際にgoroutineの待ち合わせをしてみます。
下記だと、mainのgoroutineが終了してsay関数のPrintが出力されません。

func say(s string) {
	time.Sleep(100 * time.Millisecond)
	fmt.Println("Hello", s)
}

func main() {
	go say("new")
	fmt.Println("main")
}
出力結果
main

Program exited.

スクリーンショット 2023-01-07 14.16.39.png

say関数でHello newを出力する前に1秒待機しているため、その間にmainのgoroutineが終了して、Hello newが出力されませんでした。

Hello newを出力するため、WaitGroupを使用して待ち合わせを行います。

func say(s string) {
	time.Sleep(100 * time.Millisecond)
	fmt.Println("Hello", s)
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		// say関数が終了するとカウントを1つ減らす
		defer wg.Done()
		say("new")
	}()
	fmt.Println("main")
	// defer wg.Done()があるまで待機する
	wg.Wait()
}
出力結果
main
Hello new

スクリーンショット 2023-01-07 14.21.54.png

WaitGroupを使用してsay関数の実行が終わるまでmainのgoroutineを待機させることができました。

チャネル

チャネルを使用することで、goroutineで値の送受信をすることができます。
また、チャネルは送信されてから受信するまで処理を待つ性質があるので、goroutineの待ち合わせにも使用できます。

また、チャンネルは参照型であり、関数にチャネルを渡すときは実際にはチャネルのポインタを渡しています。
チャネルのデフォルト値はnilです。

使い方

基本的な構文は下記です。

make(chan チャネルの型) // チャネルを生成
ch <-     // チャネルへ送信する
変数 := <-ch  // チャネルを変数へ割り当てる

では、実際にチャネルを使用してみます。
下記は、say関数にチャネルを渡して3秒待機します。
その後チャネルを送信して、main関数でs := <-cでチャネルを受信後にfmt.Println("wait for", s)を実行しています。

func say(c chan string) {
	fmt.Println("called say func")
	// 3秒待つ
	time.Sleep(300 * time.Millisecond)
	// "channel"という文字列を送信
	c <- "channel"
}

func main() {
	// string型のチャネルを生成
	var c = make(chan string)
	// チャネルをsay関数に渡す
	go say(c)
	// チャネルを受信するまで処理を待つ
	s := <-c
	fmt.Println("wait for", s)
}
出力結果
called say func
wait for channel

スクリーンショット 2023-01-07 14.32.10.png

チャネルのブロック機能を使用することで、値を送受信しながらgoroutineの待ち合わせをすることができました。

複数のチャネル

チャネルは値を複数回送受信することも可能です。
先ほどと同様の関数でチャネルを2つ送受信しています。

func say(c chan string) {
	fmt.Println("called say func")
	// 3秒待つ
	time.Sleep(300 * time.Millisecond)
	c <- "channel"
	c <- "channel2"
}

func main() {
	// string型のチャネルを生成
	var c = make(chan string)
	// チャネルをsay関数に渡す
	go say(c)
	// チャネルを受信するまで処理を待つ
	s := <-c
	ss := <-c
	fmt.Println(s, ss)
}
出力結果
called say func
channel channel2

バッファ

チャネルにバッファを設けることが可能です。
バッファを使用することでチャネルが受入れらるキャパシティを設定することができます。

// バッファがあるチャネルの生成
make(chan チャネルの型 バッファ数)
func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	// ch <- 3 // バッファが2より少ないとエラーになる: fatal error: all goroutines are asleep - deadlock!
	fmt.Println(<-ch)
	fmt.Println(<-ch)
}

出力結果
1
2

Range and Close

rangeを使用することで、<-chの様なチャネルの受信をせずとも、自動的にチャネルを受け入れてくれます。
また、チャネルの送り手側はチャネルをこれ以上送らない場合はcloseする必要があります。
closeのユースケースはrangeやfor文を使用する場合であり、通常は使用する必要はありません。

使い方

for i := range チャネル {}

下記のようなコードでチャネルを受け取りながらrangeで回すことができます。

func loop(c chan int) {
	for i := 0; i < 5; i++ {
		fmt.Println(i, "回目のfor文")
		c <- i
	}
	// for文を回した後にチャネルをcloseする
	// closeしないとエラーになる Error: fatal error: all goroutines are asleep - deadlock!
	close(c)
}

func main() {
	c := make(chan int, 5)
	go loop(c)
	// チャネルを逐一受け取る
	for i := range c {
		fmt.Println(i)
	}
}
出力結果
0 回目のfor文
1 回目のfor文
2 回目のfor文
3 回目のfor文
4 回目のfor文
0
1
2
3
4

for i := range c は、チャネルが閉じられるまで、チャネルから値を繰り返し受信し続けます。
そのため、closeがないとまだチャネルが送られると思ってエラーになってしまいます。

for文

先ほどのrangeを使用したコーディング以外だと、for文を使用してチャネルを受け取ることができます。
チャネルを受け取る際に第2引数を設定すると、チャネルがcloseされているかどうかを確認することができます。
受信する値がない、かつチャネルがcloseしている場合に第2引数にfalseを受け取ります。

v, ok := <-ch

では、for文を使用してチャネルを受け取ってみます。

func loop(c chan int) {
	for i := 0; i < 5; i++ {
		c <- i
	}
	close(c)
}

func main() {
	c := make(chan int, 5)
	go loop(c)
	for i := 0; i < 6; i++ {
		v, ok := <-c
		// チャネルがcleseしたらfor文を終了する
		if ok != true {
			break
		}
		fmt.Println(v, ok)
	}
}
出力結果
0 true
1 true
2 true
3 true
4 true

for文を使用してチャネルを受け取ることができました。

select

selectは複数のcaseを用意して、準備ができらたそのcaseを実行します。
caseは準備ができるまで処理をブロックします。
下記だと、5回case c <- xを実行して、quitが送信されたタイミングでcase <-quitが実行されます。

func add(c, quit chan int) {
	x := 1
	// 無限ループの構文だが、quitを受け取ると終了する
	for {
		select {
		// チャネルを送信する
		case c <- x:
			x++
		// quitを受け取ると終了する
		case <-quit:
			fmt.Println("quit")
			return
		}
	}
}

func main() {
	c := make(chan int)
	quit := make(chan int)
	go func() {
		for i := 0; i < 5; i++ {
			// チャネルを受け取って出力する
			fmt.Println(<-c)
		}
		// チャネルを送信する
		quit <- 0
	}()
	// add関数を呼び出す
	add(c, quit)
}
出力結果
1
2
3
4
5
quit

処理をブロックしたくない場合は、defaultを用いるとcaseの準備ができていない場合にdefaultが実行される様になります。
下記だと、数字の出力の間にdefaultが複数出力されます。
caseの準備ができるまでfor文で回し続けていることが分かります。

func add(c, quit chan int) {
	x := 1
	// 無限ループの構文だが、quitを受け取ると終了する
	for {
		select {
		// チャネルを送信する
		case c <- x:
			x++
		// quitを受け取ると終了する
		case <-quit:
			fmt.Println("quit")
			return
		default:
			fmt.Println("default")
		}
	}
}

func main() {
	c := make(chan int)
	quit := make(chan int)
	go func() {
		for i := 0; i < 5; i++ {
			// チャネルを受け取って出力する
			fmt.Println(<-c)
		}
		// チャネルを送信する
		quit <- 0
	}()
	// add関数を呼び出す
	add(c, quit)
}

エラーになるケース

package main

import "fmt"

func main() { //liststart
	ch1 := make(chan int)
	ch2 := make(chan int)

	go func() {
		v := 1
		ch1 <- v // ch1に書き込めない限りここで待たされる
		v2 := <-ch2
		fmt.Println(v, v2)
	}()

	v := 2
	ch2 <- v // ch2に書き込めない限りここで待たされる
	v2 := <-ch1
	fmt.Println(v, v2)
}

バッファを設けていないチャネルの場合、書き込みした後は受信するまで待機状態になる。
そのため、ch1ch2が待機状態となりデットロックしてエラーになる。

selectは、caseのいずれかが実行可能である場合は、そのcaseを実行するため以下の挙動になる。

package main

import "fmt"

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	go func() {
		v := 1
		ch1 <- v // 1が ch1に書かれる
		v2 := <-ch2
		fmt.Print("無名関数内: ", v, " ", v2, "\n")
	}()

	v := 2
	var v2 int
	select { // チャネルでのやり取りをselectで囲む
	case ch2 <- v: // こちらはまだ書き込めない
	case v2 = <-ch1: // 1がch1に入ればこれが実行される。v2は1になる
	}
	fmt.Print("mainの最後: ", v, " ", v2, "\n")
}

// 結果
mainの最後: 2 1

クロージャーが原因で意図しない挙動になる場合

下記の場合は全て40が出力される。

package main

import "fmt"

func main() {
	a := []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20} //liststart
	ch := make(chan int, len(a))
	for _, v := range a {
		go func() {
			ch <- v * 2
		}()
	}
	for i := 0; i < len(a); i++ {
		fmt.Print(<-ch, " ")
	}
	fmt.Println()
}

// 結果
// ./prog.go:10:10: loop variable v captured by func literal

// Go vet failed.

上記では、下記関数がgorutineとして実行されるより早くfor _, v := range aが終了するので変数aの最後の値の20がvになっている。

		go func() {
			ch <- v * 2
		}()

その場合は下記のように、毎回関数にvを引数として渡すと解決される。

package main

import "fmt"

func main() {
	a := []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}
	ch := make(chan int, len(a))
	for _, v := range a {
		go func(val int) { // ループ変数を引数として受け取る
			ch <- val * 2
		}(v) // ループ変数を匿名関数に渡す
	}
	for i := 0; i < len(a); i++ {
		fmt.Print(<-ch, " ")
	}
	fmt.Println()
}

結果

40 16 8 12 28 20 24 32 36 4 

ゴルーチンリーク

下記だと、countTo関数のfor文が終了する前に受けて側のfor-range文が終了するためcountToで生成されたgorutineが使用され続ける。
そのため、メモリを必要以上に消費するなどのデメリットがあるので注意が必要である。

package main

import (
	"fmt"
)

func countTo(max int) <-chan int {
	ch := make(chan int)
	go func() {
		for i := 0; i < max; i++ {
			fmt.Println("countTo: ", i)
			ch <- i
		}
		close(ch)
	}()

	return ch
}

func main() {
	for i := range countTo(10) {
		fmt.Println("main: ", i)
		if i > 5 {
			break
		}
	}
}

結果

countTo:  0
countTo:  1
main:  0
main:  1
countTo:  2
countTo:  3
main:  2
main:  3
countTo:  4
countTo:  5
main:  4
main:  5
countTo:  6
countTo:  7
main:  6

並行処理を使用するタイミング

並行処理をする際はオーバーヘッドーが発生するので、処理が短い場合などは並行処理を使うべきではありません。
また、並行処理を用いるとデバッグが大変になったり、コードの可読性の低下に繋がる場合もあります。
並行処理はネットワークリクエストやディスクへの読み書きなどが起こる場合など、ここぞという時に用いるべきでしょう。

以上、Goの並行処理に関して簡単にまとめてみました。
まだ浅い理解なので今後機会があれば深く勉強してみたいと思います。

参考にしたサイト・動画

https://zenn.dev/hsaki/books/golang-concurrency/viewer/basicusage
https://www.youtube.com/watch?v=_PpJjo2iAZ0&t=688s
https://www.youtube.com/watch?v=buYA-4_JQVQ

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?