LoginSignup
7
3

More than 1 year has passed since last update.

[Go]並行処理・ゴルーチン

Last updated at Posted at 2022-09-06

※親記事
[Go]プログラミングノート

1. 並行処理

Golangで指示できるのは 「並列処理でなく並行処理」 である。

並行処理と並列処理の違い

名称未設定ファイル-並行処理と並列処理.drawio.png

Golangでは ゴルーチン によって並行処理を実装できる。
ゴルーチンを使って、マシンに並列処理してもらうためには、最低でも以下が満たされてなければならない。

  • CPUコア数が2以上であること
  • GOMAXPROCSを2以上に設定していること

この記事を参照。

2. ゴルーチン

概要は以下の通り。

  • Goランタイム(※1)が提供する軽量スレッド(※2)であり、ゴルーチンスケジューラ によって並行処理の制御がなされる。
  • 通常のスレッド(OSが提供するスレッド)(※3)に比べて メモリ使用量 が小さく、スイッチングコスト(※4)を抑えることができる(CPUコア数2以上かつCPUコア数 >= GOMAXPROCSのとき)。
  • Golangのすべての処理はゴルーチンで実行されている(メイン関数ですら)。

※1 ランタイム: アプリケーションの実行機能のみのプログラム、ライブラリ
※2 ゴルーチン(軽量スレッド): OSのスレッドに対してマッピングされる
※3 OSのスレッド: CPUコアに対してマッピングされる
※4 スイッチングコスト: 処理の切り替えにかかるコスト

3. ゴルーチンスケジューラ

Goランライムで実装されている、ゴルーチンを並行に実行するためのスケジューラ。
以下のような感じで動くみたい。

スケジューラの機能

  • M(achine): OSのスレッド。ゴルーチンが実行させるのはここ。
  • P(rocessor): ゴルーチンを貯めるキュー機能と、ゴルーチンをM(OSのスレッド)に割り当てる機能を持つ。Pの数はデフォルトでCPUコア数、「GOMAXPROCS」を設定していた場合はその値になる。
  • G(oroutine): ゴルーチン。

CPUコア数 = GOMAXPROCSのとき
名称未設定ファイル-基本.drawio.png

CPUコア数 < GOMAXPROCSのとき
名称未設定ファイル-CPU_1, P_2.drawio.png

重要なのは以下の点。

  • ゴルーチンスケジューラは、「GOMAXPROCS」の数分、PとMの組を作成し、以降は以下の処理を繰り返す のみ。
    1. GをPn経由でMnに乗っけて、そのMnを担当するCPUに処理してもらう
    2. Gの処理が完了したらキューからまた別のGを引っ張ってMに乗っける
  • 「Pn・Mn・ローカルキューn」間では、「処理中のゴルーチンの切り替え」は発生しない
  • 「GOMAXPROCS」の数が実際のCPUコア数より多い場合は、いずれかのCPUコア数に複数のMが紐づくため、OSスケジューラ側で「処理中のゴルーチンの切り替え」が発生する(=オーバーヘッドが大きい) ことになる
    GOMAXPROCS = CPUコア数 にしておくと、OSスケジューラ側での切り替えを防ぐことができて処理の効率がいいよ! という話。

参考:

4. ゴルーチンを使う

新しいゴルーチンを生成してその上で関数を実行させるコードは以下。

go <関数名>()

動作確認用の例.

package main

import (
	"fmt"
	"time"
)

// 「1秒スリープ → 引数で与えられた文字列を表示」を10回繰り返す
func displayString10Times(str string) {
	for i := 0; i < 10; i++ {
		time.Sleep(1000 * time.Millisecond) // 1秒のスリープ
		fmt.Println(str) // 引数を表示
	}
}

func main() {
	// メインのゴルーチンから新しく生成されたゴルーチンで実行
	go displayString10Times("新ゴルーチン")
	// メインのゴルーチンで実行
	displayString10Times("メインゴルーチン")
}

5. ゴルーチンの待ち合わせ

上の例で、

	// メインのゴルーチンで実行
	displayString10Times("メインゴルーチン")

を削除すると、特になにも表示されずプログラムが終了してしまう(★)

