3
1

More than 1 year has passed since last update.

Go言語入門 学習メモ 11 ゴールーチン、チャネル、select、

Last updated at Posted at 2022-10-24

はじめに

船井総研デジタルのoswです。業務でGo言語を使うことになったのでこれから学習していきます。その備忘録です。参考になる方がいらっしゃれば幸いです。

対象読者

  • これからGo言語を学習する方
  • 既に他の言語で基本構文を学習されている方

学習環境

学習環境は次のようになっています。この環境の構築メモは下記記事にまとめてあります。ご興味がある方はご参照ください。

  • Windows 11 Home / 22H2
  • VSCode / 1.72.2
  • go version go1.19.2 windows/amd64
  • git version 2.38.0.windows.1

前回までの学習

前回は io.Readerインタフェース を学習しました。

ゴールーチン

Goのランタイムで管理される並行処理を行うための軽量スレッド1で、以下の構文を実行するだけで並行処理が可能です。メモリ空間は共有されているため同じ領域にアクセスする場合は整合性を保つために同期が必要です。ちなみにmain関数もゴールーチンで動いてるようです。

Goの開発者であるRob Pike氏は次のコメントをしています。資料もご覧になれますのでぜひご参照ください。

「並列」2ではなく、「並行」3であって、ゴールーチンとチャネルのデモなくしてGoの紹介はできない、ということでゴールーチンこそがGoのポイントということのようです。

If there’s one thing most people know about Go, is that it is designed for concurrency. No introduction to Go is complete without a demonstration of its goroutines and channels.
But when people hear the word concurrency they often think of parallelism, a related but quite distinct concept.

構文
go 関数名(引数1, 引数2, ...)

ゴールーチンのサンプルを検索してみると、だいたいスリープ処理が入っています。これは、main関数が終了してしまうと新たに起動したゴールーチンも終了してしまうことに起因しているようです。

main関数が終了すると起動したゴールーチンの処理が一切走らずに「ん?」と挙動がよくわからい場合があります。これはmainのゴールーチン、起動したゴールーチン、どれが先行するかわからないため、mainが先行し、先に終わってしまうと起こります。

そのため、多くのサンプルでmain関数にスリープ処理を入れている、という訳のようです。納得です。ただ、実際にはスリープ処理は行わず、データの同期には次のセクションで学ぶ「チャネル」で行うようです。

次のサンプルでは同じ変数への読み書きが確認できます。ですが、お互いが好き勝手に変数を書き換えているため、毎回結果が異なってしまい整合性が取れなくなっています。

サンプルコード
package main

import (
	"fmt"
	"time"
)

const NumLoop = 5

func main() {

	var doNotTouchMe int

	// ゴールーチン1:変数書き換え +1
	go func ()  {
		fmt.Println("goRoutine1: START")
		for i := 0; i < NumLoop; i++ {
			// ゴールーチン2がまだ書き込みしてない可能性がある
			doNotTouchMe++
			fmt.Println("goRoutine1: doNotTouchMe:", doNotTouchMe)
			time.Sleep(100 * time.Microsecond)
		}
		fmt.Println("goRoutine: END")
	}()

	// ゴールーチン2:変数書き換え *2
	go func ()  {
		fmt.Println("goRoutine2: START")
		for i := 0; i < NumLoop; i++ {
			// ゴールーチン1がまだ書き込みしてない可能性がある
			doNotTouchMe *= 2
			fmt.Println("goRoutine2: doNotTouchMe:", doNotTouchMe)
			time.Sleep(100 * time.Microsecond)
		}
		fmt.Println("goRoutine: END")
	}()
	
	// main関数が終了しないよう、スリープさせる
	time.Sleep(time.Second)
	fmt.Println("result:", doNotTouchMe)
}
実行結果
goRoutine2: START
goRoutine2: doNotTouchMe: 0
goRoutine1: START
goRoutine1: doNotTouchMe: 1
goRoutine2: doNotTouchMe: 2
goRoutine2: doNotTouchMe: 4
goRoutine1: doNotTouchMe: 5
goRoutine1: doNotTouchMe: 6
goRoutine2: doNotTouchMe: 12
goRoutine2: doNotTouchMe: 24
goRoutine1: doNotTouchMe: 25
goRoutine1: doNotTouchMe: 26
goRoutine: END
goRoutine: END
result: 26

チャネル

チャネルとは

ゴールーチン間でデータのやり取りが発生する場合、同期を取らないとデータの整合性が取れなくなります。それを解決するのがチャネル、ということのようです。

チャネルはデータ型の1つとして提供されているようで、チャネル型の変数を宣言して使います。アクセスは次の表のようにチャネル演算子を用います。

送信する際は <- を直感的に使えますが、受信の場合は = で変数へ代入する必要があるので注意です。(逆方向の -> はないようです)

