個人的な備忘録として。
package main
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
)
type Result struct {
Task Task
Err error
}
type Task struct {
Bar int
}
func worker(workerID int, ctx context.Context, taskChannel <-chan Task, resultChannel chan<- Result) {
// taskCannelがなくなるまで動作する
for task := range taskChannel {
select {
case <-ctx.Done(): // 親からcancel()が実行された場合、動きを止める
fmt.Printf("[%d] got Cancel\n", workerID)
return
default:
fmt.Printf("[%d] do task = %v\n", workerID, task)
time.Sleep(time.Second * 1)
}
}
}
func main() {
// タスクのリスト
taskList := []Task{{1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}, {12}}
taskChannel := make(chan Task, len(taskList))
resultChannel := make(chan Result)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// タスクチャンネルに入れる
for _, task := range taskList {
taskChannel <- task
}
close(taskChannel)
// 複数のワーカーを起動
for i := 0; i < 4; i++ {
go worker(i, ctx, taskChannel, resultChannel)
}
// 全ての完了を待つ
var count int
var err error
err = nil
for {
result := <-resultChannel
count += 1
if result.Err != nil { // タスクの一つがエラー
err = errors.Wrapf(result.Err, "task=%v has error\n", result.Task)
fmt.Printf("%v\n", err)
// 他のワーカーを止める
fmt.Printf("cancel other worker\n")
cancel()
break
} else {
fmt.Printf("task=%v finish\n", result.Task)
}
if count == len(taskList) { // 全てのタスクが完了
break
}
}
if err != nil {
panic(err)
}
}