LoginSignup
0
1

More than 1 year has passed since last update.

GO言語で並列実行するサンプルコード。ワーカーのキャンセル機能付き

Posted at

個人的な備忘録として。

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/pkg/errors"
)

type Result struct {
	Task Task
	Err  error
}

type Task struct {
	Bar int
}

func worker(workerID int, ctx context.Context, taskChannel <-chan Task, resultChannel chan<- Result) {
    // taskCannelがなくなるまで動作する
	for task := range taskChannel {
		select {
		case <-ctx.Done(): // 親からcancel()が実行された場合、動きを止める
			fmt.Printf("[%d] got Cancel\n", workerID)
			return
		default:
			fmt.Printf("[%d] do task = %v\n", workerID, task)
			time.Sleep(time.Second * 1)
		}
	}
}

func main() {

	// タスクのリスト
	taskList := []Task{{1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}, {12}}
	taskChannel := make(chan Task, len(taskList))
	resultChannel := make(chan Result)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// タスクチャンネルに入れる
	for _, task := range taskList {
		taskChannel <- task
	}
	close(taskChannel)
	// 複数のワーカーを起動
	for i := 0; i < 4; i++ {
		go worker(i, ctx, taskChannel, resultChannel)
	}
	// 全ての完了を待つ
	var count int
	var err error
	err = nil
	for {
		result := <-resultChannel
		count += 1
		if result.Err != nil { // タスクの一つがエラー
			err = errors.Wrapf(result.Err, "task=%v has error\n", result.Task)
			fmt.Printf("%v\n", err)
			// 他のワーカーを止める
			fmt.Printf("cancel other worker\n")
			cancel()
			break
		} else {
			fmt.Printf("task=%v finish\n", result.Task)
		}
		if count == len(taskList) { // 全てのタスクが完了
			break
		}
	}
	if err != nil {
		panic(err)
	}
}
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1