LoginSignup
67
54

【Goのやさしい記事】goroutineと排他制御を10分で入門しよう!

Last updated at Posted at 2019-05-06

はじめに

この記事ではgoroutineおよび関連性の高いsyncパッケージの基本知識や使い方をまとめます。
Goの基本的な文法はざっと触れたが、「goroutine」、「WaitGroup」、「channel」、「Mutex」などの言葉を聞いて全く心配ないとは言い切れない人向けの記事です。
残念ながら、メモリやCPU、プロセスなどのOSに近いところまでは踏み入れません。

goroutineは簡単だとよく聞きますが、そもそも並行処理そのものが難しいので1つずつ理解してきましょう!

本記事での動作確認環境は以下です。

$ go version
go version go1.11.4 darwin/amd64

Goでの並行処理

ネットワーク通信などで待ち時間の大きい処理を非同期に行いたい、直列で動作させる必要がない処理群を高速に終わらせたいなどを理由に並行処理を使うことが多いと思います。
Goではgoroutineと呼ばれる軽量スレッドを簡単に動かせます。

なぜ「軽量」か

  • メモリ消費量が少ない
  • goroutineの生成と破棄コストが低い
  • コンテキストスイッチのコストが低い

詳細はgoroutineはなぜ軽量なのかをご参照ください。

並行処理と並列処理の違い

  • 並行処理(Concurrency)
    • 複数のタスクが交互に実行される処理
    • 一つのプロセッサが複数のタスクを管理し、それらを一つずつ少しずつ実行することで、全てのタスクが同時に進行しているように見せる。
    • タスクが独立していて互いに影響を与えない場合に特に有用。
    • goroutineは並行処理(Concurrency)
  • 並列処理(Parallelism)
    • 複数のタスクが同時に実行される処理
    • 複数のプロセッサが本当に同時に異なるタスクを実行する
    • 大量のデータを処理する必要がある場合や、タスクが互いに依存している場合に特に有用。

goroutineを動かしてみる

Goではgo文に関数を指定することで、簡単に並行処理を実装できます。
言語名まんまの構文が用意されているあたり、よほど並行処理に力を入れている雰囲気を感じます。

下記では、main関数内でhoge関数を並行処理させています。
time.Sleepしているのは、hoge関数が並行処理開始される前に、main関数が終了してしまうためです。

func main() {
	fmt.Println("main")
	go hoge() //goroutine
	time.Sleep(time.Second)
}

func hoge() {
	fmt.Println("hoge")
}
実行結果
main
hoge

また、go文には無名関数を指定することも可能です。
関数呼び出しなので、末尾に()を付け忘れないようにしましょう。

func main() {
	fmt.Println("main")
	go func() {
		fmt.Println("hoge")
	}() // ()を忘れずに
	time.Sleep(time.Second)
}
実行結果
main
hoge

channel

channelは複数goroutine間で「簡単に安全にデータのやりとりを行うための特別なデータ構造」です。
データ(channel)の送受信をスレッド間で行うため、明示的な排他制御をプログラマが実装する必要がありません。
channelの仕組みが無い他言語では、複数スレッド間でデータのやりとりを行うために排他制御を明示的に行う必要がありました。(後述しますが、GoにもMutexでその仕組みを実現することもできます。)
Goの並行処理に関するスローガンDo not communicate by sharing memory; instead, share memory by communicating.をchannelが実現する格好になっています。

基本文法

宣言

下記はint型のchannelを作る例です。

var ch chan int

makeコマンドで生成することも可能です。

ch := make(chan int)

読み書き

channelへの書き込みは下記のようにします。

ch <- 1

channelからの読み込みは変数を用意して、そこに代入します。

tmp := <-ch

最初は記法に慣れないかもしれませんが、<-を矢印と思えばイメージが湧きやすいと思います。

channelとgoroutineの組み合わせ

channelとgoroutineを組み合わせた簡単な例を示します。
他ルーチンで宣言したchannelは共有できます。

func main() {
	ch := make(chan int)

	go func() {
		ch <- 1
	}()

	a := <-ch
	fmt.Println(a)
}
実行結果
1