チャネル演算子 チャネルからデータ受信 チャネルへデータ送信
<- 受信用変数 = <-チャネル チャネル <- 送信用変数

これは次のようにイメージするとわかりやすいです。チャネルはゴールーチンを繋ぐトンネルのようなもので、そこを通すとお互いにデータの送受信ができます。

チャネルのデータ送受信.png

データをすべて送信し、チャネルを閉じるときは close(チャネル) を使います4。また、close()はデータの送信側が行います。受信側がチャネルを閉じてしまい、閉じたチャネルに対し送信処理を行うとpanic5を起こすようです。

構文
// 型にデータ型を指定すると、そのデータ型の送受信が可能なチャネルが作成される
ch := make(chan )

// データの送信
ch <- data

// データの受信
// チャネルを開いてる時: 受信データがあればokにtrue、データがなければブロック
// チャネルを閉じた時: 受信データがないならokにfalse
value, ok := <-ch

// チャネルを閉じる
close(ch)

チャネルを開いている間に送信されたデータはチャネルを閉じても受信可能なようです。そのため、データ受信時にチャネルが閉じているからといってfalseが返る訳ではありません。また、構文の データの受信 にブロックとありますが、詳細は次のチャネルのブロックで説明します。

次のサンプルコードは先程のサンプルをチャネルを使って書き換えたものです。goRoutineX()にint型のチャネルを渡し、それを通して加工したデータを送受信しています。

前回と異なり、常に同じ結果になり、同期が取れていることが確認できます。

サンプルコード
package main

import (
	"fmt"
	"time"
)

const NumLoop = 3

// ゴールーチン1:変数書き換え +1
func goRoutine1(ch chan int) {
	fmt.Println("goRoutine1: START")
	var bk int

	for i := 0; i < NumLoop; i++ {
		v := bk
		v++
		fmt.Printf("goRoutine1: 送信 %d\n", v)
		ch <- v

		v, ok := <-ch
		if ok {
			bk = v
			fmt.Printf("goRoutine1: 受信 %d\n", v)
		}
	}
	fmt.Println("goRoutine: END")
}

// ゴールーチン2:変数書き換え *2
func goRoutine2(ch chan int) {
	fmt.Println("goRoutine2: START")
	for i := 0; i < NumLoop; i++ {
		v, ok := <-ch
		if ok {
			fmt.Printf("goRoutine2: 受信 %d\n", v)
		}

		v *= 2
		fmt.Printf("goRoutine2: 送信 %d\n", v)
		ch <- v
	}
	fmt.Println("goRoutine: END")
}

func main() {

	ch := make(chan int)

	go goRoutine1(ch)
	go goRoutine2(ch)

	// main関数が終了しないよう、スリープさせる
	time.Sleep(time.Second)
}
実行結果
goRoutine2: START
goRoutine1: START
goRoutine1: 送信 1
goRoutine2: 受信 1
goRoutine2: 送信 2
goRoutine1: 受信 2
goRoutine1: 送信 3
goRoutine2: 受信 3
goRoutine2: 送信 6
goRoutine1: 受信 6
goRoutine1: 送信 7
goRoutine2: 受信 7
goRoutine2: 送信 14
goRoutine: END
goRoutine1: 受信 14
goRoutine: END

チャネルのブロック

チャネルとはで同期が取れていることを確認しましたが、これはチャネルが自動的にブロック6することで実現しているようです。

ch <- Data でデータ送信時、送信側はここでブロックします。ブロックが解除されるのは Data = <-ch で誰かが送信データを受信してくれた時。

逆にデータ受信側は Data = <-ch でブロックし、誰かが ch <- Data でデータを送信してくれた時に解除されます。

チャネルのバッファ

チャネルはバッファを持っており、そのサイズによってブロックするタイミングが異なるようです。バッファサイズが0の場合、即時実行で動くため誰かが受信してくれる、または送信してくれるまでブロックします。

送信側はサイズが設定されていればそのサイズ分までは送信可能となり、サイズを超えたらバッファに空きが出る(誰かが受信する)までブロックします。

受信側はバッファにデータがあればそのまま処理を続行し、データが空になったら誰かが送信してくれるまでブロックします。

バッファサイズはチャネル作成時にmake()の引数で指定します。サイズを指定しない場合は初期値の0が設定されるようです。そのため、上のサンプルコードはその場でブロックされる、という動きになっていました。

構文
// チャネルをバッファサイズを指定して作成
make(chan , バッファサイズ)

// バッファサイズを指定しない場合、サイズは0になる。
make(chan )

次のサンプルコードはバッファサイズ2に指定したチャネルを作り、データ送信ゴールーチンを走らせ、main関数でデータを受信します。

指定したバッファサイズと同じ2回目まで送信され、3回目の送信処理がブロックされるのか確認します。

サンプルコード
package main

import (
	"fmt"
	"time"
)

