大量のJobを捌くこんなケースのGoのサンプルです。
- (Jobの)Generator
- 生成処理が軽くて1つ
- 途中でコケたら全体を中断したい(それ以上Jobを生成しない)
- Worker
- それなりに重たい処理
- 横に複数並べてJobを捌きたい
- エラーが発生しても次のJobを継続(最後にエラーをハンドルする)
処理結果とエラーを持つResult構造体
// packageとimportは割愛
type Result struct {
s string
e error
}
GeneratorはJobの数だけ値を入れてくチャネルを作って渡します。
時々コケると、途中で止めます。
func Generator(num int) <-chan int {
ch := make(chan int)
go func() {
i := 0
defer close(ch)
for num > 0 {
num--
// sometime happen error, quit to generate
if rand.Int()%10 == 0 {
log.Printf("quit to generate\n")
return
}
ch <- i
log.Printf("gen: %d\n", i)
i++
}
}()
return ch
}
workerは入力と出力のチャネルを受けます。
Jobの実行処理の成否をResultとして詰めていきます。
上位のWaitGroupを受けてDoneするのはお決まりのパターンですね。
func worker(wg *sync.WaitGroup, chIn <-chan int, chResult chan<- *Result) {
defer wg.Done()
for {
i, ok := <-chIn
if !ok {
return
}
log.Printf("work: %d\n", i)
time.Sleep(2 * time.Second)
// sometime happen error, continue to work
if rand.Int()%10 == 0 {
log.Printf("continue to work\n")
chResult <- &Result{
s: strconv.Itoa(i),
e: fmt.Errorf("error %d", i),
}
continue
}
chResult <- &Result{
s: strconv.Itoa(i),
}
}
}
本体は上記のGeneratorとworkerとチャネルを取り持つだけです。
Resultの順番はバラけてしまうので、私は配列に詰め直してソートしたりしました。
func main() {
log.Printf("---start\n")
rand.Seed(time.Now().UnixNano())
job := 10
chIn := Generator(job)
chResult := make(chan *Result, job)
var wg sync.WaitGroup
concurrency := 3
for i := 0; i < concurrency; i++ {
wg.Add(1)
go worker(&wg, chIn, chResult)
}
wg.Wait()
close(chResult)
log.Printf("---worker done\n")
for r := range chResult {
if r.e == nil {
log.Printf("result %s ok\n", r.s)
} else {
log.Printf("result %s err: %s\n", r.s, r.e)
}
}
}