バッファ付きchannel

バッファ付きchannelとは、指定した数の書き込み用バッファを持つchannelです。
キューのような性質を持ち、FIFO(先入れ先出し)となります。
バッファの上限まで書き込んだら、読み込まれるまでは書き込みがブロックされ、読み込まれて空きができたら、再びその空きの分だけ書き込めます。

下記の例では、サブルーチンでchに1〜5までしか書き込めないので、それ以降の書き込み処理は読み込まれるまでいったんブロックになります。
次に、メインルーチンの1つ目のforループで1〜5がchから読み込まれて空きができたので、残りの6〜10がサブルーチンで再び書き込まれます。
次に、メインルーチンの2つ目のforループで6〜10がchから読み込まれます。

func main() {
	ch := make(chan int, 5) // バッファ付きchannel宣言

	go func() {
		for i := 1; i <= 10; i++ {
			ch <- i
		}
	}()

	// 念のためchにデータが書き込まれるのを待つ
	time.Sleep(time.Second)

	// 1〜5を読み込んで出力
	for i := 1; i <= 5; i++ {
		tmp := <-ch
		fmt.Println(tmp)
	}

	// 6〜10がchに書き込まれるのを待つ
	fmt.Println("waiting")
	time.Sleep(time.Second)

	// 6〜10を読み込んで出力
	for i := 1; i <= 5; i++ {
		tmp := <-ch
		fmt.Println(tmp)
	}
}
実行結果
1
2
3
4
5
waiting
6
7
8
9
10

バッファに空きがないchannelに書き込もうとするとランタイムパニックになります。

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	ch <- 3
}
実行結果
fatal error: all goroutines are asleep - deadlock!

channelに格納されているデータの個数を調べる

len関数でデータの個数を調べられます。(バッファサイズではありません。)

func main() {
	ch := make(chan int, 5)
	ch <- 1
	ch <- 2
	fmt.Println(len(ch))
}
実行結果
2

channelを閉じる

close関数でchannelを閉じます。
channelを閉じると、すべてのgoroutineへ通知が飛びます。
channelは一度しか閉じることができません。

閉じたchannelへの書き込み

閉じたchannelへデータを書き込もうとするとランタイムパニックになります。

func main() {
	ch := make(chan int, 5)
	ch <- 1
	close(ch)
	ch <- 2
}
実行結果
panic: send on closed channel

閉じたchannelからの読み込み

バッファが無いchannelの場合、closeしたchannelからデータを読み込もうとすると、ランタイムパニックを起こします。
また、<-chの2番目の戻り値でchannelが空いているかどうか(空いていればtrue、閉じているばfalse)を知ることができます。

func main() {
	ch := make(chan int)

	go func() {
		ch <- 1
		ch <- 2
		ch <- 3
	}()

	time.Sleep(time.Second)
	close(ch)
	for {
		a, ok := <-ch
		if !ok {
			fmt.Println("error")
			break
		}
		fmt.Println(a)
	}
}
実行結果
error
panic: send on closed channel

バッファ付きchannelの場合、閉じてもchannelに値が残っている場合は、全てを読みだしてからcloseが実施されます。
最後にerrorが出力されているのは、closeされているからです。
つまり、okがtrueかfalseかの厳密の定義は「channelのバッファが空きでかつcloseであるかどうか」となります。

func main() {
	ch := make(chan int, 3)

	go func() {
		ch <- 1
		ch <- 2
		ch <- 3
	}()

	time.Sleep(time.Second)
	close(ch)
	for {
		a, ok := <-ch
		if !ok {
			fmt.Println("error")
			break
		}
		fmt.Println(a)
	}
}
実行結果
1
2
3
error

select構文

select構文を使えば、読み書き可能なchannelがある場合のみ処理を実行することができます。
case節の条件を満たせばそのcase節の処理になります。いずれのcase節にも処理が入らなかった場合は、default節の処理になります。

