5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

バッファなしチャンネル(unbeffered channel)のブロッキング(blocking)について

Posted at

バッファなしは同期通信する

バッファなし

ch1 := make(chan int)
ch2 := make(chan int, 0)

バッファあり

ch := make(chan int, 1)

バッファなしの場合は同期通信となるのでチャンネルに送信すると同時に受信しなければずっとそこでブロックしてしまう。そのため下記のようなコードは実行前にfatal errorとなってしまう。

func main() {
	ch := make(chan int, 0)
	ch <- 1 // ここでブロックし続けてしまう
	fmt.Println("finish")
}

解決するにはどこかで受信しておかなければならない

func main() {
	ch := make(chan int, 0)
	go func() {
		<-ch
	}()
	ch <- 1
	fmt.Println("finish")
}

バッファありはcapacityまではブロックしない

下記は正常に動作しfinishが出力される

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	fmt.Println("finish")
}

capacityを超えるとブロックするため下記はエラーとなる

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	ch <- 3
	fmt.Println("finish")
}

selectの動作(バッファなしのチャネルに対して)

selectはチャンネル通信が起きた時の条件を複数登録してそれぞれの動作を記述することができるが、チャンネルのブロックには気をつけておく必要がある。

func main() {
	ch := make(chan int, 0)
	heartBeat := make(chan struct{}, 0)
	i := 0
	go func() {
		pulse := time.Tick(1000 * time.Millisecond)
		for {
			fmt.Println("==select==")
			select {
			case ch <- i:
				fmt.Println("send ch finish")
				i++
			case <-pulse:
				fmt.Println("send heartBeat start")
				heartBeat <- struct{}{}
				fmt.Println("send heartBeat finish")
			}
		}
	}()

	// mainのgoroutineでは何もしない
	for {
		select {
		}
	}
}

上のプログラムは下記の出力の後、動作しなくなる。

$ go run sample.go
==select==
send heartBeat start

heartBeatチャンネルはバッファなしのため、送信と受信が同時に起こらなければその場でブロックし続けるからだ。goroutineでのselectのloopは先に進むことなくheartBeatが受信されるまでブロックし続ける。

chへの送信は受信のロジックが書かれていないのでcaseでマッチしない。

selectの動作(バッファあり)

今度はheartBeatのチャンネルのcapacityを2にする

func main() {
	ch := make(chan int, 0)
	heartBeat := make(chan struct{}, 2)
	i := 0
	go func() {
		pulse := time.Tick(1000 * time.Millisecond)
		for {
			fmt.Println("==select==")
			select {
			case ch <- i:
				fmt.Println("send ch finish")
				i++
			case <-pulse:
				fmt.Println("send heartBeat start")
				heartBeat <- struct{}{}
				fmt.Println("send heartBeat finish")
			}
		}
	}()

	// mainのgoroutineでは何もしない
	for {
		select {
		}
	}
}

上のプログラムは次の出力の後、動作しなくなる。

$ go run sample.go
==select==
send heartBeat start
send heartBeat finish
==select==
send heartBeat start
send heartBeat finish
==select==
send heartBeat start

想定通りcapacityの数だけブロックしない。

selectループ同士のdeadlockについて

バッファなしチャンネルを使ってメインルーチンとワーカールーチンでやりとりすると、簡単にデッドロックしてしまうモノが書けてしまった。。

func SuccWorker(ctx context.Context, srcCh chan int) (chan struct{}, chan int) {
	heartBeatCh := make(chan struct{})
	succResultCh := make(chan int)
	pulse := time.NewTicker(1000 * time.Millisecond)
	go func() {
		defer close(heartBeatCh)
		defer close(succResultCh)
		defer pulse.Stop() // not close channel
		for {
			select {
			case <-ctx.Done():
				return
			case i, ok := <-srcCh:
				if !ok {
					return
				}
				i++
				succResultCh <- i
			case <-pulse.C:
				fmt.Println("\tsend heartbeat start")
				heartBeatCh <- struct{}{}
				fmt.Println("\tsend heartbeat end")
			}
		}
	}()
	return heartBeatCh, succResultCh
}

func main() {
	fmt.Println("Start")
	ctx := context.Background()
	srcCh := make(chan int)
	heartBeatCh, succResultCh := SuccWorker(ctx, srcCh)

	// kick
	go func() {
		<-time.After(3 * time.Second)
		srcCh <- 0
	}()

mainloop:
	for {
		select {
		case <-heartBeatCh:
			fmt.Println("HeartBeat: ", time.Now().Format("15:04:05"))
		case i, ok := <-succResultCh:
			if !ok {
				break mainloop
			}
			<-time.After(1000 * time.Millisecond)
			fmt.Println("succResult: ", i)
			fmt.Println("send srcCh start")
			srcCh <- i
			fmt.Println("send srcCh end")
		}
	}
	fmt.Println("Finish")
}

