47
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Makuake Product TeamAdvent Calendar 2018

Day 14

goroutine と channel で始める非同期処理

Last updated at Posted at 2018-12-14

はじめに

こんばんは。 @convto と申します。よろしくおねがいします。
先日AmazonのサイバーマンデーでルンバやらEchoやらいろいろ買って文明に恐れおののいています。
今回は業務委託として関わらせていただいている Makuake さんのアドベントカレンダーに参加させていただきました。

これは Makuake Product Team Advent Calendar 2018 の 14日目 の投稿です。
昨日は @shigeru10KPTを活用した振り返り in Makuake でした。こちらもぜひご一読ください!

それではやって行きましょう。今回はchannelとgoroutineについてです。

goroutine について

処理を仮想スレッドで起動する機能です。
作業員をふやして仕事を分担するイメージが近いです。

go キーワードを利用して関数を呼ぶとgoroutineとして起動します。

func main() {
	go task()
}

func task() {
	// some task
}

この際、goroutineで起動したい関数に戻り値を設定することができません。
仮に戻り値が設定できるとすると、このようなコードが書けるのですが、、、

func main() {
	// シンタックスエラーが発生する
	// 本来は戻り値は指定できないが、もし可能だとすると
	s := go task()
}

func task() string {
	// some task
	return "task done."
}

このように、戻り値を受け取るスタイルにしてしまうと

s := go task() の部分で s を受け取るために task の結果を待たなければならず、非同期に処理することができません。
そのため、goroutineで起動したい関数には戻り値を設定することができないようになっています。

また、goroutine は処理が完了されると破棄されます。
完了した処理に対して無理にスレッドを維持したりすることはありません。
その都度破棄して、リソースを開けます。

goroutine を増やすのは簡単です。
go キーワードで関数をたくさん読んであげるだけです。

func main() {
	// 10回非同期でtaskを呼び出す
	for i := 0; i < 10; i++ {
		go task()
	}
}

func task() {
	// some task
}

これで並列して処理が実行できます。
すごく便利そうですね。

プロセス間の情報の不透明性

goroutine がうまく非同期処理してマシンリソースを効率よく使えそうなことはわかりました。
しかし、ただこれだけの記述だと、問題があります。

goroutineには

  • 戻り値がない
  • 処理が完了されると破棄される

という特徴があることがわかりました。

このままだと goroutine が「作業しているのか破棄されたのか」を知る方法がありません。
また、goroutineがいつ終わるのか(またはすでに終わっているのか)がわからないため、goroutineの作業を待つこともできません。
たとえば先ほどの10回goroutineを起動するコードを以下のように更新してみると、それがわかります。

func main() {
	// 10回非同期でtaskを呼び出す
	for i := 0; i < 10; i++ {
		go task()
	}
}

func task() {
	// some task
	fmt.Println("I'm goroutine.")
}

このコードを実行してみると、(おそらく多くの環境では1) I'm goroutine. が一度も出力されないと思います。
これは、mainスレッドがtaskの処理を待つことができないため、for文を回してgoroutineを起動するだけで終了してしまうためです。

また、なにかのミスでgoroutineの作業が止まっていたとして、それを検知することはできません。

func main() {
	rand.Seed(time.Now().UnixNano())
	for i := 0; i < 10; i++ {
		// 0または1をランダムで生成する
		go task(rand.Intn(2))
	}
}

func task(random int) {
	// some task
	switch random {
	case 0:
		// 失敗する
	case 1:
		// 成功する
	}
}

検知することができないということは、意図しない挙動が発生した時にエラーハンドリングが不可能ということです。
また、戻り値が無いことで作業の結果として加工した値などを受けることもできません。

これらのことより、このままだとgoroutineの処理は、戻り値を必要としない(≒エラーハンドリングの必要のない)ものしか不可能になります。

それだとすごく使いづらそうですね。
安心してください。channelという機能が提供されています。

channel について