下記の例では、1週目のforループではchに値が入っているためそれを読み込んで処理をしていますが、2周目のforループではchに値が入っていないため、defaultの処理に入っています。

func main() {
	ch := make(chan string)

	go func() {
		ch <- "cat"
	}()

	time.Sleep(time.Second)

	for i := 0; i < 2; i++ {
		select {
		case a := <-ch:
			fmt.Println(a)
		default:
			fmt.Println("nothing in ch")
		}
	}
}
実行結果
cat
nothing in ch

複数のcase節が実行可能な場合、どのcase節が実行されるかはランダムです。(上が優先ではないので要注意)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		ch1 <- "cat"
	}()

	go func() {
		ch2 <- "dog"
	}()

	time.Sleep(time.Second)

	select {
	case a1 := <-ch1:
		fmt.Println(a1)
	case a2 := <-ch2:
		fmt.Println(a2)
	default:
		fmt.Println("nothing in ch")
	}
}

goroutineのキャンセル

キャンセルが必要な理由

  • 後続の処理を続けるため
    • 例えば外部APIを実行する際にレスポンスが異常に遅い場合はネットワーク障害などの可能性もあるためタイムアウトする必要があります。
  • リソース解放のため
    • goroutineをキャンセルせずに放置するとそのままリソースを消費し続ける可能性があります。

Context

goroutineのキャンセルにはcontext.Contextを使用します。
context.Contextは他に、リクエストスコープの変数を扱う用途でも使用されますが、Goの並行処理から外れた話になりますので、そちらは割愛させていただきます。

下記はWithCancel関数を使い外部から任意のタイミングでgoroutineを停止させる例です。

func main() {
	// 空のcontextを生成
	ctx := context.Background()

	// 子のcontextを作成。第二返り値を使って子のコンテキストをキャンセルできる。
	ctxChild, cancel := context.WithCancel(ctx)

	// キャンセルされるまで無限ループするgoroutineを生成
	go func() {
		for {
			select {
			// キャンセルされると入る処理
			case <-ctxChild.Done():
				fmt.Println("context done")
				return
			// キャンセルされない間の処理
			default:
				fmt.Println("hello")
			}
		}
	}()

	// goroutineの処理が始まる前にキャンセルされるのを防ぐためスリープ
	time.Sleep(1 * time.Second)

	// キャンセル実行
	cancel()

	time.Sleep(1 * time.Second)
	fmt.Println("main end")
}
実行結果
hello
hello
(省略)
hello
context done
main end

外部からcancel()を呼び出してキャンセルしたからといって、goroutine自体の処理が止まるわけではありません。returnしてあげる必要があります。

他に、WithDeadline関数で指定時刻にgorouitenをキャンセルする方法やWithTimeout関数で指定時間後にgoroutineをキャンセルする方法もあります。

syncパッケージ

sync.Mutexとsync.RWMutex

Goではchannelのおかげで明示的な排他制御を記述する必要がありません。
しかし、Mutexで明示的な排他制御を記述することもできます。
これは他言語でも存在する(Javaのsynchronizedなど)伝統的なロックの仕方です。

排他制御をしなかった場合

当然ですが、複数goroutineで何の排他制御もせずにchannelでない変数を共有して更新処理をしてしまうと、更新処理前のデータを読み取ってしまい、不整合が発生してしまいます。
下記の例では、1000個のgoroutineを作成し、channelでないデータ(int型変数)を排他ロックを行わずにカウントアップさせます。
実行結果は、1000を満たしません。これは、あるgoroutineでのカウントアップ前の値を別のgoroutineが読み取ってカウントアップしてしまうからです。

func main() {
	c := 0
	for i := 0; i < 1000; i++ {
		go func() {
			c++
		}()
	}
	time.Sleep(time.Second)
	fmt.Println(c)
}
実行結果
959

排他ロック

sync.MutexのLock関数とUnlock関数で排他ロックの取得と解除ができます。
排他ロックなので、排他ロックを得たgoroutineが存在する場合は、ロックを得ようとする他のgoroutineは処理を待ちます。

