LoginSignup
19
16

More than 5 years have passed since last update.

Go Concurrency Patterns: Pipelines and cancellation を読んだ時のメモ

Last updated at Posted at 2014-09-27

Go Concurrency Patterns: Pipelines and cancellationを読んだときのメモ。

Pipelineパターン

例えば、ある複数の数字をすべて2倍にしたい場合。

http
package main

import "fmt"

func gen(nums ...int) chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()

    return out
}

func sq(cs <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for c := range cs {
            out <- c * c
        }
    }()

    return out
}

func main() {
    ch := gen(1, 2, 3, 4, 5)

    sq1 := sq(ch)

    for s := range sq1 {
        fmt.Println(s)
    }
    // Output:
    // 1
    // 4
    // 9
    // 16
    // 25
}

func gen(nums …int) chan intで、各要素を、1つのchannelに入れている。
func sq(cs chan int) chan intでは、上で作ったchannnelから値が入ったら処理を実行する。

最後にmain関数でrangeで受け取った値から順番に値を出力している。

なぜこのようにするか

なぜsq関数で計算をしているのか。gen関数の中でも計算できるじゃないかとなるし
実際にも簡単に出来る (http://play.golang.org/p/ZiD7FgKvrE) 。

なのに何故こんな一つ関数を追加する必要があるのかというと
sqを複数回呼び出すことによって、平行処理を走らせることが出来る。

http
func main() {
    ch := gen(1, 2, 3, 4, 5)

    sq1 := sq(ch)
    sq2 := sq(ch)

    for s := range sq1 {
        fmt.Println(s)
    }
    fmt.Println("--")
    for s := range sq2 {
        fmt.Println(s)
    }
    // Output:(順は保証できない)
    // 1
    // 4
    // 16
    // 25
    // --
    // 9
}

この場合、sq1,sq2でそれぞれ計算が走っている。

平行で走らせた結果を集約する

ただ、上のままだとアウトプットが不安定すぎるので、merge関数を用意することになる。

http
for s := range merge(sq1, sq2) {
  fmt.Println(s)
}
http
func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }

    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

sync.WaitGroupは、平行処理の制御をするためのカウンターを管理している。
func (*WaitGroup) Add(delta int)でカウントアップ
func (*WaitGroup) Done() でカウントダウン
fund (*WaitGroup) Wait()はカウンターが0より上の場合はそこで止まる。

merge関数では、引数の数だけカウントアップし、
それぞれの引数の中にあるチャンネルの中身の受付が終わったら
返り値のチャンネルをcloseをしている。

キャンセルの実装

途中で処理が失敗したら、それ以降の処理を中止にしたい場合がある。
しかし、上の処理だと、sq関数を呼び出した数だけ、実行されてしまう。

http
func main() {
  ch := gen(1, 2, 3, 4, 5)

  sq1 := sq(ch)

  s := merge(sq1, sq2)

  fmt.Println(<-s)
  fmt.Println(<-s)
  // もう値をいらないよ。
}

上のように、gen関数で5つの数字を入れているのに、取り出しを2個しかしなかった場合
残りの3つの処理はキャンセルを実装したほうが良いことになる。

キャンセルをしたい場合は、各goroutineの中でその処理をスキップするように伝えなければいけなくなる。

キャンセル用の受け口を用意する

完了を知らせるためのチャンネルをひとつ用意する

done := make(chan struct{})

それを各関数に渡していく

func merge(done chan struct{}, cs ...<-chan int) <-chan int {
  var wg sync.WaitGroup
  out := make(chan int)

  output := func(c <-chan int) {
    defer wg.Done()
    for n := range c {
      select {
      case out <- n:
      case <-done:
        return
      }
    }
// …省略

doneに値が送られてきたら、wg.Done()を呼び出すことで、関数を終了させている。
doneはmain()から値を送ることになる。

func main() {
  ch := gen(1, 2, 3, 4, 5)

  sq1 := sq(ch)
  sq2 := sq(ch)

  done := make(chan struct{}, 2)
  c := merge(done, sq1, sq2)

  fmt.Println(<-c)
  fmt.Println(<-c)

  done <- struct{}{} // 必要なくなったら空のstructを送る
  done <- struct{}{}
}

ただコレだとまたチャンネルの把握をしなければいけない。
この場合はclose()関数で対応出来る。

func main() {
  ch := gen(1, 2, 3, 4, 5)

  sq1 := sq(ch)
  sq2 := sq(ch)

  done := make(chan struct{}, 2)
  defer close(done) // 最後にdoneをクローズする
  c := merge(done, sq1, sq2)

  fmt.Println(<-c)
  fmt.Println(<-c)
}

これはchannelをcloseした時に、channelの型の0値が送られるため。

このdoneをmerge以外の関数にも実装する

http
func gen(done <-chan struct{}, nums ...int) chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }()

    return out
}

func sq(done <-chan struct{}, cs <-chan int) <-chan int {
    out := make(chan int)

    go func() {
        defer close(out)
        for c := range cs {
            select {
            case out <- c * c:
            case <-done:
                return
            }
        }
    }()

    return out
}

これで処理のキャンセルをmain側から制御できるようになる。

参考

19
16
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
19
16