0
0

More than 1 year has passed since last update.

[Go]並行処理

Posted at

概要

  • 前回の続き
  • 今回は並行処理について学習
    • Goの並行処理はシンプルとのこと
    • ゴルーチン、チャネル、select等の新しい用語が出てきた

参考

並行処理について

  • Goの並行性はスレッドのロックによるデータ共有ではない
  • Goの並行性のモデルはCSP(Communicating Sequential Processes)
    • 独立したプロセス群がメッセージパッシングを使用して相互にやり取りをする仕組みらしい

  • 並行性と並列性は異なる
    • 並行性はコードの性質を指し、並列性は動作するプログラムを指す
    • 並行処理を実装しても、必ずしも並列で動作するとは限らない
    • また、並列処理にはオーバヘッドが発生することが多いため、短時間の処理は並行処理より逐次処理のほうが早かったりする
    • したがって、並行処理を実装する前に、ベンチマークなどでパフォーマンスを検証したほうがよい。

ゴルーチン

  • Goの並行性モデルの中核となる概念
  • ゴルーチンはスレッドのようなものだが、スレッドよりも軽量。
  • OSではなくGoランタイムが起動する。
    • プロセス:プログラムの実行単位。OSによって実行される
    • スレッド:プロセスの構成単位。ひとつのプロセスは一つ以上のスレッドからなる。
    • プロセス > スレッド
  • 関数の前にgoをつけることでゴルーチンとなる
  • ゴルーチンはビジネスロジックをラップするクロージャとともに起動するのが一般的
    • クロージャが並行性に関するブックキーピングとなる(ビジネスロジックやAPIは、並行性から分離できる)
ゴルーチン
// クロージャがチャネルから読み込んだ値をビジネスロジックにわたす。
func runThingsConcureently(chIn <- chan int, chOut chan <- string) {
	for val := range chIn {
		go func(val int) { // ゴルーチンの起動
            //ビジネスロジックはゴルーチンの中で実行されるのを意識しない
			result := doBusinessLogic(val) 
			resutString := fmt.Sprintf("%d -> %d", val, result)
			chOut <- resutString
		}(val)
	}
}

チャネル

  • ゴルーチンで情報のやり取りをするためにチャネルを用いる
  • チャネルの作成はmake関数を使用し、キーワードchanとやり取りする型を指定する
    • ch := make(chan int)
  • マップと同様に参照型。ゼロ値はnil
  • チャネルでのやり取り(読み書き)は、<-演算子を使用する
チャネルの読み書き
a := <-ch // チャネルからの読み込み。チャネルchから値を読み込み、aに代入
ch <- b // チャネルへの書き込み。bの値をチャネルchに書き込む
  • for-rangeループでチャネルの読み込みが行える。通常のfor-rangeと異なり、一つの変数に値が入る。
  • チャネルがクローズかbreakされるまでループする
for-renge
for v := ch {
    fmt.println(v)
}
  • チャネルのクローズはclose(ch)で行う。
    • クローズ後のチャネルへの書き込み、クロースの再実施はパニックになる。
    • 読み込みは、バッファが読まれていない場合は値を取得できる
    • クローズされているかもしれない場合の読み込みは、カンマOkイディオムでクローズ確認を行うのが安全。
      • v, ok := <- ch //オープンならokにはtrueが入る。
  • 基本的にはチャネルのクローズは書き込みを行うゴルーチンが行う
    • 以下の場合は、例外でsync.WaitGroupを用いて対応する。
    • 複数のゴルーチンが同じチャネルに書き込む際(クローズの再実施でパニックになる。)
    • 書き込みを行おうとした際、他のゴルーチンがチャネルをクローズしていた時(クローズ後のチャネルの書き込みでパニックになる)

select

  • 並行性の制御を行う。
  • 複数のチャネルに対する読み込み、書き込み操作が可能となる
    • その際の優先順位はランダムとなる。
    • そのため、スタベーション(特定の処理が行われない)状態を解決できる

selectの構文

  • ブランクswitchと似た構文