func goRoutine(ch chan int) {
    // スリープさせずに一気にデータ送信
	for i := 0; i < 4; i++ {
		ch <- i + 1
		fmt.Printf("%d回目: 送信データ %d\n", i + 1, i + 1)
	}
	close(ch)
}

func main() {
	// バッファサイズ2のチャネルを作成
	ch := make(chan int, 2)

	// ゴールーチンの起動
	go goRoutine(ch)

	for {
		// 送信された直後に受信させないようスリープ
		time.Sleep(2 * time.Second)

		// データ受信
		if data, ok := <-ch; ok {
			fmt.Printf("受信データ: %d\n", data)
		} else {
            // チャネルが閉じられた、かつ受信データがない
			break
		}
	}
}

実行結果は下記のようになりました。2回目の送信処理までは実行され、3回目でブロック。受信側がデータを受け取ったら送信処理が再開されるため、きちんとブロックされていることが確認できました。

実行結果
1回目: 送信データ 1
2回目: 送信データ 2
受信データ: 1
3回目: 送信データ 3
受信データ: 2
4回目: 送信データ 4
受信データ: 3
受信データ: 4

チャネルに対して for .. range

今までのサンプルコードはデータ受信時にokを確認してifで分岐をさせていました。for .. rangeを使うとこの手間が省けるようです。

チャネルを閉じた、かつ受信データがないならループが終了します。

構文
for データ格納用変数 := range チャネル {
    // 処理
}

次のサンプルではデータを送信後、すぐにチャネルを閉じるゴールーチンを走らせます。意図した通り、チャネルを閉じた、かつ受信データがなくなった時点でループを抜けています。

サンプルコード
package main

import (
	"fmt"
)

const bufSize = 5

func goRoutine(ch chan int)  {
	for i := 0; i < bufSize; i++ {
		ch <- i
	}
	close(ch)
}

func main() {

	ch := make(chan int, bufSize)

	go goRoutine(ch)

	// チャネルを閉じる、かつ受信データがないなら終了
	for data := range ch {
		fmt.Println("data:", data)
	}
}
実行結果
data: 0
data: 1
data: 2
data: 3
data: 4

select

次のように複数のチャネルでデータをやりとりする場合、片方がブロックしてしまうともう片方も処理が止まります。

片方がブロックすると処理が進まない
data1 := <-ch1 // ブロック
data2 := <-ch2 // データはあるがch1がブロックしているため動けない

それを解決するために select が用意されているようです。selectはデータの送受信の準備ができたチャネルを処理し、ブロック状態のチャネルはそのままブロックしたままにします。(複数のチャネルが準備完了なら処理順はランダムの様子)

Linuxのシステムコールで同様の動きをするselectがあると思います。それと同じ感じのようです。以前ソケットプログラミングを学習したことがあるのですが、それでselectを学習しました。

構文
select {
case <-チャネル1:
    // 処理1
case <-チャネル2:
    // 処理2
default:
    // どのチャネルも準備が完了していない
}

次のサンプルではバッファサイズが1のチャネルを2つ作成し、それぞれに対し2つのゴールーチンからデータを送信します。

片方のゴールーチンではスリープを長めに取っていますが、それがボトルネックになることもなく、準備できた方から受信データを表示できていることが確認できます。

サンプルコード
package main

import (
	"fmt"
	"time"
)

func goRoutine1(ch chan int) {
	var i int
	for {
		i++
		time.Sleep(5 * time.Second)
		ch <- i
	}
}

func goRoutine2(ch chan int) {
	var i int
	for {
		i++
		time.Sleep(1 * time.Second)
		ch <- i
	}
}

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)

	go goRoutine1(ch1)
	go goRoutine2(ch2)

	for {
		select {
		case data, ok := <-ch1:
			if ok {
				fmt.Println("ch1 data:", data)
			}

		case data, ok := <-ch2:
			if ok {
				fmt.Println("ch2 data:", data)
			}
		}
	}
}
実行結果
ch2 data: 1
ch2 data: 2
ch2 data: 3
ch2 data: 4
ch1 data: 1
ch2 data: 5
ch2 data: 6
ch2 data: 7
ch2 data: 8
ch2 data: 9
ch1 data: 2
ch2 data: 10
ch2 data: 11
ch2 data: 12
ch2 data: 13
[Ctrl-c]

おわりに

ひとまずGoの学習はここまでです。

  1. 厳密にはスレッドのようなもの。別々のCPUコアで動くとは限らない

  2. 並列は同じタスクを複数こなす、ということのようです。

  3. 並行は異なるタスクを複数こなす、ということのようです。

  4. ただし、通常は閉じる必要はないとのこと。受信側にもうデータを送信しないと伝えたい場合などに閉じるようです。

  5. 処理を続行できず強制終了される状態。ランタイムエラー

  6. 処理がソースコードの次の行に移らず、そこで待ち状態になる

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1