channel とは、プロセス間で値をやり取りするための機能です。
チャネルを利用すると、

  • goroutine との値をやり取り
  • 「すれ違い」をなくすための待ち合わせ

などが可能になります!

チャネルはgoroutine間の手紙のようなもので、goroutineをまたいで値の書き込みや受け取りができます。
それぞれ書き込みはこのように書きます

ch<- val

受け取りはこのように書きます

val <-ch

矢印が少しわかりづらいですね。整理しましょう
channelを手紙に見立てると
68747470733a2f2f696d672e6573612e696f2f75706c6f6164732f70726f64756374696f6e2f6174746163686d656e74732f373432342f323031382f31322f31342f33393438322f61366665383530652d636463652d343663342d393432332d3766666164386538393166342e706e67.png

図のように、右からくるものが送信
左に出て行くのが受信です。

channelの受信を記述すると、値が送信されるまで待ってくれます。
そのままではすれ違ってしまうコードも、このように書くことができるようになります。

func main() {
	// チャネルを作成
	txtCh := make(chan string)
	// 使い終わったらclose
	defer close(txtCh)
	// goroutineにチャネルを渡す
	go task(txtCh)
	// チャネルを受信する
	// その際、代入も可能
	s := <-txtCh
	fmt.Println(s)
}

// チャネルを書き込み専用で受け取る
func task(txtCh chan<- string) {
	// チャネルに結果を書き込む
	txtCh <- "I'm goroutine"
}

チャネルはバッファを持たせることもできます。
作成時にバッファを指定すると、チャネルが持てるデータの個数が決まります。

デフォルトは0(逐一手渡ししないとダメ、貯めておけない)です。
バッファを超えて値を送信すると、送信ブロッキングが発生します。

func main() {
	// チャネルを作成
	// デフォルトのバッファは0(逐一手渡し)
	txtCh := make(chan string)
	defer close(txtCh)
	// task を起動
	go task(txtCh)
	// 値を送信
	txtCh <- "push txt1."
	// 受信される前にもう一度送信
	// 送信ブロッキング発生
	// これ以降受け取り手がいなく無限に待つことになるのでdeadlock発生
	txtCh <- "push txt2."
}

// チャネルを受信専用で受け取る
func task(txtCh <-chan string) {
	// 1秒待つ
	time.Sleep(1 * time.Second)
	// チャネルから値を受け取る
	<-txtCh
}

このようにバッファを設定すると、その分だけ値を保持することができます。

func main() {
	// チャネルを作成
	// バッファを10まで設定
	txtCh := make(chan string, 10)
	defer close(txtCh)
	// task を起動
	go task(txtCh)
	// txtChに値を10個送信
	// バッファに収まっているのでblockingが発生しない
	for i := 0; i < 10; i++ {
		txtCh <- fmt.Sprintf("push txt%d.", i)
	}

	// チャネルへの送信がブロックされないので
	// goroutineの結果を待たずに処理が終了する
}

// チャネルを受信専用で受け取る
func task(txtCh <-chan string) {
	// 1秒待つ
	time.Sleep(1 * time.Second)
	// チャネルから値を受け取る
	<-txtCh
}

送信のブロッキングとバッファを利用して、
「あるレンジアクセス処理のワーカー数を指定する」ようなこともできます。

func main() {
	// バッファ6でスレッド数を管理するチャネルを作成
	limitCh := make(chan struct{}, 6)
	defer close(limitCh)
	// 結果を受け取るチャネルを作成
	resCh := make(chan string)
	defer close(resCh)
	// 100個の要素を持ったslice
	s := []int{
		100,
		200,
		300,
		400,
		// ...省略
	}

	// 要素数を覚えておく
	count := 0
	for _, n := range s {
		// taskが内部でlimitChの受送信を行う
		// 6ループするとバッファが詰まって送信ブロッキング発生
		// goroutineが解放されるとlimitChのバッファに空きがでるので送信ブロッキングが解除される
		// つまり同時に6個までしかgoroutineが起動できない
		go task(n, limitCh, resCh)
		count++
	}

	// 取り出す
	// resChとcountをセットで返す関数にして、他の場所で取り出すことも可能
	for i := 0; i < count; i++ {
		fmt.Println(<-resCh)
	}
}

