はじめに
この記事ではgoroutineおよび関連性の高いsyncパッケージの基本知識や使い方をまとめます。
Goの基本的な文法はざっと触れたが、「goroutine」、「WaitGroup」、「channel」、「Mutex」などの言葉を聞いて全く心配ないとは言い切れない人向けの記事です。
残念ながら、メモリやCPU、プロセスなどのOSに近いところまでは踏み入れません。
goroutineは簡単だとよく聞きますが、そもそも並行処理そのものが難しいので1つずつ理解してきましょう!
本記事での動作確認環境は以下です。
$ go version
go version go1.11.4 darwin/amd64
Goでの並行処理
ネットワーク通信などで待ち時間の大きい処理を非同期に行いたい、直列で動作させる必要がない処理群を高速に終わらせたいなどを理由に並行処理を使うことが多いと思います。
Goではgoroutineと呼ばれる軽量スレッドを簡単に動かせます。
なぜ「軽量」か
- メモリ消費量が少ない
- goroutineの生成と破棄コストが低い
- コンテキストスイッチのコストが低い
詳細はgoroutineはなぜ軽量なのかをご参照ください。
並行処理と並列処理の違い
- 並行処理(Concurrency)
- 複数のタスクが交互に実行される処理
- 一つのプロセッサが複数のタスクを管理し、それらを一つずつ少しずつ実行することで、全てのタスクが同時に進行しているように見せる。
- タスクが独立していて互いに影響を与えない場合に特に有用。
- goroutineは並行処理(Concurrency)
- 並列処理(Parallelism)
- 複数のタスクが同時に実行される処理
- 複数のプロセッサが本当に同時に異なるタスクを実行する
- 大量のデータを処理する必要がある場合や、タスクが互いに依存している場合に特に有用。
goroutineを動かしてみる
Goではgo文に関数を指定することで、簡単に並行処理を実装できます。
言語名まんまの構文が用意されているあたり、よほど並行処理に力を入れている雰囲気を感じます。
下記では、main関数内でhoge関数を並行処理させています。
time.Sleep
しているのは、hoge関数が並行処理開始される前に、main関数が終了してしまうためです。
func main() {
fmt.Println("main")
go hoge() //goroutine
time.Sleep(time.Second)
}
func hoge() {
fmt.Println("hoge")
}
main
hoge
また、go文には無名関数を指定することも可能です。
関数呼び出しなので、末尾に()
を付け忘れないようにしましょう。
func main() {
fmt.Println("main")
go func() {
fmt.Println("hoge")
}() // ()を忘れずに
time.Sleep(time.Second)
}
main
hoge
channel
channelは複数goroutine間で「簡単に安全にデータのやりとりを行うための特別なデータ構造」です。
データ(channel)の送受信をスレッド間で行うため、明示的な排他制御をプログラマが実装する必要がありません。
channelの仕組みが無い他言語では、複数スレッド間でデータのやりとりを行うために排他制御を明示的に行う必要がありました。(後述しますが、GoにもMutexでその仕組みを実現することもできます。)
Goの並行処理に関するスローガンDo not communicate by sharing memory; instead, share memory by communicating.
をchannelが実現する格好になっています。
基本文法
宣言
下記はint型のchannelを作る例です。
var ch chan int
makeコマンドで生成することも可能です。
ch := make(chan int)
読み書き
channelへの書き込みは下記のようにします。
ch <- 1
channelからの読み込みは変数を用意して、そこに代入します。
tmp := <-ch
最初は記法に慣れないかもしれませんが、<-
を矢印と思えばイメージが湧きやすいと思います。
channelとgoroutineの組み合わせ
channelとgoroutineを組み合わせた簡単な例を示します。
他ルーチンで宣言したchannelは共有できます。
func main() {
ch := make(chan int)
go func() {
ch <- 1
}()
a := <-ch
fmt.Println(a)
}
1
バッファ付きchannel
バッファ付きchannelとは、指定した数の書き込み用バッファを持つchannelです。
キューのような性質を持ち、FIFO(先入れ先出し)となります。
バッファの上限まで書き込んだら、読み込まれるまでは書き込みがブロックされ、読み込まれて空きができたら、再びその空きの分だけ書き込めます。
下記の例では、サブルーチンでch
に1〜5までしか書き込めないので、それ以降の書き込み処理は読み込まれるまでいったんブロックになります。
次に、メインルーチンの1つ目のforループで1〜5がchから読み込まれて空きができたので、残りの6〜10がサブルーチンで再び書き込まれます。
次に、メインルーチンの2つ目のforループで6〜10がchから読み込まれます。
func main() {
ch := make(chan int, 5) // バッファ付きchannel宣言
go func() {
for i := 1; i <= 10; i++ {
ch <- i
}
}()
// 念のためchにデータが書き込まれるのを待つ
time.Sleep(time.Second)
// 1〜5を読み込んで出力
for i := 1; i <= 5; i++ {
tmp := <-ch
fmt.Println(tmp)
}
// 6〜10がchに書き込まれるのを待つ
fmt.Println("waiting")
time.Sleep(time.Second)
// 6〜10を読み込んで出力
for i := 1; i <= 5; i++ {
tmp := <-ch
fmt.Println(tmp)
}
}
1
2
3
4
5
waiting
6
7
8
9
10
バッファに空きがないchannelに書き込もうとするとランタイムパニックになります。
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
ch <- 3
}
fatal error: all goroutines are asleep - deadlock!
channelに格納されているデータの個数を調べる
len関数でデータの個数を調べられます。(バッファサイズではありません。)
func main() {
ch := make(chan int, 5)
ch <- 1
ch <- 2
fmt.Println(len(ch))
}
2
channelを閉じる
close関数でchannelを閉じます。
channelを閉じると、すべてのgoroutineへ通知が飛びます。
channelは一度しか閉じることができません。
閉じたchannelへの書き込み
閉じたchannelへデータを書き込もうとするとランタイムパニックになります。
func main() {
ch := make(chan int, 5)
ch <- 1
close(ch)
ch <- 2
}
panic: send on closed channel
閉じたchannelからの読み込み
バッファが無いchannelの場合、closeしたchannelからデータを読み込もうとすると、ランタイムパニックを起こします。
また、<-ch
の2番目の戻り値でchannelが空いているかどうか(空いていればtrue、閉じているばfalse)を知ることができます。
func main() {
ch := make(chan int)
go func() {
ch <- 1
ch <- 2
ch <- 3
}()
time.Sleep(time.Second)
close(ch)
for {
a, ok := <-ch
if !ok {
fmt.Println("error")
break
}
fmt.Println(a)
}
}
error
panic: send on closed channel
バッファ付きchannelの場合、閉じてもchannelに値が残っている場合は、全てを読みだしてからcloseが実施されます。
最後にerrorが出力されているのは、closeされているからです。
つまり、ok
がtrueかfalseかの厳密の定義は「channelのバッファが空きでかつcloseであるかどうか」となります。
func main() {
ch := make(chan int, 3)
go func() {
ch <- 1
ch <- 2
ch <- 3
}()
time.Sleep(time.Second)
close(ch)
for {
a, ok := <-ch
if !ok {
fmt.Println("error")
break
}
fmt.Println(a)
}
}
1
2
3
error
select構文
select構文を使えば、読み書き可能なchannelがある場合のみ処理を実行することができます。
case節の条件を満たせばそのcase節の処理になります。いずれのcase節にも処理が入らなかった場合は、default節の処理になります。
下記の例では、1週目のforループではch
に値が入っているためそれを読み込んで処理をしていますが、2周目のforループではch
に値が入っていないため、defaultの処理に入っています。
func main() {
ch := make(chan string)
go func() {
ch <- "cat"
}()
time.Sleep(time.Second)
for i := 0; i < 2; i++ {
select {
case a := <-ch:
fmt.Println(a)
default:
fmt.Println("nothing in ch")
}
}
}
cat
nothing in ch
複数のcase節が実行可能な場合、どのcase節が実行されるかはランダムです。(上が優先ではないので要注意)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
ch1 <- "cat"
}()
go func() {
ch2 <- "dog"
}()
time.Sleep(time.Second)
select {
case a1 := <-ch1:
fmt.Println(a1)
case a2 := <-ch2:
fmt.Println(a2)
default:
fmt.Println("nothing in ch")
}
}
goroutineのキャンセル
キャンセルが必要な理由
- 後続の処理を続けるため
- 例えば外部APIを実行する際にレスポンスが異常に遅い場合はネットワーク障害などの可能性もあるためタイムアウトする必要があります。
- リソース解放のため
- goroutineをキャンセルせずに放置するとそのままリソースを消費し続ける可能性があります。
Context
goroutineのキャンセルにはcontext.Context
を使用します。
context.Context
は他に、リクエストスコープの変数を扱う用途でも使用されますが、Goの並行処理から外れた話になりますので、そちらは割愛させていただきます。
下記はWithCancel関数を使い外部から任意のタイミングでgoroutineを停止させる例です。
func main() {
// 空のcontextを生成
ctx := context.Background()
// 子のcontextを作成。第二返り値を使って子のコンテキストをキャンセルできる。
ctxChild, cancel := context.WithCancel(ctx)
// キャンセルされるまで無限ループするgoroutineを生成
go func() {
for {
select {
// キャンセルされると入る処理
case <-ctxChild.Done():
fmt.Println("context done")
return
// キャンセルされない間の処理
default:
fmt.Println("hello")
}
}
}()
// goroutineの処理が始まる前にキャンセルされるのを防ぐためスリープ
time.Sleep(1 * time.Second)
// キャンセル実行
cancel()
time.Sleep(1 * time.Second)
fmt.Println("main end")
}
hello
hello
(省略)
hello
context done
main end
外部からcancel()を呼び出してキャンセルしたからといって、goroutine自体の処理が止まるわけではありません。returnしてあげる必要があります。
他に、WithDeadline関数で指定時刻にgorouitenをキャンセルする方法やWithTimeout関数で指定時間後にgoroutineをキャンセルする方法もあります。
syncパッケージ
sync.Mutexとsync.RWMutex
Goではchannelのおかげで明示的な排他制御を記述する必要がありません。
しかし、Mutexで明示的な排他制御を記述することもできます。
これは他言語でも存在する(Javaのsynchronizedなど)伝統的なロックの仕方です。
排他制御をしなかった場合
当然ですが、複数goroutineで何の排他制御もせずにchannelでない変数を共有して更新処理をしてしまうと、更新処理前のデータを読み取ってしまい、不整合が発生してしまいます。
下記の例では、1000個のgoroutineを作成し、channelでないデータ(int型変数)を排他ロックを行わずにカウントアップさせます。
実行結果は、1000を満たしません。これは、あるgoroutineでのカウントアップ前の値を別のgoroutineが読み取ってカウントアップしてしまうからです。
func main() {
c := 0
for i := 0; i < 1000; i++ {
go func() {
c++
}()
}
time.Sleep(time.Second)
fmt.Println(c)
}
959
排他ロック
sync.Mutex
のLock関数とUnlock関数で排他ロックの取得と解除ができます。
排他ロックなので、排他ロックを得たgoroutineが存在する場合は、ロックを得ようとする他のgoroutineは処理を待ちます。
下記のように、排他ロックをかけると期待通り最終結果が1000になります。
deferでUnlockするのはお決まりのパターンです。
func main() {
var mu sync.Mutex
c := 0
for i := 0; i < 1000; i++ {
go func() {
mu.Lock() // 排他ロック取得
defer mu.Unlock() // 関数終了時に排他ロック解除
c++
}()
}
time.Sleep(time.Second)
fmt.Println(c)
}
1000
共有ロック
sync.RWMutex
のRlock関数とRunlock関数で共有ロックの取得と解除ができます。(sync.RWMutex
はLock関数とUnLock関数も持っています。)
共有ロック同士であれば処理を進めることができます。共有ロックを得たgoroutineが存在する場合は、排他ロックを取得しようとするgoroutineは待ちます。
基本的な使い方は排他ロックと同じです。
Mutexとchannelの使い分け
Mutexとchannelはどのように使い分けるべきなのでしょうか。
各ドキュメントを読んでみました。
GitHubのwikiによると
GoのGitHubのwikiによると、「どちらも似たようなことはできるよ。よりシンプルに書ける方法をケースバイケースで選択してね。Go初心者はchannelばかり使いがちだけどsync.Mutex
も恐れず使っていこうぜ。」(意訳)とのことです。
一応、下の使い分けがプラクティスとして紹介されていますが、あまり今の自分にはピンと来ませんでした。
channelを使うケース
- データの所有権を受け渡ししたい場合
- 処理を分散したい場合
- 非同期で結果を受け渡ししたい場合
Mutexを使うケース
- キャッシュを扱う場合
- 状態を扱う場合
A Tour of Goによると
A Tour of Goによると、「情報のやりとりが必要ない時、あるいはコンフリクトを避けるために1つのgoroutineで1つの変数のみにアクセスするときはsync.Mutex
使おう」(意訳)とのことです。こちらもピンと来ませんでした。
The Go blogによると
The Go blogによると、mapはスレッドセーフでないため、mapを扱う処理で排他制御をかけたいときにsync.RWMutex
を使うのが一般的とのことです。これはピンと来ました。
例えば、下記のように複数goroutineで共通のmapを更新しようとするとエラーになります。
func main() {
tmpMap := make(map[string]int)
for i := 0; i < 1000; i++ {
go func() {
tmpMap["something"] = i
}()
}
fmt.Println(tmpMap)
}
fatal error: concurrent map writes
下記のようにロックをかければ処理を継続できます。
// mapとmutexをstructのプロパティに持たせる
type SafeCounter struct {
v map[string]int
mux sync.RWMutex
}
// Inc 指定したkeyのvalueをインクリメントする
func (c *SafeCounter) Inc(key string) {
// 排他ロックをかけて値更新
c.mux.Lock()
defer c.mux.Unlock()
c.v[key]++
}
// GetValue ゲッター
func (c *SafeCounter) GetValue(key string) int {
// 共有ロックをかけて値取得
c.mux.RLock()
defer c.mux.RUnlock()
return c.v[key]
}
func main() {
c := SafeCounter{v: make(map[string]int)}
for i := 0; i < 1000; i++ {
go c.Inc("somekey")
}
time.Sleep(time.Second)
fmt.Println(c.GetValue("somekey"))
}
1000
しかしながら、Go1.9以降ではsync.Map
が標準パッケージに含まれるようになったため、自身でRWMutexとmapの併用を記述する必要がなくなりました。
func main() {
// mapの宣言。keyとvalueはinterface型。
sMap := sync.Map{}
for i := 0; i < 1000; i++ {
go func() {
// 更新
sMap.Store("something", i)
}()
}
time.Sleep(time.Second)
// 取得
if val, ok := sMap.Load("something"); ok {
fmt.Println(val)
}
}
1000
ただし、sync.Map
は以下の2ケースに該当する場合のみに利用し、基本的には普通のmapを使って自分でロックかける方法が良いようです。
- 同じキーに一度しか書き込まれず、何度も読み取られる場合 (cache)
- 複数のgoroutineがそれぞれ別のkey群を読み書きする場合
参考:https://github.com/suzuki-shunsuke/issue/issues/53#issuecomment-640182869
その他
上記に加えて、他言語での実装をできるだけそのままリプレイスしたいときに、伝統的な排他処理であるMutexを使うのかなと思いました。
sync.WaitGroup
sync.WaitGroup
を利用して、動作中の全てのgoroutineの処理が完了してから次の処理を実行するようにできます。
time.Sleep
では必ずしもgoroutineの全ての処理が完了するとは限らないですし、無駄に時間を待つ場合もあります。
基本的にはsync.WaitGroup
を使いましょう。
例えば、上記の例でSleepで待っていた例は下のように書き換えられます。
func main() {
var wg sync.WaitGroup
fmt.Println("main")
wg.Add(1) // 待っておいて欲しいジョブ数を与える
go func() {
defer wg.Done() // ジョブが完了したら完了通知する。残りジョブ数がデクリメントされる。
fmt.Println("hoge")
}()
wg.Wait() // 全てのジョブが完了するまで待つ
}
main
hoge
複数goroutineでも、もちろん全て待ちます。
func main() {
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1) // 待っておいて欲しいジョブ数を与える
go func() {
defer wg.Done() // ジョブが完了したら完了通知する。残りジョブ数がデクリメントされる。
fmt.Println("hello")
}()
}
wg.Wait() // 全てのジョブが完了するまで待つ
fmt.Println("done")
}
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
done
sync.Once
sync.Once
で一度だけ関数を実行するようにできます。
一般的に初期化処理用に使用されます。
init関数と異なり、実行タイミングを任意に指定できます。
var once sync.Once
func something() {
fmt.Println("Hello")
}
func main() {
// something関数を2回呼び出そうとするが、実際には1回しか呼び出されない。
once.Do(something)
once.Do(something)
}
Hello
sync.Cond
sync.Cond
は他言語であるような状態が変わったことを通知するコンディション変数として使えます。
Broadcast関数でファンアウトを実現する方法が一般的です。
channelでもクローズすれば全てのgoroutineに完了通知することが可能ですが、クローズは一度だけしかできないですし、channelへの書き込み処理はできなくなります。
func main() {
var wg sync.WaitGroup
var mu sync.Mutex
cond := sync.NewCond(&mu)
for _, sport := range []string{"Basketball", "Baseball", "Football"} {
wg.Add(1)
go func(sport string) {
defer wg.Done()
mu.Lock()
defer mu.Unlock()
cond.Wait() // 完了通知されるまで待つ
fmt.Println(sport)
}(sport)
}
// 事前にやっておきたい処理
fmt.Println("My favorite sports: ")
// 事前処理完了通知。待っていた他の処理を開始させる。
cond.Broadcast()
wg.Wait()
}
My favorite sports:
Football
Basketball
Baseball
Goでの並行処理の実装パターン
並行処理の実装パターン化をまとめている記事がありましたので、掲載させていただきます。
- Goにおける並行・並列処理のパターン集
-
WEB+DB PRESS Vol.95
- バッファ付きchannelを使って同時に実行されるgoroutineの数を制限しよう(セマフォ)
- channelを利用して指定した数のワーカにデータの排他制御無しでファンアウトする
- channelを利用して排他制御無しで連番を扱う
- etc
まとめ
- go文で簡単にgoroutineを実装できる
- time.SleepでなくWaitGroupを使う
- channelかMutexかはケースバイケースだが初心者はとりあえずchannelに走る
- select構文でchannelの場合分け
- 並行処理はやはり複雑。用法用量を守らないと変に複雑なコードになりがち。