これは、**「特に何も指定していない」**場合、
「メインゴルーチンの処理が終わると、他のゴルーチンの処理進捗を考慮せずプログラムが終了してしまう」
という性質があるから、らしい。

上記の処理を削除した場合は、メインゴルーチンから新しく生成されたゴルーチンでの処理途中で、
メインゴルーチンの処理が終了してしまっているからである。

対象のゴルーチンの処理の終了を待つようにするようにするには、sync.WaitGroupを使う。
(慣習的に無名関数も使って記述する模様)

とりあえず、★を動くようにしたいならば以下のようにすればよい。

package main

import (
	"fmt"
	"time"
	"sync"
)

// 「1秒スリープ → 引数で与えられた文字列を表示」を10回繰り返す
func displayString10Times(str string) {
	for i := 0; i < 10; i++ {
		time.Sleep(1000 * time.Millisecond) // 1秒のスリープ
		fmt.Println(str) // 引数を表示
	}
}

func main() {
	// [前提]
	// sync.WaitGroup.Add(<数値>) : カウンター+<数値>
	// sync.WaitGroup.Done() : カウンター-1
	// sync.WaitGroup.Wait : カウンターが0になったら先にすすめる
	
    // sync.WaitGroup構造体インスタンス生成
	wg := &sync.WaitGroup{}
	
	// sync.WaitGroup構造体のカウンタ+1
	wg.Add(1)
	
    // 「ゴルーチンで実行する対象の処理」と、「カウンタ-1」をゴルーチンで実行する(無名関数利用)
	go func() {
		// メインのゴルーチンから新しく生成されたゴルーチンで実行
		displayString10Times("新ゴルーチン")
		
		// sync.WaitGroup構造体のカウンタ-1
		wg.Done()
	}()
	
	// カウンターが0になるまで先に進ませない
	wg.Wait()
	
	fmt.Println("main関数終了!")
}

同じ処理を指定個数分、並行に実行させたいならば、一般化すると以下のように処理を組めばよさそう。

package main

import (
	"fmt"
	"sync"
)

func main() {
    // sync.WaitGroup構造体インスタンス生成
	wg := &sync.WaitGroup{}

    // 並行処理の数
    processCount := <並行処理させる数>
	
	// sync.WaitGroup構造体のカウンタを<並行処理させる数>にセット
	wg.Add(processCount)
	
    for i := 0; i < processCount; i++ {

      // 「並行で実行したい処理」と、「カウンタ-1」をゴルーチンで実行する(無名関数利用)
	  go func() {
		// <並行で実行したい処理をここに記述>
		
		// sync.WaitGroup構造体のカウンタ-1
		wg.Done()
	  }()
    }
	
	// カウンターが0になるまで先に進ませない
	wg.Wait()
	
	fmt.Println("main関数終了!")
}

試しに実行してみる

package main

import (
	"fmt"
	"time"
	"sync"
	"strconv"
)

// 「1秒スリープ → 引数で与えられた文字列を表示」を10回繰り返す
func displayString10Times(str string) {
	for i := 0; i < 10; i++ {
		time.Sleep(1000 * time.Millisecond) // 1秒のスリープ
		fmt.Println(str) // 引数を表示
	}
}

func main() {
    // sync.WaitGroup構造体インスタンス生成
	wg := &sync.WaitGroup{}
	
	processCount := 10
		
	// sync.WaitGroup構造体のカウンタを<並行処理させる数>にセット
	wg.Add(processCount)
	
    for i := 0; i < processCount; i++ {

      // 「並行で実行したい処理」と、「カウンタ-1」をゴルーチンで実行する(無名関数利用)
	  go func() {
		str := strconv.Itoa(i)
		  
		// <並行で実行したい処理をここに記述>
		displayString10Times(str)
		
		// sync.WaitGroup構造体のカウンタ-1
		wg.Done()
	  }()
    }
	
	// カウンターが0になるまで先に進ませない
	wg.Wait()
	
	fmt.Println("main関数終了!")
    // =>
    // 10
    // 10
    // ...
}