func task(n int, limitCh chan struct{}, resCh chan<- string) {
	// limitChを送信する(バッファを1つ確保する)
	limitCh <- struct{}{}
	// 作業をする
	resCh <- fmt.Sprintf("message %d", n)
	// リミットを受信する(バッファを1つ解放する)
	<-limitCh
}

channel + select

channel は、select を利用することで柔軟に受送信できます。
基本的にはこのようにかけます。

select {
case <-ch1:
	// ch1 のパターン
}

複数パターンも受け取れます。
selectはどれかの条件に合致するまで処理がされません。
どれかの条件に合致するとselectを抜けます。

select {
case <-ch1:
	// ch1 のパターン
case <-ch2:
	// ch2 のパターン
}

つまりcaseで複数のチャネルを受信すると、どれか一つのチャネルを受け取るまで処理をブロッキングします。

値の代入も行うことができます。

select {
case s := <-ch1:
	fmt.Println(s)
case n := <-ch2:
	fmt.Println(n)
}

書き込みでselectを使うこともできます。
書き込みの場合は一番最初にバッファが空いたチャネルに値が送信されてselectを抜けます。

select {
case ch1 <- s:
	// ch1 のパターン
case ch2 <- s:
	// ch2 のパターン
}

ブロッキングしてほしくない場合、defaultを設定してあげます。
デフォルトはどんな条件でもマッチするので、どんなタイミングでも通過します。
無限ループなどと組み合わせて使う場合が多いです。

// 基本的には何もせずに、ch1 のチャネルに値が来るのを待ち続ける
for {
	select {
	case ch1 <- 1:
		// ch1 のパターン
		// ch1 を受け取った時のみ以下の動作を行う
	default:
		// デフォルトのパターン
	}
}

(ex, 標準パッケージでの goroutine の管理

goroutineを管理することができるパッケージとして、以下のようなパッケージがあります

  • context: キャンセル処理や値の伝搬
  • sync: 非同期処理で必要になる様々な機能を提供
  • sync/atomic: 低レベル同期処理のためのパッケージ。基本channelとsyncで事足りるよ、と ドキュメント にある

これらのパッケージは、goroutine間でのキャンセルシグナルを管理してくれたり、
共通して扱う値の競合を管理してくれたりします。

今回は詳しい説明を省きますが、興味のある方はぜひ調べてみてください。
context については、先日 Go の context パッケージの使い方について整理してみた - Qiita という記事にまとめてみたので、そちらもぜひ。

このようにGoは、channel という機能を中心に、goroutineをより管理しやすくしてくれる機能を提供しています。

最後に

Goのある意味一番Goらしい処理は非同期処理です。
非同期処理上でうまくメッセージングを行えるように、 channel という機能があります。
channel は非常によくできており、色々な非同期パターンを実装できます。
そして、その非同期パターンをうまく実装するためのツールが標準パッケージとして提供されています。うれしい!

以上の手厚いサポートにより、Goで非同期処理を書いたり考えたりするのはとても楽しいです。
まずはベーシックな goroutine と channel から、Goの非同期処理に触れ合ってみましょう!

最後になりますが、株式会社 Makuake では一緒に働くメンバーを募集しております!
ご興味が有りましたらご連絡いただけると幸いです!
https://www.wantedly.com/projects/26807

Makuake Product Team Advent Calendar 2018 も残すところあと10日ほどとなりましたが、ぜひ最後までお付き合いください!あしたは @taku_oka です。お楽しみに!

  1. ほぼ確実に表示されませんが、待ち合わせなどの処理を一切していないので環境依存で何か起きるケースも確率的にはあるかもしれません。ほぼ確実に表示されないと思います。

47
24
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
47
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?