select
select {
case v := <-ch1: // ch1チャネル読み込み
    fmt.Println("ch1:",v) 
case v := <-ch2: // ch2チャネル読み込み
    fmt.Println("ch2:",v) 
case ch3 <- x: // ch3チャネルへ書き込み
    fmt.Println("ch3:",x) 
}
  • 上記の例では、実行可能な場合、どのcaseブロックが処理されるかはランダムとなる。(switch文の場合は上からの優先順位でチェックされる。)
    • ランダムであることでスタベーションが回避できる

デッドロックの回避

  • また、デッドロックも回避できる
デッドロックの回避
func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	// ゴルーチン
	go func() {
		v := 1
		ch1 <- v // v(1)をch1に書き込み
		v2 := ch2
		fmt.Print("無名関数内: ",v, " ", v2, "\n")
	}()

	v := 2
	var v2 int
	// select(デッドロックの回避。チャネルのやり取りをselectで囲む)
	select {
	case ch2 <- v: // こちらはまだ書き込めない
	case v2 = <-ch1: //v(1)がch1に入れば実行される。v2は2になる
	}
	fmt.Print("mainの最後:", v, " ",v2, "\n") // 2 1
}
  • 無名関数内のch1 <- vでch1に1が入ると、main関数内のcase文の v2 = <-ch1:が実行され、v2は1になる。その後、main関数内でvに2が入る。
  • よって、v=2, v2=1となる

for-selectループ

  • selectは複数のチャネルの中から前に進めるものを選択してくれるため、forループの中に埋め込まれるパターンが多い。for-selectループと呼ぶ。
  • for-selectは、ループを抜ける方法の定義が必須(doneチャネル)
for-select
for {
    select {
    case <- done: // forを抜けるcase文が必要
        return
    case v := <-ch:
        fmt.Println("chチャネル読み込み:",v)
    }
}
  • また、select文はdefaultを持つことができる。待ちをしない場合に定義する。

並行処理のベストプラクティス

  • APIに並行性を持たせない。
    • 公開するAPIにはチャネル(及びミューテックス)を含めないこと
    • 利用者にチャネル管理の責任を負わせないため

ゴルーチンとforループ

  • forループ内のゴルーチンが外側のループの値を使用すると、どのループ時点での値かが不定となる。
  • そのため、実行の度に異なる結果となる
    ※forループに限らず、外側の変数を使用する際は発生する可能性がある。
forループ内
	a := []int{2,4,6,8,10}
	ch := make(chan int, len(a))
	for _, v := range a {
		// ゴルーチン起動(mainとは別スレッド)
		go func ()  {
			ch <- v * 2 // ゴルーチン内のvの値は、外側のvの値と一致しない。	
		}()
	}
	for i := 0; i < len(a); i++ {
		fmt.Print(<-ch, " ")
	}
  • 上記の結果は不定となる(各配列の値を2倍した数にならない)
  • ゴルーチン起動時に外側変数の値を渡すには以下の2通りがある
    • ループ内でシャドーイングする
    • ゴルーチン関数の引数に渡す
    • ※なお、上記の場合も、出力される値は順不同(配列の順序通りではない)
forループ内(外側変数の正しい使用:シャドーイング)
	for _, v := range b {
		v := v // シャドーイングする
		go func ()  {
			ch2 <- v * 2
		}()
	}
forループ内(外側変数の正しい使用:ゴルーチン関数の引数)
	for _, v := range b {
		go func (val int)  { // 外側変数の値(v)を引数valで受け取る
			ch2 <- val * 2
		}(v) //無名関数の引数に外側変数vを渡す
	}

ゴルーチンの終了チェック

  • ゴルーチンの関数は確実に終了するようにしないといけない。
    • ランタイムは自動で終了してくれず、定期的に時間を割り振る、ゴルーチンリークが発生するため

doneチャネルパターン

  • ゴルーチンに終了するシグナルを送るチャネルを作成する
