バッファなしは同期通信する
バッファなし
ch1 := make(chan int)
ch2 := make(chan int, 0)
バッファあり
ch := make(chan int, 1)
バッファなしの場合は同期通信となるのでチャンネルに送信すると同時に受信しなければずっとそこでブロックしてしまう。そのため下記のようなコードは実行前にfatal errorとなってしまう。
func main() {
ch := make(chan int, 0)
ch <- 1 // ここでブロックし続けてしまう
fmt.Println("finish")
}
解決するにはどこかで受信しておかなければならない
func main() {
ch := make(chan int, 0)
go func() {
<-ch
}()
ch <- 1
fmt.Println("finish")
}
バッファありはcapacityまではブロックしない
下記は正常に動作しfinishが出力される
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
fmt.Println("finish")
}
capacityを超えるとブロックするため下記はエラーとなる
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
ch <- 3
fmt.Println("finish")
}
selectの動作(バッファなしのチャネルに対して)
selectはチャンネル通信が起きた時の条件を複数登録してそれぞれの動作を記述することができるが、チャンネルのブロックには気をつけておく必要がある。
func main() {
ch := make(chan int, 0)
heartBeat := make(chan struct{}, 0)
i := 0
go func() {
pulse := time.Tick(1000 * time.Millisecond)
for {
fmt.Println("==select==")
select {
case ch <- i:
fmt.Println("send ch finish")
i++
case <-pulse:
fmt.Println("send heartBeat start")
heartBeat <- struct{}{}
fmt.Println("send heartBeat finish")
}
}
}()
// mainのgoroutineでは何もしない
for {
select {
}
}
}
上のプログラムは下記の出力の後、動作しなくなる。
$ go run sample.go
==select==
send heartBeat start
heartBeatチャンネルはバッファなしのため、送信と受信が同時に起こらなければその場でブロックし続けるからだ。goroutineでのselectのloopは先に進むことなくheartBeatが受信されるまでブロックし続ける。
chへの送信は受信のロジックが書かれていないのでcaseでマッチしない。
selectの動作(バッファあり)
今度はheartBeatのチャンネルのcapacityを2にする
func main() {
ch := make(chan int, 0)
heartBeat := make(chan struct{}, 2)
i := 0
go func() {
pulse := time.Tick(1000 * time.Millisecond)
for {
fmt.Println("==select==")
select {
case ch <- i:
fmt.Println("send ch finish")
i++
case <-pulse:
fmt.Println("send heartBeat start")
heartBeat <- struct{}{}
fmt.Println("send heartBeat finish")
}
}
}()
// mainのgoroutineでは何もしない
for {
select {
}
}
}
上のプログラムは次の出力の後、動作しなくなる。
$ go run sample.go
==select==
send heartBeat start
send heartBeat finish
==select==
send heartBeat start
send heartBeat finish
==select==
send heartBeat start
想定通りcapacityの数だけブロックしない。
selectループ同士のdeadlockについて
バッファなしチャンネルを使ってメインルーチンとワーカールーチンでやりとりすると、簡単にデッドロックしてしまうモノが書けてしまった。。
func SuccWorker(ctx context.Context, srcCh chan int) (chan struct{}, chan int) {
heartBeatCh := make(chan struct{})
succResultCh := make(chan int)
pulse := time.NewTicker(1000 * time.Millisecond)
go func() {
defer close(heartBeatCh)
defer close(succResultCh)
defer pulse.Stop() // not close channel
for {
select {
case <-ctx.Done():
return
case i, ok := <-srcCh:
if !ok {
return
}
i++
succResultCh <- i
case <-pulse.C:
fmt.Println("\tsend heartbeat start")
heartBeatCh <- struct{}{}
fmt.Println("\tsend heartbeat end")
}
}
}()
return heartBeatCh, succResultCh
}
func main() {
fmt.Println("Start")
ctx := context.Background()
srcCh := make(chan int)
heartBeatCh, succResultCh := SuccWorker(ctx, srcCh)
// kick
go func() {
<-time.After(3 * time.Second)
srcCh <- 0
}()
mainloop:
for {
select {
case <-heartBeatCh:
fmt.Println("HeartBeat: ", time.Now().Format("15:04:05"))
case i, ok := <-succResultCh:
if !ok {
break mainloop
}
<-time.After(1000 * time.Millisecond)
fmt.Println("succResult: ", i)
fmt.Println("send srcCh start")
srcCh <- i
fmt.Println("send srcCh end")
}
}
fmt.Println("Finish")
}
上記を動かすと以下のような出力が出て以降は止まってしまう。
Start
send heartbeat start
send heartbeat end
HeartBeat: 17:39:16
send heartbeat start
send heartbeat end
HeartBeat: 17:39:17
send heartbeat start
send heartbeat end
HeartBeat: 17:39:18
send heartbeat start
succResult: 1
send srcCh start
ワーカーからはheartbeatを送信しようとしたが、mainloopではワーカーにsrcChを介して送信しようとしており、両方とも相手の受信待ちでブロックが発生してしまう。
deadlockの解消
解消方法は自分が調べた範囲では下記2つが有効だった。
- selectループをなるべく止めない
- バッファありチャネルを使う
selectループを止めない方法
mainloopの中でsuccResultChから受信してからの処理をgoroutineで実行してすぐにselectループに戻るようにして解消した。ように見えた。
func SuccWorker(ctx context.Context, srcCh chan int) (chan struct{}, chan int) {
heartBeatCh := make(chan struct{}, 0)
succResultCh := make(chan int, 0)
pulse := time.NewTicker(1000 * time.Millisecond)
go func() {
defer close(heartBeatCh)
defer close(succResultCh)
defer pulse.Stop()
for {
select {
case <-ctx.Done():
return
case i, ok := <-srcCh:
if !ok {
return
}
i++
succResultCh <- i
case <-pulse.C:
fmt.Println("\tsend heartbeat start")
heartBeatCh <- struct{}{}
fmt.Println("\tsend heartbeat end")
}
}
}()
return heartBeatCh, succResultCh
}
func main() {
fmt.Println("Start")
ctx := context.Background()
srcCh := make(chan int, 0)
heartBeatCh, succResultCh := SuccWorker(ctx, srcCh)
// kick
go func() {
<-time.After(3 * time.Second)
srcCh <- 0
}()
mainloop:
for {
select {
case <-heartBeatCh:
fmt.Println("HeartBeat: ", time.Now().Format("15:04:05"))
case i, ok := <-succResultCh:
if !ok {
break mainloop
}
go func(){
<-time.After(100 * time.Millisecond)
fmt.Println("succResult: ", i)
fmt.Println("send srcCh start")
srcCh <- i
fmt.Println("send srcCh end")
}()
}
}
fmt.Println("Finish")
}
バッファありチャネルを使う方法
使っているチャンネルを全てcapacity 1のバッファありに変更するとひとまずうまく動いているように見えた。こちらのmainLoopのselectループの中ではgoroutineでの処理はしていない。
func SuccWorker(ctx context.Context, srcCh chan int) (chan struct{}, chan int) {
heartBeatCh := make(chan struct{}, 1)
succResultCh := make(chan int, 1)
pulse := time.NewTicker(1000 * time.Millisecond)
go func() {
defer close(heartBeatCh)
defer close(succResultCh)
defer pulse.Stop()
for {
select {
case <-ctx.Done():
return
case i, ok := <-srcCh:
if !ok {
return
}
i++
succResultCh <- i
case <-pulse.C:
fmt.Println("\tsend heartbeat start")
heartBeatCh <- struct{}{}
fmt.Println("\tsend heartbeat end")
}
}
}()
return heartBeatCh, succResultCh
}
func main() {
fmt.Println("Start")
ctx := context.Background()
srcCh := make(chan int, 1)
heartBeatCh, succResultCh := SuccWorker(ctx, srcCh)
// kick
go func() {
<-time.After(3 * time.Second)
srcCh <- 0
}()
mainloop:
for {
select {
case <-heartBeatCh:
fmt.Println("HeartBeat: ", time.Now().Format("15:04:05"))
case i, ok := <-succResultCh:
if !ok {
break mainloop
}
<-time.After(100 * time.Millisecond)
fmt.Println("succResult: ", i)
fmt.Println("send srcCh start")
srcCh <- i
fmt.Println("send srcCh end")
}
}
fmt.Println("Finish")
}
最後に
selectループ同士で通信する時はなるべくselectループを止めないのと、基本的にはバッファありチャンネルを利用する、ことで全てではないだろうがデッドロックの可能性を少なくすることができるように見えた。