package main
import (
"context"
"fmt"
"sync"
"time"
)
const bufSize = 5
func main() {
ch1 := make(chan int, bufSize)
ch2 := make(chan int, bufSize)
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Millisecond)
defer cancel()
wg.Add(3)
go countProducer(&wg, ch1, bufSize, 50)
go countProducer(&wg, ch2, bufSize, 500)
go countConsumer(ctx, &wg, ch1, ch2)
wg.Wait()
fmt.Println("finish")
}
func countProducer(wg *sync.WaitGroup, ch chan<- int, size int, sleep int) {
defer wg.Done()
defer close(ch)
for i := 0; i < size; i++ {
time.Sleep(time.Duration(sleep) * time.Millisecond)
ch <- i
}
}
func countConsumer(ctx context.Context, wg *sync.WaitGroup, ch1 <-chan int, ch2 <-chan int) {
defer wg.Done()
// loop:
for ch1 != nil || ch2 != nil {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
// break loop
for ch1 != nil || ch2 != nil {
select {
case v, ok := <-ch1:
if !ok {
ch1 = nil
break
}
fmt.Printf("ch1 %v\n", v)
case v, ok := <-ch2:
if !ok {
ch2 = nil
break
}
fmt.Printf("ch2 %v\n", v)
}
}
case v, ok := <-ch1:
if !ok {
ch1 = nil
break
}
fmt.Printf("ch1 %v\n", v)
case v, ok := <-ch2:
if !ok {
ch2 = nil
break
}
fmt.Printf("ch2 %v\n", v)
}
}
}
ゴルーチンとチャネルは、Go言語における並行処理の中心的な要素です。このプログラムでは、以下のようにそれらが処理に関与しています。
ゴルーチンの処理
-
プロデューサーゴルーチン:
-
countProducer
関数が2つのゴルーチンとして起動されます。 - 各ゴルーチンは、指定されたスリープ時間ごとに整数を生成し、対応するチャネル(
ch1
またはch2
)に送信します。 - 各プロデューサーは独立して動作し、並行して整数を生成するため、全体の処理速度が向上します。
-
-
コンシューマーゴルーチン:
-
countConsumer
関数もゴルーチンとして起動されます。 - このゴルーチンは、両方のチャネルからデータを受信し、その値を表示します。
- コンシューマーは、プロデューサーから送信されたデータを非同期に受け取るため、データを効率的に処理できます。
-
チャネルの処理
-
データの送信:
- プロデューサーは、
ch <- i
という構文を使って整数をチャネルに送信します。この操作はブロッキングされることがあります。つまり、チャネルのバッファが満杯の場合、プロデューサーは空きができるまで待機します。 - これにより、プロデューサーの生成速度がコンシューマーの消費速度に合わせて調整され、データの過剰生成を防ぎます。
- プロデューサーは、
-
データの受信:
- コンシューマーは、
v, ok := <-ch1
やv, ok := <-ch2
を使用して、チャネルからデータを受信します。 - チャネルが閉じられた場合、
ok
がfalse
になるため、受信側はそれを検知して、ループを終了することができます。
- コンシューマーは、
-
チャネルのクローズ:
- プロデューサーゴルーチンが終了する際に、
defer close(ch)
を使ってチャネルをクローズします。これにより、他のゴルーチン(この場合はコンシューマー)がチャネルの終了を認識できるようになります。
- プロデューサーゴルーチンが終了する際に、
並行処理の流れ
-
プロデューサーの動作:
- プロデューサーは、指定されたスリープ時間を待った後、整数をチャネルに送信します。このプロセスは非同期で行われ、他のプロデューサーやコンシューマーと同時に実行されます。
-
コンシューマーの動作:
- コンシューマーは、
select
文を使用して、両方のチャネルからのデータ受信を待ちます。これにより、どちらのチャネルからでもデータを受け取ることができ、効率的に処理できます。 - コンシューマーは、受信したデータを即座に表示しますが、データがない場合はブロックされます。
- コンシューマーは、
-
タイムアウトとキャンセル:
- コンシューマーは、与えられたコンテキスト(180ミリ秒のタイムアウト)を利用して、処理を制御します。タイムアウトが発生すると、コンシューマーは受信を中止し、エラーを表示します。
定数とmain
関数
const bufSize = 5
-
bufSize
はチャネルのバッファのサイズを定義します。ここでは5つの整数をバッファリングできます。
func main() {
- プログラムのエントリーポイントである
main
関数の開始です。
チャネルとWaitGroupの定義
ch1 := make(chan int, bufSize)
ch2 := make(chan int, bufSize)
- 整数を送受信するための2つのチャネル
ch1
とch2
を作成します。どちらのチャネルもバッファサイズは5です。
var wg sync.WaitGroup
-
WaitGroup
を宣言します。これにより、複数のゴルーチンの完了を待つことができます。
コンテキストの設定
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Millisecond)
defer cancel()
- タイムアウトを持つコンテキストを作成します。180ミリ秒後に自動的にキャンセルされます。
defer cancel()
により、main
関数が終了する際にリソースが解放されます。
ゴルーチンの起動
wg.Add(3)
-
WaitGroup
に3つのゴルーチンを追加します。これは、2つのプロデューサーと1つのコンシューマーを表します。
go countProducer(&wg, ch1, bufSize, 50)
go countProducer(&wg, ch2, bufSize, 500)
-
countProducer
関数を2つのゴルーチンとして起動します。各プロデューサーは異なるスリープ時間(50msと500ms)で動作します。
go countConsumer(ctx, &wg, ch1, ch2)
- コンシューマーゴルーチンを起動します。このゴルーチンは、
ch1
とch2
からデータを受け取ります。
ゴルーチンの完了待ち
wg.Wait()
- すべてのゴルーチンが終了するまで待機します。これにより、メインスレッドは他のゴルーチンが完了するまでブロックされます。
fmt.Println("finish")
- すべての処理が完了した後にメッセージを表示します。
countProducer
関数
func countProducer(wg *sync.WaitGroup, ch chan<- int, size int, sleep int) {
defer wg.Done()
defer close(ch)
-
WaitGroup
のカウントを減らすためにdefer wg.Done()
を使用し、関数が終了するときにチャネルを閉じるためにdefer close(ch)
を使用します。
ループと送信
for i := 0; i < size; i++ {
time.Sleep(time.Duration(sleep) * time.Millisecond)
ch <- i
}
- ループを使って指定されたサイズまで整数を生成します。
time.Sleep
を使って、指定された時間だけスリープし、その後チャネルに整数を送信します。これにより、プロデューサーがデータを生成するペースを制御できます。
countConsumer
関数
func countConsumer(ctx context.Context, wg *sync.WaitGroup, ch1 <-chan int, ch2 <-chan int) {
defer wg.Done()
- コンシューマーゴルーチンの開始です。
WaitGroup
のカウントを減らします。
ループでの受信
for ch1 != nil || ch2 != nil {
- いずれかのチャネルがまだ開いている間、ループを続けます。
select {
- 複数のチャネルからの受信を待つための
select
文を使用します。
コンテキストのタイムアウト処理
case <-ctx.Done():
fmt.Println(ctx.Err())
- コンテキストがタイムアウトした場合、エラーメッセージを表示します。これにより、処理がタイムアウトしたことがわかります。
チャネルからの受信
for ch1 != nil || ch2 != nil {
- タイムアウト後もチャネルからの受信を試みます。
case v, ok := <-ch1:
-
ch1
からの受信を試み、成功した場合はその値を表示します。ok
は受信が成功したかどうかを示します。
case v, ok := <-ch2:
- 同様に、
ch2
からの受信を試みます。
ゴルーチンの影響
-
軽量性: ゴルーチンはスレッドよりも軽量で、数千から数万のゴルーチンを同時に実行できます。これにより、リソースの効率的な使用が可能です。
-
非同期処理: プロデューサーがデータを生成している間、コンシューマーはそれを消費できるため、全体の処理効率が向上します。
-
チャネルによる通信: チャネルを使うことで、ゴルーチン間で安全にデータを送受信できます。これにより、データの整合性が保たれやすくなります。
-
同期処理:
WaitGroup
を使うことで、すべてのゴルーチンが完了するのを簡単に待つことができ、メインスレッドの終了を適切に管理できます。