LoginSignup
0
0

More than 3 years have passed since last update.

GoでGeneratorとWorkerのサンプル

Posted at

大量のJobを捌くこんなケースのGoのサンプルです。

  • (Jobの)Generator
    • 生成処理が軽くて1つ
    • 途中でコケたら全体を中断したい(それ以上Jobを生成しない)
  • Worker
    • それなりに重たい処理
    • 横に複数並べてJobを捌きたい
    • エラーが発生しても次のJobを継続(最後にエラーをハンドルする)

処理結果とエラーを持つResult構造体

// packageとimportは割愛

type Result struct {
    s string
    e error
}

GeneratorはJobの数だけ値を入れてくチャネルを作って渡します。
時々コケると、途中で止めます。

func Generator(num int) <-chan int {
    ch := make(chan int)
    go func() {
        i := 0
        defer close(ch)

        for num > 0 {
            num--

            // sometime happen error, quit to generate
            if rand.Int()%10 == 0 {
                log.Printf("quit to generate\n")
                return
            }

            ch <- i
            log.Printf("gen: %d\n", i)
            i++
        }
    }()
    return ch
}

workerは入力と出力のチャネルを受けます。
Jobの実行処理の成否をResultとして詰めていきます。
上位のWaitGroupを受けてDoneするのはお決まりのパターンですね。

func worker(wg *sync.WaitGroup, chIn <-chan int, chResult chan<- *Result) {
    defer wg.Done()

    for {
        i, ok := <-chIn
        if !ok {
            return
        }
        log.Printf("work: %d\n", i)

        time.Sleep(2 * time.Second)

        // sometime happen error, continue to work
        if rand.Int()%10 == 0 {
            log.Printf("continue to work\n")
            chResult <- &Result{
                s: strconv.Itoa(i),
                e: fmt.Errorf("error %d", i),
            }
            continue
        }

        chResult <- &Result{
            s: strconv.Itoa(i),
        }
    }
}

本体は上記のGeneratorとworkerとチャネルを取り持つだけです。
Resultの順番はバラけてしまうので、私は配列に詰め直してソートしたりしました。

func main() {
    log.Printf("---start\n")
    rand.Seed(time.Now().UnixNano())

    job := 10
    chIn := Generator(job)
    chResult := make(chan *Result, job)

    var wg sync.WaitGroup
    concurrency := 3
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go worker(&wg, chIn, chResult)
    }
    wg.Wait()
    close(chResult)
    log.Printf("---worker done\n")

    for r := range chResult {
        if r.e == nil {
            log.Printf("result %s ok\n", r.s)
        } else {
            log.Printf("result %s err: %s\n", r.s, r.e)
        }
    }
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0