Help us understand the problem. What is going on with this article?

GoでGeneratorとWorkerのサンプル

大量のJobを捌くこんなケースのGoのサンプルです。

  • (Jobの)Generator
    • 生成処理が軽くて1つ
    • 途中でコケたら全体を中断したい(それ以上Jobを生成しない)
  • Worker
    • それなりに重たい処理
    • 横に複数並べてJobを捌きたい
    • エラーが発生しても次のJobを継続(最後にエラーをハンドルする)

処理結果とエラーを持つResult構造体

// packageとimportは割愛

type Result struct {
    s string
    e error
}

GeneratorはJobの数だけ値を入れてくチャネルを作って渡します。
時々コケると、途中で止めます。

func Generator(num int) <-chan int {
    ch := make(chan int)
    go func() {
        i := 0
        defer close(ch)

        for num > 0 {
            num--

            // sometime happen error, quit to generate
            if rand.Int()%10 == 0 {
                log.Printf("quit to generate\n")
                return
            }

            ch <- i
            log.Printf("gen: %d\n", i)
            i++
        }
    }()
    return ch
}

workerは入力と出力のチャネルを受けます。
Jobの実行処理の成否をResultとして詰めていきます。
上位のWaitGroupを受けてDoneするのはお決まりのパターンですね。

func worker(wg *sync.WaitGroup, chIn <-chan int, chResult chan<- *Result) {
    defer wg.Done()

    for {
        i, ok := <-chIn
        if !ok {
            return
        }
        log.Printf("work: %d\n", i)

        time.Sleep(2 * time.Second)

        // sometime happen error, continue to work
        if rand.Int()%10 == 0 {
            log.Printf("continue to work\n")
            chResult <- &Result{
                s: strconv.Itoa(i),
                e: fmt.Errorf("error %d", i),
            }
            continue
        }

        chResult <- &Result{
            s: strconv.Itoa(i),
        }
    }
}

本体は上記のGeneratorとworkerとチャネルを取り持つだけです。
Resultの順番はバラけてしまうので、私は配列に詰め直してソートしたりしました。

func main() {
    log.Printf("---start\n")
    rand.Seed(time.Now().UnixNano())

    job := 10
    chIn := Generator(job)
    chResult := make(chan *Result, job)

    var wg sync.WaitGroup
    concurrency := 3
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go worker(&wg, chIn, chResult)
    }
    wg.Wait()
    close(chResult)
    log.Printf("---worker done\n")

    for r := range chResult {
        if r.e == nil {
            log.Printf("result %s ok\n", r.s)
        } else {
            log.Printf("result %s err: %s\n", r.s, r.e)
        }
    }
}
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした