概要
- 前回の続き
- 今回は並行処理について学習
- Goの並行処理はシンプルとのこと
- ゴルーチン、チャネル、select等の新しい用語が出てきた
参考
並行処理について
- Goの並行性はスレッドのロックによるデータ共有ではない
- Goの並行性のモデルはCSP(Communicating Sequential Processes)
- 独立したプロセス群がメッセージパッシングを使用して相互にやり取りをする仕組みらしい
- 並行性と並列性は異なる
- 並行性はコードの性質を指し、並列性は動作するプログラムを指す
- 並行処理を実装しても、必ずしも並列で動作するとは限らない
- また、並列処理にはオーバヘッドが発生することが多いため、短時間の処理は並行処理より逐次処理のほうが早かったりする
- したがって、並行処理を実装する前に、ベンチマークなどでパフォーマンスを検証したほうがよい。
ゴルーチン
- Goの並行性モデルの中核となる概念
- ゴルーチンはスレッドのようなものだが、スレッドよりも軽量。
- OSではなくGoランタイムが起動する。
- プロセス:プログラムの実行単位。OSによって実行される
- スレッド:プロセスの構成単位。ひとつのプロセスは一つ以上のスレッドからなる。
- プロセス > スレッド
- 関数の前に
go
をつけることでゴルーチンとなる - ゴルーチンはビジネスロジックをラップするクロージャとともに起動するのが一般的
- クロージャが並行性に関するブックキーピングとなる(ビジネスロジックやAPIは、並行性から分離できる)
ゴルーチン
// クロージャがチャネルから読み込んだ値をビジネスロジックにわたす。
func runThingsConcureently(chIn <- chan int, chOut chan <- string) {
for val := range chIn {
go func(val int) { // ゴルーチンの起動
//ビジネスロジックはゴルーチンの中で実行されるのを意識しない
result := doBusinessLogic(val)
resutString := fmt.Sprintf("%d -> %d", val, result)
chOut <- resutString
}(val)
}
}
チャネル
- ゴルーチンで情報のやり取りをするためにチャネルを用いる
- チャネルの作成は
make
関数を使用し、キーワードchan
とやり取りする型を指定するch := make(chan int)
- マップと同様に参照型。ゼロ値はnil
- チャネルでのやり取り(読み書き)は、
<-
演算子を使用する
チャネルの読み書き
a := <-ch // チャネルからの読み込み。チャネルchから値を読み込み、aに代入
ch <- b // チャネルへの書き込み。bの値をチャネルchに書き込む
-
for-range
ループでチャネルの読み込みが行える。通常のfor-rangeと異なり、一つの変数に値が入る。 - チャネルがクローズかbreakされるまでループする
for-renge
for v := ch {
fmt.println(v)
}
- チャネルのクローズは
close(ch)
で行う。- クローズ後のチャネルへの書き込み、クロースの再実施はパニックになる。
- 読み込みは、バッファが読まれていない場合は値を取得できる
- クローズされているかもしれない場合の読み込みは、カンマOkイディオムでクローズ確認を行うのが安全。
v, ok := <- ch //オープンならokにはtrueが入る。
- 基本的にはチャネルのクローズは書き込みを行うゴルーチンが行う
- 以下の場合は、例外で
sync.WaitGroup
を用いて対応する。 - 複数のゴルーチンが同じチャネルに書き込む際(クローズの再実施でパニックになる。)
- 書き込みを行おうとした際、他のゴルーチンがチャネルをクローズしていた時(クローズ後のチャネルの書き込みでパニックになる)
- 以下の場合は、例外で
select
- 並行性の制御を行う。
- 複数のチャネルに対する読み込み、書き込み操作が可能となる
- その際の優先順位はランダムとなる。
- そのため、スタベーション(特定の処理が行われない)状態を解決できる
selectの構文
- ブランクswitchと似た構文
select
select {
case v := <-ch1: // ch1チャネル読み込み
fmt.Println("ch1:",v)
case v := <-ch2: // ch2チャネル読み込み
fmt.Println("ch2:",v)
case ch3 <- x: // ch3チャネルへ書き込み
fmt.Println("ch3:",x)
}
- 上記の例では、実行可能な場合、どのcaseブロックが処理されるかはランダムとなる。(switch文の場合は上からの優先順位でチェックされる。)
- ランダムであることでスタベーションが回避できる
デッドロックの回避
- また、デッドロックも回避できる
デッドロックの回避
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
// ゴルーチン
go func() {
v := 1
ch1 <- v // v(1)をch1に書き込み
v2 := ch2
fmt.Print("無名関数内: ",v, " ", v2, "\n")
}()
v := 2
var v2 int
// select(デッドロックの回避。チャネルのやり取りをselectで囲む)
select {
case ch2 <- v: // こちらはまだ書き込めない
case v2 = <-ch1: //v(1)がch1に入れば実行される。v2は2になる
}
fmt.Print("mainの最後:", v, " ",v2, "\n") // 2 1
}
- 無名関数内の
ch1 <- v
でch1に1が入ると、main関数内のcase文のv2 = <-ch1:
が実行され、v2は1になる。その後、main関数内でvに2が入る。 - よって、
v=2, v2=1
となる
for-selectループ
- selectは複数のチャネルの中から前に進めるものを選択してくれるため、forループの中に埋め込まれるパターンが多い。for-selectループと呼ぶ。
- for-selectは、ループを抜ける方法の定義が必須(doneチャネル)
for-select
for {
select {
case <- done: // forを抜けるcase文が必要
return
case v := <-ch:
fmt.Println("chチャネル読み込み:",v)
}
}
- また、select文はdefaultを持つことができる。待ちをしない場合に定義する。
並行処理のベストプラクティス
- APIに並行性を持たせない。
- 公開するAPIにはチャネル(及びミューテックス)を含めないこと
- 利用者にチャネル管理の責任を負わせないため
ゴルーチンとforループ
- forループ内のゴルーチンが外側のループの値を使用すると、どのループ時点での値かが不定となる。
- そのため、実行の度に異なる結果となる
※forループに限らず、外側の変数を使用する際は発生する可能性がある。
forループ内
a := []int{2,4,6,8,10}
ch := make(chan int, len(a))
for _, v := range a {
// ゴルーチン起動(mainとは別スレッド)
go func () {
ch <- v * 2 // ゴルーチン内のvの値は、外側のvの値と一致しない。
}()
}
for i := 0; i < len(a); i++ {
fmt.Print(<-ch, " ")
}
- 上記の結果は不定となる(各配列の値を2倍した数にならない)
- ゴルーチン起動時に外側変数の値を渡すには以下の2通りがある
- ループ内でシャドーイングする
- ゴルーチン関数の引数に渡す
- ※なお、上記の場合も、出力される値は順不同(配列の順序通りではない)
forループ内(外側変数の正しい使用:シャドーイング)
for _, v := range b {
v := v // シャドーイングする
go func () {
ch2 <- v * 2
}()
}
forループ内(外側変数の正しい使用:ゴルーチン関数の引数)
for _, v := range b {
go func (val int) { // 外側変数の値(v)を引数valで受け取る
ch2 <- val * 2
}(v) //無名関数の引数に外側変数vを渡す
}
ゴルーチンの終了チェック
- ゴルーチンの関数は確実に終了するようにしないといけない。
- ランタイムは自動で終了してくれず、定期的に時間を割り振る、
ゴルーチンリーク
が発生するため
- ランタイムは自動で終了してくれず、定期的に時間を割り振る、
doneチャネルパターン
- ゴルーチンに終了するシグナルを送るチャネルを作成する
doneチャネル
// 文字列sに関して、convertersに入っている関数を並行に実行して、もっとも早く終了した結果を返す
// 第1引数sは対象の文字列
// 第2引数convertersは「『文字列を受け取って、文字列を返す関数』を要素としてもつスライス」
// 戻り値は文字列
func convertData(s string, converters []func(string) message) message {
done := make(chan struct{}) //空の構造体チャネル
resultChan := make(chan message)
for _, f := range converters {
go func(f func(string) message) {
r := f(s)
select{
case resultChan <- r:
fmt.Printf("結果が戻ってきたのでresultChanに入れたあと: %v\n", r)
case <- done:
fmt.Println("case<-done選択", r.fromFunc)
}
fmt.Println("無名関数終了",r)
}(f)
}
r := resultChan // 結果が返ってきたら
close(done) // チャネルをクローズ
return r
}
- select文の各caseで、resultチャネルへの書き込みか、doneチャネルへの書き込みを待つ
- 処理の最後に、resultに書き込まれた値を読み込み、doneチャネルをcloseしている
selectのcase無効化
- case文のチャネルがクローズされた際、チャネル読み込みは成功するが、常にゼロ値が返る。
- これは無駄な処理であるため、以降はスキップするよう対応が必要。
- チャネルがクローズされたら、チャネルにnilを設定することで、以降該当のcaseをスキップできる
case無効化
for {
select {
case v, ok := <-in:
if !ok {
in = nil // このcaseは再度成功することはない
continue
}
case v, ok := in <-in2:
if !ok {
in2 = nil // このcaseは再度成功することはない
continue
}
case <- done:
return
}
}
タイムアウト
- Goの並行処理ではリクエストの実行時間を管理できる。(jsのpromise等が相当する)
- タイムアウトのイディオムを使うことで実現できる
タイムアウト
func timeLimit()(int, error) {
var result int
var err error
done := make(chan struct{})
go func() {
result, err = doSomeWork() // APIの実施
close(done)
}()
select {
case <-done: // doneが閉じれたらOK(APIの実施が2秒以内)
return result, err
case <-time.After(2 * time.Second): //APIの実施が2秒かかったらこちら
return 0, errors.New("タイムアウト")
}
}
sync.WaitGroup
- 複数のゴルーチンの終了を待つ場合に
sync.WaitGroup
使用する - 3つのメソッドがある
- Add
- 終了を待つゴルーチンのカウンタを設定する。
- 通常一度だけ、起動するゴルーチンの数を指定する。
- Done
- 処理を終了した際に呼ぶ。Addで設定したカウンタをデクリメントする
- ゴルーチン内で呼ばれ、確実に実行されるようにdeferを使う
- Wait : カウンタが0になるまで、ポーズする
- Add
sync.WaitGroup
// スライスdataを受け取り、各要素をprocessor関数で処理した結果を集めて戻す。
// 処理結果の順番は不明
func processAndGather(processor func(int) int, data []int)[]int {
num := len(data)
chResult := make(chan int, num) // 処理結果を受け取るチャネル。
var wg sync.WaitGroup // 1.waitGroupの定義
wg.Add(num) // 2.Add:dataと同じ長さのゴルーチン数を設定
// ゴルーチンをdataの長さ分起動
for _, v := range data {
go func(v int) {
defer wg.Done() // 3.Done:処理が終わったらwgをデクリメント
chResult <- processor(v) //関数処理結果をチャネルに入れる
}(v)
}
wg.Wait() // 4: 全てのゴルーチンの終了を待つ
close(chResult) // 全て終了されたらチャネルをクローズする
//最後にチャネルの処理結果を返す
var result []int
for v := range chResult {
result = append(result, v)
}
return result
}
- その他、必要なときに一度だけ処理を実行できる、
sync.once
がある- 時間のかかる初期化処理を遅延読み込みしたい場合などに用いる
おわりに
- 並行処理は難易度が高く感じた。(それでも他言語に比べシンプルなようだが。)
- ゴルーチン、チャネル、selectによる並行処理の基本は押さえることができた
- 実践的な内容になると、理解が追いつかなくなってきた。
- サンプルコードの内容も長なってきたから。
- 複数ゴルーチンの制御、ゴルーチンの終了と、チャネルのクローズなど、覚える内容が増えてきて混乱した。
- 実際に自分で並行処理を書く機会になったら、理解も深まるでしょう!