上記を動かすと以下のような出力が出て以降は止まってしまう。

Start
	send heartbeat start
	send heartbeat end
HeartBeat:  17:39:16
	send heartbeat start
	send heartbeat end
HeartBeat:  17:39:17
	send heartbeat start
	send heartbeat end
HeartBeat:  17:39:18
	send heartbeat start
succResult:  1
send srcCh start

ワーカーからはheartbeatを送信しようとしたが、mainloopではワーカーにsrcChを介して送信しようとしており、両方とも相手の受信待ちでブロックが発生してしまう。

deadlockの解消

解消方法は自分が調べた範囲では下記2つが有効だった。

  • selectループをなるべく止めない
  • バッファありチャネルを使う

selectループを止めない方法

mainloopの中でsuccResultChから受信してからの処理をgoroutineで実行してすぐにselectループに戻るようにして解消した。ように見えた。

func SuccWorker(ctx context.Context, srcCh chan int) (chan struct{}, chan int) {
	heartBeatCh := make(chan struct{}, 0)
	succResultCh := make(chan int, 0)
	pulse := time.NewTicker(1000 * time.Millisecond)
	go func() {
		defer close(heartBeatCh)
		defer close(succResultCh)
		defer pulse.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case i, ok := <-srcCh:
				if !ok {
					return
				}
				i++
				succResultCh <- i
			case <-pulse.C:
				fmt.Println("\tsend heartbeat start")
				heartBeatCh <- struct{}{}
				fmt.Println("\tsend heartbeat end")
			}
		}
	}()
	return heartBeatCh, succResultCh
}

func main() {
	fmt.Println("Start")
	ctx := context.Background()
	srcCh := make(chan int, 0)
	heartBeatCh, succResultCh := SuccWorker(ctx, srcCh)

	// kick
	go func() {
		<-time.After(3 * time.Second)
		srcCh <- 0
	}()

mainloop:
	for {
		select {
		case <-heartBeatCh:
			fmt.Println("HeartBeat: ", time.Now().Format("15:04:05"))
		case i, ok := <-succResultCh:
			if !ok {
				break mainloop
			}
			go func(){
				<-time.After(100 * time.Millisecond)
				fmt.Println("succResult: ", i)
				fmt.Println("send srcCh start")
				srcCh <- i
				fmt.Println("send srcCh end")
			}()
		}
	}
	fmt.Println("Finish")
}

バッファありチャネルを使う方法

使っているチャンネルを全てcapacity 1のバッファありに変更するとひとまずうまく動いているように見えた。こちらのmainLoopのselectループの中ではgoroutineでの処理はしていない。

func SuccWorker(ctx context.Context, srcCh chan int) (chan struct{}, chan int) {
	heartBeatCh := make(chan struct{}, 1)
	succResultCh := make(chan int, 1)
	pulse := time.NewTicker(1000 * time.Millisecond)
	go func() {
		defer close(heartBeatCh)
		defer close(succResultCh)
		defer pulse.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case i, ok := <-srcCh:
				if !ok {
					return
				}
				i++
				succResultCh <- i
			case <-pulse.C:
				fmt.Println("\tsend heartbeat start")
				heartBeatCh <- struct{}{}
				fmt.Println("\tsend heartbeat end")
			}
		}
	}()
	return heartBeatCh, succResultCh
}

func main() {
	fmt.Println("Start")
	ctx := context.Background()
	srcCh := make(chan int, 1)
	heartBeatCh, succResultCh := SuccWorker(ctx, srcCh)

	// kick
	go func() {
		<-time.After(3 * time.Second)
		srcCh <- 0
	}()

mainloop:
	for {
		select {
		case <-heartBeatCh:
			fmt.Println("HeartBeat: ", time.Now().Format("15:04:05"))
		case i, ok := <-succResultCh:
			if !ok {
				break mainloop
			}
			<-time.After(100 * time.Millisecond)
			fmt.Println("succResult: ", i)
			fmt.Println("send srcCh start")
			srcCh <- i
			fmt.Println("send srcCh end")
		}
	}
	fmt.Println("Finish")
}

最後に

selectループ同士で通信する時はなるべくselectループを止めないのと、基本的にはバッファありチャンネルを利用する、ことで全てではないだろうがデッドロックの可能性を少なくすることができるように見えた。

5
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?