Go Concurrency Patterns: Pipelines and cancellationを読んだときのメモ。
Pipelineパターン
例えば、ある複数の数字をすべて2倍にしたい場合。
package main
import "fmt"
func gen(nums ...int) chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func sq(cs <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for c := range cs {
out <- c * c
}
}()
return out
}
func main() {
ch := gen(1, 2, 3, 4, 5)
sq1 := sq(ch)
for s := range sq1 {
fmt.Println(s)
}
// Output:
// 1
// 4
// 9
// 16
// 25
}
func gen(nums …int) chan int
で、各要素を、1つのchannelに入れている。
func sq(cs chan int) chan int
では、上で作ったchannnelから値が入ったら処理を実行する。
最後にmain関数でrange
で受け取った値から順番に値を出力している。
なぜこのようにするか
なぜsq関数で計算をしているのか。gen関数の中でも計算できるじゃないかとなるし
実際にも簡単に出来る (http://play.golang.org/p/ZiD7FgKvrE) 。
なのに何故こんな一つ関数を追加する必要があるのかというと
sqを複数回呼び出すことによって、平行処理を走らせることが出来る。
func main() {
ch := gen(1, 2, 3, 4, 5)
sq1 := sq(ch)
sq2 := sq(ch)
for s := range sq1 {
fmt.Println(s)
}
fmt.Println("--")
for s := range sq2 {
fmt.Println(s)
}
// Output:(順は保証できない)
// 1
// 4
// 16
// 25
// --
// 9
}
この場合、sq1,sq2でそれぞれ計算が走っている。
平行で走らせた結果を集約する
ただ、上のままだとアウトプットが不安定すぎるので、merge
関数を用意することになる。
for s := range merge(sq1, sq2) {
fmt.Println(s)
}
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
sync.WaitGroup
は、平行処理の制御をするためのカウンターを管理している。
func (*WaitGroup) Add(delta int)
でカウントアップ
func (*WaitGroup) Done()
でカウントダウン
fund (*WaitGroup) Wait()
はカウンターが0より上の場合はそこで止まる。
merge関数では、引数の数だけカウントアップし、
それぞれの引数の中にあるチャンネルの中身の受付が終わったら
返り値のチャンネルをcloseをしている。
キャンセルの実装
途中で処理が失敗したら、それ以降の処理を中止にしたい場合がある。
しかし、上の処理だと、sq
関数を呼び出した数だけ、実行されてしまう。
func main() {
ch := gen(1, 2, 3, 4, 5)
sq1 := sq(ch)
s := merge(sq1, sq2)
fmt.Println(<-s)
fmt.Println(<-s)
// もう値をいらないよ。
}
上のように、gen関数で5つの数字を入れているのに、取り出しを2個しかしなかった場合
残りの3つの処理はキャンセルを実装したほうが良いことになる。
キャンセルをしたい場合は、各goroutineの中でその処理をスキップするように伝えなければいけなくなる。
キャンセル用の受け口を用意する
完了を知らせるためのチャンネルをひとつ用意する
done := make(chan struct{})
それを各関数に渡していく
func merge(done chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
// …省略
doneに値が送られてきたら、wg.Done()を呼び出すことで、関数を終了させている。
doneはmain()から値を送ることになる。
func main() {
ch := gen(1, 2, 3, 4, 5)
sq1 := sq(ch)
sq2 := sq(ch)
done := make(chan struct{}, 2)
c := merge(done, sq1, sq2)
fmt.Println(<-c)
fmt.Println(<-c)
done <- struct{}{} // 必要なくなったら空のstructを送る
done <- struct{}{}
}
ただコレだとまたチャンネルの把握をしなければいけない。
この場合はclose()
関数で対応出来る。
func main() {
ch := gen(1, 2, 3, 4, 5)
sq1 := sq(ch)
sq2 := sq(ch)
done := make(chan struct{}, 2)
defer close(done) // 最後にdoneをクローズする
c := merge(done, sq1, sq2)
fmt.Println(<-c)
fmt.Println(<-c)
}
これはchannelをcloseした時に、channelの型の0値が送られるため。
このdoneをmerge以外の関数にも実装する
func gen(done <-chan struct{}, nums ...int) chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-done:
return
}
}
}()
return out
}
func sq(done <-chan struct{}, cs <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for c := range cs {
select {
case out <- c * c:
case <-done:
return
}
}
}()
return out
}
これで処理のキャンセルをmain側から制御できるようになる。