ゴルーチンとは
Go言語において非常に重要な概念となるゴルーチン。しかしゴルーチンとはそもそもなんだろうという人もいるかもしれません。
そこではじめに、ゴルーチンとは何かという基礎を解説します。
並行処理
並行処理とは、複数の処理をあたかも同時にこなしているかのように見せかける処理のことを指します。もしかしたら本当に同時に実行しているかもしれないし、交互に切り替えているかもしれません。また、これと類似した用語に並列処理がありますが、これは本当に同時に処理をしています。
ゴルーチン
さて、並行処理の話をしたところで、ゴルーチンの話に戻りましょう。
ゴルーチンとは並行処理における処理の単位と思ってください。例えば以下のようなコードがあったとします。
func a() {
// 処理
}
func b() {
// 処理
}
func main() {
go a() // ゴルーチン
go b() // ゴルーチン
}
この場合、関数 a() と b() はゴルーチンとなります。ゴルーチンの生成は以下のように書きます。
go [関数名]()
また、よくある書き方として、上記の関数を無名関数として実装するケースがあります。
go func() {
// 何かしらの処理
}()
ゴルーチンを書いてみよう!
では早速ゴルーチンを書いてみましょう!
まずは簡単に、ゴルーチン内で標準出力するプログラムを書いてみます。
package main
import "fmt"
func main() {
fmt.Println("hello!")
go func() {
fmt.Println("world!")
}()
fmt.Println("hogehoge!")
}
この結果はどうなるでしょうか?
hello!
hogehoge!
あれ? world! が出力されませんでしたね。
なぜでしょうか?
main関数は別名 メインゴルーチンと呼ばれていますが、このゴルーチンが終了したら他のゴルーチンも強制的に終了となります。
ではちゃんとゴルーチン内の処理を実行し、出力させるためにはどうすればいいのでしょうか?
一番手っ取り早いのは time.Sleep関数を利用することです。
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("hello!")
go func() {
fmt.Println("world!")
}()
time.Sleep(1 * time.Second)
fmt.Println("hogehoge!")
}
この結果は以下のとおりです。
hello!
world!
hogehoge!
はい、期待通りになりましたね!
値を共有してみよう!
複数のゴルーチン間で値を共有するケースを考えてみましょう。
愚直に書けば以下のようなコードになります。
package main
import (
"fmt"
"time"
)
func spender(money *int) {
*money--
}
func worker(money *int) {
*money++
}
func main() {
money := 50
count := 100000
for range count {
worker(&money)
spender(&money)
}
fmt.Printf("money: %d\n", money)
}
この例ではポインターを用いて値の加減算を行っています。こちらの方が、値が共有されているというのが明確になると思ったからです。
結局のところ、 spenderは money を-1、 workerは+1しているだけなので、最終的に出力される money の数は 50になるはずです。実際、上のコードは順次実行なので正しい値になります。
money: 50
ではこれをゴルーチンにしてみましょう。
package main
import (
"fmt"
"time"
)
func spender(money *int) {
*money--
}
func worker(money *int) {
*money++
}
func main() {
money := 50
count := 100000
for range count {
go worker(&money) // goキーワードを追加してゴルーチン化
go spender(&money)
}
time.Sleep(2 * time.Second)
fmt.Printf("money: %d\n", money)
}
この出力、どうなるでしょうか??
money: 205
あれ?なぜか増えてしまっています。
money: -54
と思ったら逆に減っている....!?
なぜこの現象が起こるのか
この現象に関わってくるのが アトミック操作 という用語です。これ、何かというと、これ以上分割しきれない操作のことを指します。
*money-- や *money++は、メモリから値を取得し、計算し、メモリに返すという3段階の操作をしています。そのため、アトミック操作ではありません。
ではアトミック操作でないとどうなるのか。以下のような現象に陥ってしまいます。
-
spenderがmoney($50)を読み込む -
spenderがmoney - 1($49)をする -
workerがmoney($50)を読み込む -
spenderがmoneyにmoney-1($49)を書き込む -
workerがmoney + 1($51)をする -
workerがmoneyにmoney+1($51)を書き込む
はい、spenderとworkerがそれぞれ1回だけ並行に実行しただけですが、最終的な結果は 51になりました。これがいわゆる 競合 です。
競合を避けよう!
ではこの競合を避けるためにはどうすれば良いのでしょうか?そういえばDBには競合を回避するために何か機能があったような🤔
あぁ!そうだそうだ、占有ロック だ!!(わざとらしい)
そう、競合をなくす簡単な方法は、そもそも値にロックをかけてやり、他のゴルーチンが値に操作をできなくさせれば良いのです。
ここで出てくるのが、 Mutex です。Goのsyncパッケージにあるこの機能は、操作をある期間からある期間までロックをかけ、事実上の排他制御を行います。要は鍵のような役割をしているということですね。
早速Mutexを使って money を管理してみましょう!
package main
import (
"fmt"
"time"
"sync"
)
var mu sync.Mutex
func spender(money *int) {
mu.Lock()
*money--
mu.Unlock()
}
func worker(money *int) {
mu.Lock()
*money++
mu.Unlock()
}
func main() {
money := 50
count := 100000
for range count {
go worker(&money)
go spender(&money)
}
time.Sleep(2 * time.Second)
fmt.Printf("money: %d\n", money)
}
Mutexには主に2つのメソッドが提供されています。
| メソッド名 | 用途 |
|---|---|
mu.Lock() |
変数 mu にロックがかかっていない場合、 mu にロックをかけて以降の処理を実行します。ロックがかかっている場合、ロック解除まで待機します。 |
mu.Unlock() |
変数 muのロックを解除します。 |
今回のコード例では、 *money-- と *money++ の前後にそれぞれ mu.Lock()と mu.Unlock()を追加しています。このように、 極力必要最低限の箇所にしかロックをかけない ようにすると良いです。
ついでに実行時間に関してはどうなるのでしょうか?
初めにMutexを利用していない時の時間を測定します。
money: 177
time: 44ms
次にMutexを利用すると...
money: 50
time: 59ms
はい、若干ですが処理時間が遅くなりました。
これは、 mu.Lock()しようとしたら別のゴルーチンが muをロックしてた時に発生するブロックによって待機していたことが原因です。
ブロッキングを極力減らそう!
さて、今まで moneyを労働者と消費者がひたすら同じ金額分だけ消費し合うプログラムを書いてましたが、ここで、なぜか moneyをただ監視するだけの watcherという役割を追加しましょう。この watcherは、10000回に1回、変数 moneyの値を取得するだけのものです。ここで、 watcherがMutexをLock、Unlockしているのをわかりやすくするために、 Lockと Unlockの間に100msの time.Sleepを入れてみます。
そしてこの一連の処理を run関数として括って、main関数で100回分繰り返してもらいます(main関数には後述する waitGroupを使用しています)。
package main
import (
"fmt"
"sync"
"time"
)
var mu sync.Mutex
func spender(money *int) {
mu.Lock()
*money--
mu.Unlock()
}
func worker(money *int) {
mu.Lock()
*money++
mu.Unlock()
}
func watcher(money *int) {
mu.Lock()
time.Sleep(100 * time.Millisecond)
mu.Unlock()
}
func run() int64 {
money := 50
count := 100000
s := time.Now()
watcherCount := 10000
for i := range count {
if i%watcherCount == 0 {
go watcher(&money)
}
go worker(&money)
go spender(&money)
}
e := time.Since(s)
time.Sleep(1 * time.Second)
return e.Milliseconds()
}
func main() {
count := 100
times := make([]int64, count)
var wg sync.WaitGroup
for i := range count {
wg.Add(1)
go func(i int) {
defer wg.Done()
times[i] = run()
}(i)
}
wg.Wait()
average := int64(0)
for _, time := range times {
average += time
}
average /= int64(len(times))
fmt.Printf("average: %dms\n", average)
}
さてその結果は....?
average: 5052ms
さて、平均は約5000ms、つまり5秒間になります。
では watcherが2つになったらどうなるでしょうか?
average: 10508ms
おっと、倍になりました....!ただ、ぶっちゃけ何度か実行していると watcherが1個の時より早く処理が終了したりするので、正直PCの調子によってマチマチではあると思いますが、ここまで極端に遅くなってしまいます。
これは、 watcherは値を取得するだけなのに100msもの間 msをLockしており、さらにそのゴルーチンが2つに増えたからに起因します。
このように、値を読み取るだけに Lockしたり Unlockしたりすると処理が遅くなります。この問題に対しては、Mutexの亜種である RWMutex が使えます。
RWMutexは通常のMutexに Reader と Writer の概念が加わったものになります。
Writerは従来通りの Lock() 、 Unlock() メソッドで排他制御できますが、 Readerは RLock()、 RUnlock()という専用のメソッドを使用します。この専用のメソッドは、 rwmu が Lock()されていない場合に RLock()をすると読み取り専用のロックをかけることができます。このロックは、別ゴルーチンが Rlock()をしてもブロックされることがありません。また、 RLock() による読み取りのロック状態が続く限り、 Lock()されることもありません。しかし一度 Lock()されると、他のゴルーチンからの Lock()、 RLock()はできなくなります。
これによって何が嬉しいのかというと、読み取りだけなら別のゴルーチンをブロッキングすることなくできるという点です。
では使ってみましょう!
package main
import (
"fmt"
"sync"
"time"
)
var rwmu sync.RWMutex
func spender(money *int) {
rwmu.Lock()
*money--
rwmu.Unlock()
}
func worker(money *int) {
rwmu.Lock()
*money++
rwmu.Unlock()
}
func watcher(money *int) {
rwmu.RLock()
time.Sleep(1 * time.Millisecond)
rwmu.RUnlock()
}
func run() int64 {
money := 50
count := 100000
s := time.Now()
watcherCount := 10000
for i := range count {
if i%watcherCount == 0 {
go watcher(&money)
}
go worker(&money)
go spender(&money)
}
e := time.Since(s)
time.Sleep(1 * time.Second)
return e.Milliseconds()
}
func main() {
count := 100
times := make([]int64, count)
var wg sync.WaitGroup
for i := range count {
wg.Add(1)
go func(i int) {
defer wg.Done()
times[i] = run()
}(i)
}
wg.Wait()
average := int64(0)
for _, time := range times {
average += time
}
average /= int64(len(times))
fmt.Printf("average: %dms\n", average)
}
これによって時間はどれぐらい変わったのでしょうか??
まずは watcherが1つだけの場合。
average: 4034ms
実は出力上は1s短縮されていますが、先ほども紹介した通り、 RLockしている場合、 Lockもブロッキングされるため、実際のところあまり速度は変わりません。
では watcherが2つだけの場合だとどうなるでしょうか?
average: 4679ms
おっ!先ほどのように顕著に増加していませんね!このように時間が短縮された理由は、 watcherが RLockした状態で別の watcherが RLockしても、ブロッキングが発生しないからです。
完了まで待機しよう!
さて、ここまでの処理では、 time.Sleep関数を用いて処理が終了するのを待機していました。
しかし、完了するまでの時間はCPU使用率など、状況によって大きく変わります。APIであれば、そもそもレスポンスまでの時間は時々によって異なります。
そんな時に使われるのが、sync.WaitGroupです。実際にコードを見てみましょう。
package main
import (
"fmt"
"sync"
"time"
)
var mu sync.Mutex
func spender(money *int, wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
*money--
mu.Unlock()
}
func worker(money *int, wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
*money++
mu.Unlock()
}
func watcher(money *int, wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
fmt.Printf("I watched money: %d\n", *money)
mu.Unlock()
}
func main() {
money := 50
count := 100000
var wg sync.WaitGroup
s := time.Now()
watcherCount := 10000
for i := range count {
if i%watcherCount == 0 {
wg.Add(1)
go watcher(&money, &wg)
}
wg.Add(2) // 一応 Add(delta)には2とかも入れられます
go worker(&money, &wg)
go spender(&money, &wg)
}
e := time.Since(s)
wg.Wait()
fmt.Printf("money: %d\n", money)
}
そこまで大きな変更はしてませんが、 main関数内に wg変数を定義しています。これは内部的にカウンター変数的なのを保有しており、 wg.Add(delta int)で、delta分だけカウンターを増やします。 wg.Done()は wg.Add(-1)をしているので、カウンター変数を1減らしています。そして wg.Wait()は、 wgが保有しているカウンター変数が0になるまでブロックします。
これによって、全てのゴルーチンが動作し終わってから標準出力で moneyが出力されるようになりました!
終わりに
今回はゴルーチンで変数を共有してみました!私は普段のプログラミングで変数を共有するシーンに会ったことがないのですが、かといってマイナーなシチュエーションでもないと思うので、頭の片隅程度に「あぁこういう処理あったなぁ」ぐらいで思っていただければ幸いです!
参考文献
Goでの並行処理を徹底解剖! / 並行処理と並列処理 | zenn
Go言語で学ぶ並行プログラミング 他言語にも適用できる原則とベストプラクティス