想定では1〜10までの数字が表示されずはずだが10のみ・・・。
この記事に原因がバッチリ書いてあった。

つまり、ゴルーチンが起動してメインのゴルーチンのiを参照するころには、iが上書きされる形で進んじゃってる・・・ということ。

無名関数の引数にメインのゴルーチンのiの値をコピーしてしまうことで防げるとのこと。やってみると

package main

import (
	"fmt"
	"time"
	"sync"
	"strconv"
)

// 「1秒スリープ → 引数で与えられた文字列を表示」を10回繰り返す
func displayString10Times(str string) {
	for i := 0; i < 10; i++ {
		time.Sleep(1000 * time.Millisecond) // 1秒のスリープ
		fmt.Println(str) // 引数を表示
	}
}

func main() {
    // sync.WaitGroup構造体インスタンス生成
	wg := &sync.WaitGroup{}
	
	processCount := 10
		
	// sync.WaitGroup構造体のカウンタを<並行処理させる数>にセット
	wg.Add(processCount)
	
    for i := 0; i < processCount; i++ {

      // 「並行で実行したい処理」と、「カウンタ-1」をゴルーチンで実行する(無名関数利用)
	  go func(argI int) {
		str := strconv.Itoa(argI)
		  
		// <並行で実行したい処理をここに記述>
		displayString10Times(str)
		
		// sync.WaitGroup構造体のカウンタ-1
		wg.Done()
	  }(i) // メインゴルーチンのiの値を引数argIにコピー
    }
	
	// カウンターが0になるまで先に進ませない
	wg.Wait()
	
	fmt.Println("main関数終了!")
    // =>
    // 9
    // 5
    // ...
}

うまくいった!

6. チャネル

ゴルーチン同士でデータを送受信するための機能。

  • チャネルは、以下を定義して生成する必要がある
    • チャネルを介して送受信するデータの型
    • 「バッファなしチャネル」 or 「バッファありチャネル」
  • チャネルを介してのデータ送受信に際しては、データ送信側・受信側のゴルーチンでそれぞれ以下の要求を出す必要がある
    • データ送信側のゴルーチン: チャネルを介したデータ送信要求(【チャネル】 <- 【データ】)
    • データ受信側のゴルーチン: チャネルを介したデータ受信要求(<- 【チャネル】)
  • 「バッファなしチャネルの場合は同期処理」、「バッファありチャネルの場合は非同期処理」 になる(※それぞれの詳細な挙動は後述)

チャネルのベースサンプルコード

base_sample.go
package main

import (
	"fmt"
	
)

func main() {
	// [チャネルの生成] <チャネル変数名> := make(chan <やりとりするデータ型>)
	ch := make(chan int)
	
	// サブゴルーチンでの処理をゴルーチンスケジューラに要求
	go func() {		
		// チャネルを介したデータ送信要求
		ch <- 3
        fmt.Printf("チャネルを介してデータ送信したよ(%v)\n", 3)
	}()
	
	fmt.Println("サブゴルーチンでの処理をゴルーチンスケジューラに要求済み")
	
	// チャネルを介したデータ受信要求
	// (受信がなされると「<- ch」に受信した値が展開されるイメージ)
	receiveInt := <- ch
	fmt.Printf("チャネルを介してデータ受信したよ(%v)\n", receiveInt)
	
	// =>
	// サブゴルーチンでの処理をゴルーチンスケジューラに要求済み
	// チャネルを介してデータ送信したよ(3)
    // チャネルを介してデータ受信したよ(3)
}

6.1 バッファなしチャネル

挙動

  • チャネルを介したデータ送信要求・受信要求がマッチングすると送受信がなされる(同期処理 になる)
  • 送信要求・受信要求がマッチングするまで、ゴルーチンは要求処理の行で待機し続ける(ブロックされる)

名称未設定ファイル-チャネル(バッファなし).drawio (7).png

バッファなしチャネルの挙動確認用サンプルコード

no_buffer_channel.go
package main

import (
	"fmt"
	"time"
)