下記のように、排他ロックをかけると期待通り最終結果が1000になります。
deferでUnlockするのはお決まりのパターンです。

func main() {
	var mu sync.Mutex
	c := 0
	for i := 0; i < 1000; i++ {
		go func() {
			mu.Lock()         // 排他ロック取得
			defer mu.Unlock() // 関数終了時に排他ロック解除
			c++
		}()
	}
	time.Sleep(time.Second)
	fmt.Println(c)
}
実行結果
1000

共有ロック

sync.RWMutexのRlock関数とRunlock関数で共有ロックの取得と解除ができます。(sync.RWMutexはLock関数とUnLock関数も持っています。)
共有ロック同士であれば処理を進めることができます。共有ロックを得たgoroutineが存在する場合は、排他ロックを取得しようとするgoroutineは待ちます。
基本的な使い方は排他ロックと同じです。

Mutexとchannelの使い分け

Mutexとchannelはどのように使い分けるべきなのでしょうか。
各ドキュメントを読んでみました。

GitHubのwikiによると

GoのGitHubのwikiによると、「どちらも似たようなことはできるよ。よりシンプルに書ける方法をケースバイケースで選択してね。Go初心者はchannelばかり使いがちだけどsync.Mutexも恐れず使っていこうぜ。」(意訳)とのことです。
一応、下の使い分けがプラクティスとして紹介されていますが、あまり今の自分にはピンと来ませんでした。

channelを使うケース

  • データの所有権を受け渡ししたい場合
  • 処理を分散したい場合
  • 非同期で結果を受け渡ししたい場合

Mutexを使うケース

  • キャッシュを扱う場合
  • 状態を扱う場合

A Tour of Goによると

A Tour of Goによると、「情報のやりとりが必要ない時、あるいはコンフリクトを避けるために1つのgoroutineで1つの変数のみにアクセスするときはsync.Mutex使おう」(意訳)とのことです。こちらもピンと来ませんでした。

The Go blogによると

The Go blogによると、mapはスレッドセーフでないため、mapを扱う処理で排他制御をかけたいときにsync.RWMutexを使うのが一般的とのことです。これはピンと来ました。
例えば、下記のように複数goroutineで共通のmapを更新しようとするとエラーになります。

func main() {
	tmpMap := make(map[string]int)
	for i := 0; i < 1000; i++ {
		go func() {
			tmpMap["something"] = i
		}()
	}
	fmt.Println(tmpMap)
}
実行結果
fatal error: concurrent map writes

下記のようにロックをかければ処理を継続できます。

// mapとmutexをstructのプロパティに持たせる
type SafeCounter struct {
	v   map[string]int
	mux sync.RWMutex
}

// Inc 指定したkeyのvalueをインクリメントする
func (c *SafeCounter) Inc(key string) {
	// 排他ロックをかけて値更新
	c.mux.Lock()
	defer c.mux.Unlock()
	c.v[key]++
}

// GetValue ゲッター
func (c *SafeCounter) GetValue(key string) int {
	// 共有ロックをかけて値取得
	c.mux.RLock()
	defer c.mux.RUnlock()
	return c.v[key]
}

func main() {
	c := SafeCounter{v: make(map[string]int)}
	for i := 0; i < 1000; i++ {
		go c.Inc("somekey")
	}

	time.Sleep(time.Second)
	fmt.Println(c.GetValue("somekey"))
}
実行結果
1000

しかしながら、Go1.9以降ではsync.Mapが標準パッケージに含まれるようになったため、自身でRWMutexとmapの併用を記述する必要がなくなりました。

func main() {
	// mapの宣言。keyとvalueはinterface型。
	sMap := sync.Map{}
	for i := 0; i < 1000; i++ {
		go func() {
			// 更新
			sMap.Store("something", i)
		}()
	}
	time.Sleep(time.Second)
	// 取得
	if val, ok := sMap.Load("something"); ok {
		fmt.Println(val)
	}
}
実行結果
1000