doneチャネル
// 文字列sに関して、convertersに入っている関数を並行に実行して、もっとも早く終了した結果を返す
// 第1引数sは対象の文字列
// 第2引数convertersは「『文字列を受け取って、文字列を返す関数』を要素としてもつスライス」
// 戻り値は文字列
func convertData(s string, converters []func(string) message) message {
	done := make(chan struct{}) //空の構造体チャネル
	resultChan := make(chan message)
	for _, f := range converters {
		go func(f func(string) message) {
			r := f(s)
			select{
			case resultChan <- r:
				fmt.Printf("結果が戻ってきたのでresultChanに入れたあと: %v\n", r)
			case <- done:
					fmt.Println("case<-done選択", r.fromFunc)
			}
			fmt.Println("無名関数終了",r)
		}(f)
	}
	r := resultChan // 結果が返ってきたら
	close(done)	// チャネルをクローズ
	return r
}
  • select文の各caseで、resultチャネルへの書き込みか、doneチャネルへの書き込みを待つ
  • 処理の最後に、resultに書き込まれた値を読み込み、doneチャネルをcloseしている

selectのcase無効化

  • case文のチャネルがクローズされた際、チャネル読み込みは成功するが、常にゼロ値が返る。
    • これは無駄な処理であるため、以降はスキップするよう対応が必要。
  • チャネルがクローズされたら、チャネルにnilを設定することで、以降該当のcaseをスキップできる
case無効化
for {
	select {
	case v, ok := <-in:
		if !ok {
			in = nil // このcaseは再度成功することはない
			continue
		}
	case v, ok := in <-in2:
		if !ok {
			in2 = nil // このcaseは再度成功することはない
			continue
		}
	case <- done:
			return
	}
}

タイムアウト

  • Goの並行処理ではリクエストの実行時間を管理できる。(jsのpromise等が相当する)
  • タイムアウトのイディオムを使うことで実現できる
タイムアウト
func timeLimit()(int, error) {
	var result int
	var err error
	done := make(chan struct{})
	go func() {
		result, err = doSomeWork() // APIの実施
		close(done)
	}()

	select {
	case <-done: // doneが閉じれたらOK(APIの実施が2秒以内)
		return result, err
	case <-time.After(2 * time.Second): //APIの実施が2秒かかったらこちら
		return 0, errors.New("タイムアウト")
	}
}

sync.WaitGroup

  • 複数のゴルーチンの終了を待つ場合にsync.WaitGroup使用する
  • 3つのメソッドがある
    • Add
      • 終了を待つゴルーチンのカウンタを設定する。
      • 通常一度だけ、起動するゴルーチンの数を指定する。
    • Done
      • 処理を終了した際に呼ぶ。Addで設定したカウンタをデクリメントする
      • ゴルーチン内で呼ばれ、確実に実行されるようにdeferを使う
    • Wait : カウンタが0になるまで、ポーズする
sync.WaitGroup
// スライスdataを受け取り、各要素をprocessor関数で処理した結果を集めて戻す。
// 処理結果の順番は不明
func processAndGather(processor func(int) int, data []int)[]int {
	num := len(data)
	chResult := make(chan int, num) // 処理結果を受け取るチャネル。
	var wg sync.WaitGroup // 1.waitGroupの定義
	wg.Add(num) // 2.Add:dataと同じ長さのゴルーチン数を設定

    // ゴルーチンをdataの長さ分起動
	for _, v := range data {
		go func(v int) {
			defer wg.Done() // 3.Done:処理が終わったらwgをデクリメント
			chResult <- processor(v) //関数処理結果をチャネルに入れる
		}(v)
	}

	wg.Wait() // 4: 全てのゴルーチンの終了を待つ
	close(chResult) // 全て終了されたらチャネルをクローズする

	//最後にチャネルの処理結果を返す
	var result []int
	for v := range chResult {
		result = append(result, v)
	}
	return result
}
  • その他、必要なときに一度だけ処理を実行できる、sync.onceがある
    • 時間のかかる初期化処理を遅延読み込みしたい場合などに用いる

おわりに

  • 並行処理は難易度が高く感じた。(それでも他言語に比べシンプルなようだが。)
  • ゴルーチン、チャネル、selectによる並行処理の基本は押さえることができた
  • 実践的な内容になると、理解が追いつかなくなってきた。
    • サンプルコードの内容も長なってきたから。
    • 複数ゴルーチンの制御、ゴルーチンの終了と、チャネルのクローズなど、覚える内容が増えてきて混乱した。
  • 実際に自分で並行処理を書く機会になったら、理解も深まるでしょう!
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0