// 「ゴルーチン2」
func main() {
	// [バッファなしチャネルの生成]
	// <チャネル変数名> := make(chan <やりとりするデータ型>)
	ch := make(chan int)
	
	// 「ゴルーチン1」
	go func() {
		// [その他処理] 送信側ゴルーチンで2秒待たす
		time.Sleep(time.Second * 10)
		
		// [送信要求]
		ch <- 3
		
		// [送信]
		
		// [その他処理]
        fmt.Printf("チャネルを介してデータ送信したよ(%v)\n", 3)
	}()
	
	fmt.Println("ゴルーチンスケジューラにゴルーチン1の処理をすることを要求済み")
	
	// [その他処理] 受信側ゴルーチンで10秒待たす
	time.Sleep(time.Second * 10)

	// [受信要求]
	receiveInt := <- ch // 受信がなされると「<- ch」に受信した値が展開されるイメージ
	
	// [受信]
	
	// [その他処理]
    fmt.Printf("チャネルを介してデータ受信したよ(%v)\n", receiveInt)
	
	// ゴルーチン2(=メインゴルーチン)の処理が終わって
	// ゴルーチン1が強制終了されるのを防ぐため十分待ってあげる
	time.Sleep(time.Second * 10)
	
	// [出力結果]
	// ゴルーチンスケジューラにゴルーチン1の処理をすることを要求済み
    // →10秒ほど待った後に以下が出力される。受信要求が上がるまで、
	//   ゴルーチン1の処理は「送信要求」以降進めないようブロックされていることがわかる
	// チャネルを介してデータ送信したよ(3)
    // チャネルを介してデータ受信したよ(3)
}

6.2. バッファありチャネル

バッファありチャネルはFIFOなキューとして機能する(非同期処理 になる)

挙動

データ送信側のゴルーチンの挙動

  • チャネルのキューに余裕がある場合、送信要求を出したらすぐにキューにデータを積むことができ、すぐに次の処理に移ることができる
  • チャネルのキューに余裕がない場合、キューに余裕ができるまで送信要求処理の行で待機し続ける(ブロックされる)

名称未設定ファイル-チャネル(バッファなし)のコピー.drawio.png

データ受信側のゴルーチンの挙動

  • チャネルのキューにデータが存在する場合、受信要求を出したらすぐにキューからデータを取り出すことができ、すぐに次の処理に移ることができる
  • チャネルのキューにデータが存在しない場合、キューにデータが積まれるまで受信要求処理の行で待機し続ける(ブロックされる)

バッファありチャネルの動作確認用コード

buffered_channel.go
package main

import (
	"fmt"
	"time"
)

// 「ゴルーチン2」
func main() {
	// [バッファありチャネルの生成]
	// <チャネル変数名> := make(chan <やりとりするデータ型>, <バッファサイズ>)
	ch := make(chan int, 1)
	
	// 「ゴルーチン1」
	go func() {
		// [その他処理] 送信側ゴルーチンで2秒待たす
		time.Sleep(time.Second * 2)
		
		// [送信要求]
		ch <- 3
		
		// [送信]
        fmt.Printf("チャネルを介してデータ送信したよ(%v)\n", 3)
		
	    // [送信要求]
		ch <- 4
		
		// [送信]
		
		// [その他処理]
        fmt.Printf("チャネルを介してデータ送信したよ(%v)\n", 4)
	}()
		
	fmt.Println("ゴルーチンスケジューラにゴルーチン1の処理をすることを要求済み")
	
	// [その他処理] 受信側ゴルーチンで10秒待たす
	time.Sleep(time.Second * 10)

	// [データ受信要求] 
	receiveInt := <- ch // 受信がなされると「<- ch」に受信した値が展開されるイメージ
	
	// [受信]
	
	// [その他処理]
    fmt.Printf("チャネルを介してデータ受信したよ(%v)\n", receiveInt)
	
	// ゴルーチン2(=メインゴルーチン)の処理が終わって
	// ゴルーチン1が強制終了されるのを防ぐため十分待ってあげる
	time.Sleep(time.Second * 10)
		
	// [出力結果]
	// ゴルーチンスケジューラにゴルーチン1の処理をすることを要求済み
    // →2秒ほど待った後に以下が出力される
	// チャネルを介してデータ送信したよ(3)
	// →10秒ほど待った後に以下が一気に出力される(バッファに余裕ができるまで、チャネルにデータを積めないことがわかる)
    // チャネルを介してデータ受信したよ(3)
	// チャネルを介してデータ送信したよ(4)
}