ただし、sync.Mapは以下の2ケースに該当する場合のみに利用し、基本的には普通のmapを使って自分でロックかける方法が良いようです。

  1. 同じキーに一度しか書き込まれず、何度も読み取られる場合 (cache)
  2. 複数のgoroutineがそれぞれ別のkey群を読み書きする場合

参考:https://github.com/suzuki-shunsuke/issue/issues/53#issuecomment-640182869

その他

上記に加えて、他言語での実装をできるだけそのままリプレイスしたいときに、伝統的な排他処理であるMutexを使うのかなと思いました。

sync.WaitGroup

sync.WaitGroupを利用して、動作中の全てのgoroutineの処理が完了してから次の処理を実行するようにできます。
time.Sleepでは必ずしもgoroutineの全ての処理が完了するとは限らないですし、無駄に時間を待つ場合もあります。
基本的にはsync.WaitGroupを使いましょう。

例えば、上記の例でSleepで待っていた例は下のように書き換えられます。

func main() {
	var wg sync.WaitGroup
	fmt.Println("main")
	wg.Add(1) // 待っておいて欲しいジョブ数を与える
	go func() {
		defer wg.Done() // ジョブが完了したら完了通知する。残りジョブ数がデクリメントされる。
		fmt.Println("hoge")
	}()
	wg.Wait() // 全てのジョブが完了するまで待つ
}
実行結果
main
hoge

複数goroutineでも、もちろん全て待ちます。

func main() {
	var wg sync.WaitGroup
	for i := 1; i <= 10; i++ {
		wg.Add(1) // 待っておいて欲しいジョブ数を与える
		go func() {
			defer wg.Done() // ジョブが完了したら完了通知する。残りジョブ数がデクリメントされる。
			fmt.Println("hello")
		}()
	}
	wg.Wait() // 全てのジョブが完了するまで待つ
	fmt.Println("done")
}
実行結果
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
done

sync.Once

sync.Onceで一度だけ関数を実行するようにできます。
一般的に初期化処理用に使用されます。
init関数と異なり、実行タイミングを任意に指定できます。

var once sync.Once

func something() {
	fmt.Println("Hello")
}

func main() {
	// something関数を2回呼び出そうとするが、実際には1回しか呼び出されない。
	once.Do(something)
	once.Do(something)
}
実行結果
Hello

sync.Cond

sync.Condは他言語であるような状態が変わったことを通知するコンディション変数として使えます。
Broadcast関数でファンアウトを実現する方法が一般的です。
channelでもクローズすれば全てのgoroutineに完了通知することが可能ですが、クローズは一度だけしかできないですし、channelへの書き込み処理はできなくなります。

func main() {
	var wg sync.WaitGroup
	var mu sync.Mutex
	cond := sync.NewCond(&mu)

	for _, sport := range []string{"Basketball", "Baseball", "Football"} {
		wg.Add(1)
		go func(sport string) {
			defer wg.Done()
			mu.Lock()
			defer mu.Unlock()
			cond.Wait() // 完了通知されるまで待つ
			fmt.Println(sport)
		}(sport)
	}

	// 事前にやっておきたい処理
	fmt.Println("My favorite sports: ")

	// 事前処理完了通知。待っていた他の処理を開始させる。
	cond.Broadcast()

	wg.Wait()
}
実行結果
My favorite sports:
Football
Basketball
Baseball

Goでの並行処理の実装パターン

並行処理の実装パターン化をまとめている記事がありましたので、掲載させていただきます。

  • Goにおける並行・並列処理のパターン集
  • WEB+DB PRESS Vol.95
    • バッファ付きchannelを使って同時に実行されるgoroutineの数を制限しよう(セマフォ)
    • channelを利用して指定した数のワーカにデータの排他制御無しでファンアウトする
    • channelを利用して排他制御無しで連番を扱う
    • etc

まとめ

  • go文で簡単にgoroutineを実装できる
  • time.SleepでなくWaitGroupを使う
  • channelかMutexかはケースバイケースだが初心者はとりあえずchannelに走る
  • select構文でchannelの場合分け
  • 並行処理はやはり複雑。用法用量を守らないと変に複雑なコードになりがち。
67
54
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
67
54