Go言語による並行処理について自分なりにポイントをまとめていきます。
15分で読める「Go言語による並行処理」ポイントまとめ・上
15分で読める「Go言語による並行処理」ポイントまとめ・下
※15分 = この記事の文字数(約7100字)/1分で読めるらしい文字数平均(400~600)の大体
並行性の概念
そもそも「並行性」とは
前提の話であるProcess,Thread,そして並行と並列の違いについては以下の記事がわかりやすかったです。
マルチスレッドと並行処理をわかりやすく説明します
- Process = 仕事の単位を表す概念
- Thread = Processより小さい単位であり一つのプロセスにより所有、共通のメモリを参照
- 並行 = コンテキストスイッチにより複数処理を同時に行っているよう見せている
- 並列 = 完全に同時に処理
また書籍内では簡潔な一文で説明されています。
並行性はコードの性質を指し、並列性は動作しているプログラムの性質を指します。
つまり並列に走って欲しいと思う、並行なコードを書いているとのこと。
クラウドコンピューティングで解決すべき問題
・リソースのプロビジョニング
・マシンインスタンスのやりとり
・どのようにコードを並行にするか
並行処理でよく発生する問題
競合状態
・同じ変数の読み込みと書き込みが同時に行われるため発生
・プログラムを直列に(コードが先に書いてあるから先に実行されるだろうと)考えにより発生
アトミック性(不可分性)
・アトミックか否かはコンテキスト(スコープ)に依る
・アトミックである = 平行なコンテキストで安全に扱えることを保証
・コードのどの部分をどれくらいの粒度でアトミックにするかがポイント
メモリアクセス同期
・クリティカルセクション = 共有リソースに対する排他的なアクセスが必要な場所
// 以下3点がクリティカルセクション
var data int
go func() {
data++ // ①dataをインクリメント
}()
if data == 0 { // ②dataが0か確認
fmt.Println("data: 0")
} else {
fmt.Printf("data: %v\n", data) // ③dataの値を出力
}
・クリティカルセクションを守る方法として排他的アクセス権を与える方法があるが、パフォーマンスに悪影響
・また「クリティカルセクションが繰り返されていないか」「クリティカルセクションの粒度はどれくらいにすべきか」といった注意が必要
デッドロック
・全並行プロセスが互いに処理を待ち合っている状態
・デッドロックが発生する条件(Coffman条件)
1. 相互排他 = ある並行プロセスがどの時点でもリソースの排他的権利を保持
2. 条件待ち = リソース保持と追加リソース待ちを同時進行
3. 横取り不可 = そのプロセスによってのみ解放される
4. 循環待ち = 互いに待ち合っている
→ 逆にこれらの最低1つを防ぐことができればデッドロック発生を防げる
ライブロック
・並行操作を行っているがプログラムの状態を全く進めていないもの
・廊下のすれ違い問題(進路を譲ろうとして右にいこうとしたら相手も右にいき左に行ったら相手も左にいき...)のような感じ
リソース枯渇
・クリティカルセクションを超えた共有ロックの拡大は他プロセスの効率を下げる
・リソース枯渇はCPU、メモリ、DB接続など、共有されるあらゆるリソースで起こりうる
並行処理を使うときは..
- 並行処理を使用する際は、コメントで①誰が並行処理を行うか ②問題空間がどう並行処理に関わるか ③誰が同期処理を担うか について触れる。
- 戻り値をチャネルにすることでgorutineを持っており、他のgorutineを用意することがないことを示す。
プリミティブか、チャネルか
Goのモットーのひとつに以下のものがある。
Do not communicate by sharing memory; instead, share memory by communicating.
メモリ共有によって通信しないこと。代わりに、通信によってメモリ共有すること。
つまり、
①変数の変更をチャネルによってgotutineへと通知したのちgorutineで変数を扱ったり
②チャネルを使ってローカル値やポインタを送ることでのみメモリを共有することでデータ競合を起こさないようにしよう
ということ。
しかしGoはsyncパッケージで伝統的なロック機構を提供している → どちらを使う?
gorutineを使うかsyncを使うかの指標
データの所有権を移動するか
・データ所有権の移動 = 処理結果を生成するコードがあり、その結果を別のコードに共有すること
・所有権の移動がある場合、バッファ付きチャネルを作成しインメモリのキューを実装することでProducer(生産者)とConsumer(消費者)を切り離せる構造体の内部の状態を保護しようとしているか
・メモリアクセス同期を使うことでクリティカルセクションをロックする複雑な実装を呼び出し元から隠すことができる複数ロジックを協調させようとしているか
・複数ロジックを強調させたい場合はチャネルを使うことで楽チンパフォーマンスクリティカルセクションか
・まずはプログラムの再設計を
・プロファイルをとり他の箇所よりオーダー数桁遅ければメモリアクセス同期のプリミティブを使用することで負荷がかかってもよりよく動作する
Goにおける並行処理の構成要素
gorutine
・goキーワードを関数呼び出しの前に置くだけで動作!(無名関数でも可)
・Goのランタイムがgorutineの実行時の振る舞いを観察。
・gorutineがブロックしたら自動で一時停止し、ブロックが解放されたら再開する
→ gorutineをある意味割り込み可能、一時中断可能にしているが、割り込めるのはgorutineがブロックした時のみ
・Goがgorutineをホストする機構はM:Nスケジューラという実装になっている
(M:Nスケジューラ ... M個のグリーンスレッド:N個のOSスレッドというふうに対応させる)
(グリーンスレッド ... 言語のランタイムによって管理されるスレッド)
・Goはfork-joinモデルによって並行処理を実装
→ 親プログラムから子プログラムをfork(分岐)させ、親と並行に実行、再びjoin(合流)させる
・クロージャで挙動を確認
var wg sync.WaitGroup
salutation := "hello"
wg.Add(1)
go func() {
defer wg.Done()
salutation = "こんにちは"
}()
wg.Wait()
fmt.Println(salutation) // => "こんにちは"と出力
↑クロージャが変数の値を変更している
= gorutineはそれが作られたアドレス空間と同じ空間で実行している
var wg sync.WaitGroup
// ①出力前にこのループが終わってから、
for _, salutation := range []string{"hello", "你好", "こんにちは"} {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(salutation) // ②出力される
}()
}
wg.Wait()
// 出力結果 =>
// こんにちは
// こんにちは
// こんにちは
↑gorutineは未来の任意のタイミングにスケジューリングされるため、gorutine前にrangeのループが終了し、その後出力が実行される
以下のようにすると
var wg sync.WaitGroup
// salutationのコピーを渡してから次へ
for _, salutation := range []string{"hello", "你好", "こんにちは"} {
wg.Add(1)
go func(salutation string) {
defer wg.Done()
fmt.Println(salutation)
}(salutation) // salutationのコピーを渡す
}
wg.Wait()
// 出力結果 =>
// 你好
// こんにちは
// hello
こうすることでgorutine前にそれぞれの値が割り当てられるようにできる
syncパッケージ
WaitGroup
・ひとまとまりの並行処理について、その結果を気にしない or 他に結果を収集する手段がある場合に有効
・Add()
メソッドでgorutineの起動を伝え、Wait()
メソッドで全てのgorutineが終了するまでメインゴルーチンをブロックし、Done()
メソッドでgorutineの終了を伝える
・Add()
の引数に渡された整数だけカウンターを増やし、Done()
を呼び出すとカウンターを1減らすといったふうに並行処理で安全なカウンターと考えることともできる
Mutex
・相互排他。プログラム内のクリティカルセクションを保護
・UnLockをdeferで行うことでpanicになっても確実に呼び出すことでデッドロックを回避
RWMutex
・読み込み(Read)のみ許可するLockできる
・クリティカルセクションの出入りはコスト高。なので極力クリティカルセクションで消費される時間を最小に
Cond
・CONDision変数として。
・Condは狭い範囲か、カプセル化した型に入れて利用するのが最適
gorutineで処理がしたい時イベントを受け取るために待機がしたい。といった時↓
for conditionTrue() == false {
time.Sleep(1 * time.Millisecond)
}
と無限ループを使う方法がある。
しかし非効率。Condを使ってこう書けば良い↓
// sync.Locker interfaceを満たす型を引数にとるNewCond関数でCondインスタンス作成
c := sync.NewCond(&sync.Mutex{})
c.L.Lock()
for conditionTrue() == false {
c.Wait() // 条件が発生したかどうか通知されるのを待つ。gorutineは一時停止
}
c.L.Unlock()
BroadCast
メソッド(GUIを模したコード)
type Button struct {
Clicked *sync.Cond
}
button := Button{ Clicked: sync.NewCond(&sync.Mutex{}) }
// gorutine上で動作する関数を登録するための関数。
subscribe := func(c *sync.Cond, fn func()) {
// gorutineを実行するまで終了させない
var gorutineRunning sync.WaitGroup
gorutineRunning.Add(1)
go func() {
gorutineRunning.Done()
c.L.Lock()
defer c.L.UnLock()
c.Wait() // broadcast待ち
fn() // 登録した関数を実行
}()
gorutineRunning.Wait()
}
var clickResisterd sync.WaitGroup
clickResisterd.Add(3)
// 3つのハンドラを登録
subscribe(button.Clicked, func() {
fmt.Println("ウインドウ最大化")
clickResisterd.Done()
})
subscribe(button.Clicked, func() {
fmt.Println("ダイアログボックス表示")
clickResisterd.Done()
})
subscribe(button.Clicked, func() {
fmt.Println("マウスのクリック")
clickResisterd.Done()
})
// Clickedという状態に対応するBroadcastを呼び出し、3つのハンドラにボタンのクリックを知らせる
button.Clicked.Broadcast()
clickResisterd.Wait()
Cond
Once
・sync.Once.Do()の処理を1回のみ実行する
Pool
・オブジェクトプールパターンを並行処理で安全な形で実行したもの
→ オブジェクトパターンは以下の記事で理解
Object Poolパターン
・オブジェクトプールを作ることで上限を制限できることやスループット向上などのメリットがある
・Get
メソッドでプール内の使用可能インスタンスの有無を確認、あればそれを返し、なければNew
メンバー変数を呼び出しインスタンスを作成。作業後にPut
メソッドでプールに返して他プロセスが使用できるようにする
・Poolを使う注意点
1. Getでインスタンス取得するときはそのオブジェクトの状態を考えなくて良い実装をする
2. deferで確実にPutをよぶ
3. プール内のオブジェクトはおよそ均質なものにする
チャネル
・プログラムの異なる部分では互いの挙動を知らず、チャネルが存在するメモリ中の同じ場所を参照する
・チャネルはブロックする = キャパいっぱいのチャネルに書き込もうとするgorutineは空きが出るまで待機し、空のチャネルから読み込もうとするgorutineは要素が入るまで待機する。
チャネル生成
var c chan interface{} // interface{}型のチャネルを宣言
c := make(chan interface{}) // make関数で初期化
// 読込専用
var c <-chan interface{}
c := make(<-chan interface{})
// 送信専用
var c chan<- interface{}
c := make(chan<- interface{})
<-演算子からの受信は2つの値を受け取れる(2つめの値は読み込みできたか、あるいは閉じたチャネルから生成されたデフォルト値)
(閉じたチャネル ... closeにより閉じたチャネル。閉じたチャネルからの読込は不可)
c := make(chan string)
go func() {
c <- "hello"
}()
salutation, ok := <-c
fmt.Printf("(%v): %v\n", ok, salutation) // => (true): hello
closeとループ処理を使うことで、チャネルが閉じた時にループ処理を終了させるといったことができる↓
c := make(chan int)
go func() {
defer close(c)
for i := 1; i <= 5; i++ {
c <- i
}
}()
for data := range c {
fmt.Printf("%v", data) // => 12345
}
また、チャネルを閉じることでsync.Condのbroadcastのように複数gorutineにシグナルを送ることができる。
バッファ付きチャネル
・バッファに空きが出るまで送信側のgorutineをブロックする
nilチャネル
・送信、受信、そしてcloseすることもpanicを起こす
・チャネルを扱うときは必ず初期化をする
異なる型のチャネルをどう組み合わせるか?
・チャネルを正しいコンテキストに入れるためにチャネルの所有権(初期化、書き込み、閉じるを行うgorutine)を明確に割り振る
・できるだけチャネルの所有権のスコープを小さくする
select
・Goのランタイムはcase文に対して擬似乱数による一様選択している
・1つも受信できなかった時にタイムアウトさせるやり方がある↓
var c <- chan int
select {
case <-c:
case <-time.After(1 * time.Second):
fmt.Println("タイムアウト")
}
・for-selectループ↓
done := make(chan interface{})
go func() {
time.Sleep(5 * time.Second)
close(done) // doneチャネルをclose
}()
workCount := 0
loop:
for {
select {
case <-done: // doneチャネルがcloseしたときにloopをbreak
break loop
default:
}
workCount++
time.Sleep(1 * time.Second)
}
fmt.Printf("シグナルを渡されるまでに%v個のサイクルを行いました\n", workCount)
// => シグナルを渡されるまでに5個のサイクルを行いました
selectによってループを止めるかどうか確認している
GOMAXPROCS
・「ワークキュー」というOSスレッドの数を制御している
・GOMAXPROCSを調整することで性能を挙げるということは抽象化を犠牲にすること
→コードのコミットごと、マシン変更ごと、Goのバージョン変更ごとに調整が必要
参考
https://blog.golang.org/codelab-share
https://qiita.com/ruiu/items/54f0dbdec0d48082a5b1