6.3. 1ゴルーチンで複数のチャネルからのデータを受信、順次処理

参考:

https://tech-up.hatenablog.com/entry/2018/12/03/183327 より

サンプルコード

no_buffer_channel_for_select_pattern.go
package main

import (
	"fmt"
	"time"
)

// ゴルーチン3
func main() {
	// [バッファなしチャネルの生成]
	// <チャネル変数名> := make(chan <やりとりするデータ型>)
	ch1 := make(chan int)
    ch2 := make(chan int)
	
	// ゴルーチン1
	go func() {
		// [その他処理] 送信側ゴルーチンで2秒待たす
		time.Sleep(time.Second * 2)
		
		// [送信要求]
		ch1 <- 3
		
		// [送信]
		
		// [その他処理]
		fmt.Printf("[ゴルーチン1]チャネルを介してデータ送信したよ(%v)\n", 3)
	}()

    // ゴルーチン2
	go func() {
		// [その他処理] 送信側ゴルーチンで2秒待たす
		time.Sleep(time.Second * 2)
		
		// [送信要求]
		ch2 <- 6
		
		// [送信]
		
		// [その他処理]
        fmt.Printf("[ゴルーチン2]チャネルを介してデータ送信したよ(%v)\n", 6)
	}()
	
	fmt.Println("ゴルーチンスケジューラにゴルーチン1,2の処理をすることを要求済み")
	
	// [その他処理] 受信側ゴルーチンで5秒待たす
	time.Sleep(time.Second * 5)

	// ※for-selectパターン
	// (送信要求が絶対に2回である前提なので、
	//  受信要求も2回で十分ということでループ回数を2回にしている
	//  実際には事前に送信要求の回数がわかっているケースはあまりなさそう)
	for i:= 0; i < 2; i++ {
        // ※select-case: xxxxxxxxxxxxx
        select {
		case v1 := <-ch1: // [受信要求(ch1)]
		    // [受信]

    	    // [その他処理]
        	fmt.Printf("[ゴルーチン3]チャネルを介してデータ受信したよ(%v)\n", v1)
		case v2 := <-ch2: // [受信要求(ch2)]
        	// [受信]

            // [その他処理]
        	fmt.Printf("[ゴルーチン3]チャネルを介してデータ受信したよ(%v)\n", v2)
    	}
	}
	
	// ゴルーチン3(=メインゴルーチン)の処理が終わって
	// ゴルーチン1, 2の処理が最後まで終わらぬまま強制終了されるのを防ぐため十分待ってあげてる
	// 実際には待たないと思うので本来必要ではない処理。そもそも待ち方が安全ではない
	time.Sleep(time.Second * 10)
	
	// [出力結果]
	// ゴルーチンスケジューラにゴルーチン1,2の処理をすることを要求済み
	// →5秒ほど待った後に以下が出力される(順不同)。受信要求が上がるまで、
	//   ゴルーチン1,2の処理は「送信要求」以降進めないようブロックされていることがわかる
    // [ゴルーチン1]チャネルを介してデータ送信したよ(3)
    // [ゴルーチン2]チャネルを介してデータ送信したよ(6)
	// [ゴルーチン3]チャネルを介してデータ受信したよ(3)
    // [ゴルーチン3]チャネルを介してデータ受信したよ(6)
}

サンプルコードの動作イメージ

名称未設定ファイル-チャネル(for-select).drawio.png

6.4. その他参考になりそうな実装パターン

(具体的なユースケースがわかってないので、実務で使いそうな課題にぶち当たってから考える)

参考

ゴルーチン関連

わかりやすい・・・

今後に向けたメモ

  • もしかして、1スレッド内のローカルキューにたまっているゴルーチン間でもスイッチングがそのスレッド内で発生したりする・・・?だとすると結構解釈間違ってるかも
  • 具体的なユースケースと適用すべきパターンをまとめたい
7
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
3