Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
0
Help us understand the problem. What is going on with this article?
@abcb2

バッファなしチャンネル(unbeffered channel)のブロッキング(blocking)について

More than 1 year has passed since last update.

バッファなしは同期通信する

バッファなし

ch1 := make(chan int)
ch2 := make(chan int, 0)

バッファあり

ch := make(chan int, 1)

バッファなしの場合は同期通信となるのでチャンネルに送信すると同時に受信しなければずっとそこでブロックしてしまう。そのため下記のようなコードは実行前にfatal errorとなってしまう。

func main() {
    ch := make(chan int, 0)
    ch <- 1 // ここでブロックし続けてしまう
    fmt.Println("finish")
}

解決するにはどこかで受信しておかなければならない

func main() {
    ch := make(chan int, 0)
    go func() {
        <-ch
    }()
    ch <- 1
    fmt.Println("finish")
}

バッファありはcapacityまではブロックしない

下記は正常に動作しfinishが出力される

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    fmt.Println("finish")
}

capacityを超えるとブロックするため下記はエラーとなる

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2
    ch <- 3
    fmt.Println("finish")
}

selectの動作(バッファなしのチャネルに対して)

selectはチャンネル通信が起きた時の条件を複数登録してそれぞれの動作を記述することができるが、チャンネルのブロックには気をつけておく必要がある。

func main() {
    ch := make(chan int, 0)
    heartBeat := make(chan struct{}, 0)
    i := 0
    go func() {
        pulse := time.Tick(1000 * time.Millisecond)
        for {
            fmt.Println("==select==")
            select {
            case ch <- i:
                fmt.Println("send ch finish")
                i++
            case <-pulse:
                fmt.Println("send heartBeat start")
                heartBeat <- struct{}{}
                fmt.Println("send heartBeat finish")
            }
        }
    }()

    // mainのgoroutineでは何もしない
    for {
        select {
        }
    }
}

上のプログラムは下記の出力の後、動作しなくなる。

$ go run sample.go
==select==
send heartBeat start

heartBeatチャンネルはバッファなしのため、送信と受信が同時に起こらなければその場でブロックし続けるからだ。goroutineでのselectのloopは先に進むことなくheartBeatが受信されるまでブロックし続ける。

chへの送信は受信のロジックが書かれていないのでcaseでマッチしない。

selectの動作(バッファあり)

今度はheartBeatのチャンネルのcapacityを2にする

func main() {
    ch := make(chan int, 0)
    heartBeat := make(chan struct{}, 2)
    i := 0
    go func() {
        pulse := time.Tick(1000 * time.Millisecond)
        for {
            fmt.Println("==select==")
            select {
            case ch <- i:
                fmt.Println("send ch finish")
                i++
            case <-pulse:
                fmt.Println("send heartBeat start")
                heartBeat <- struct{}{}
                fmt.Println("send heartBeat finish")
            }
        }
    }()

    // mainのgoroutineでは何もしない
    for {
        select {
        }
    }
}

上のプログラムは次の出力の後、動作しなくなる。

$ go run sample.go
==select==
send heartBeat start
send heartBeat finish
==select==
send heartBeat start
send heartBeat finish
==select==
send heartBeat start

想定通りcapacityの数だけブロックしない。

selectループ同士のdeadlockについて

バッファなしチャンネルを使ってメインルーチンとワーカールーチンでやりとりすると、簡単にデッドロックしてしまうモノが書けてしまった。。

func SuccWorker(ctx context.Context, srcCh chan int) (chan struct{}, chan int) {
    heartBeatCh := make(chan struct{})
    succResultCh := make(chan int)
    pulse := time.NewTicker(1000 * time.Millisecond)
    go func() {
        defer close(heartBeatCh)
        defer close(succResultCh)
        defer pulse.Stop() // not close channel
        for {
            select {
            case <-ctx.Done():
                return
            case i, ok := <-srcCh:
                if !ok {
                    return
                }
                i++
                succResultCh <- i
            case <-pulse.C:
                fmt.Println("\tsend heartbeat start")
                heartBeatCh <- struct{}{}
                fmt.Println("\tsend heartbeat end")
            }
        }
    }()
    return heartBeatCh, succResultCh
}

func main() {
    fmt.Println("Start")
    ctx := context.Background()
    srcCh := make(chan int)
    heartBeatCh, succResultCh := SuccWorker(ctx, srcCh)

    // kick
    go func() {
        <-time.After(3 * time.Second)
        srcCh <- 0
    }()

mainloop:
    for {
        select {
        case <-heartBeatCh:
            fmt.Println("HeartBeat: ", time.Now().Format("15:04:05"))
        case i, ok := <-succResultCh:
            if !ok {
                break mainloop
            }
            <-time.After(1000 * time.Millisecond)
            fmt.Println("succResult: ", i)
            fmt.Println("send srcCh start")
            srcCh <- i
            fmt.Println("send srcCh end")
        }
    }
    fmt.Println("Finish")
}

上記を動かすと以下のような出力が出て以降は止まってしまう。

Start
    send heartbeat start
    send heartbeat end
HeartBeat:  17:39:16
    send heartbeat start
    send heartbeat end
HeartBeat:  17:39:17
    send heartbeat start
    send heartbeat end
HeartBeat:  17:39:18
    send heartbeat start
succResult:  1
send srcCh start

ワーカーからはheartbeatを送信しようとしたが、mainloopではワーカーにsrcChを介して送信しようとしており、両方とも相手の受信待ちでブロックが発生してしまう。

deadlockの解消

解消方法は自分が調べた範囲では下記2つが有効だった。

  • selectループをなるべく止めない
  • バッファありチャネルを使う

selectループを止めない方法

mainloopの中でsuccResultChから受信してからの処理をgoroutineで実行してすぐにselectループに戻るようにして解消した。ように見えた。

func SuccWorker(ctx context.Context, srcCh chan int) (chan struct{}, chan int) {
    heartBeatCh := make(chan struct{}, 0)
    succResultCh := make(chan int, 0)
    pulse := time.NewTicker(1000 * time.Millisecond)
    go func() {
        defer close(heartBeatCh)
        defer close(succResultCh)
        defer pulse.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case i, ok := <-srcCh:
                if !ok {
                    return
                }
                i++
                succResultCh <- i
            case <-pulse.C:
                fmt.Println("\tsend heartbeat start")
                heartBeatCh <- struct{}{}
                fmt.Println("\tsend heartbeat end")
            }
        }
    }()
    return heartBeatCh, succResultCh
}

func main() {
    fmt.Println("Start")
    ctx := context.Background()
    srcCh := make(chan int, 0)
    heartBeatCh, succResultCh := SuccWorker(ctx, srcCh)

    // kick
    go func() {
        <-time.After(3 * time.Second)
        srcCh <- 0
    }()

mainloop:
    for {
        select {
        case <-heartBeatCh:
            fmt.Println("HeartBeat: ", time.Now().Format("15:04:05"))
        case i, ok := <-succResultCh:
            if !ok {
                break mainloop
            }
            go func(){
                <-time.After(100 * time.Millisecond)
                fmt.Println("succResult: ", i)
                fmt.Println("send srcCh start")
                srcCh <- i
                fmt.Println("send srcCh end")
            }()
        }
    }
    fmt.Println("Finish")
}

バッファありチャネルを使う方法

使っているチャンネルを全てcapacity 1のバッファありに変更するとひとまずうまく動いているように見えた。こちらのmainLoopのselectループの中ではgoroutineでの処理はしていない。

func SuccWorker(ctx context.Context, srcCh chan int) (chan struct{}, chan int) {
    heartBeatCh := make(chan struct{}, 1)
    succResultCh := make(chan int, 1)
    pulse := time.NewTicker(1000 * time.Millisecond)
    go func() {
        defer close(heartBeatCh)
        defer close(succResultCh)
        defer pulse.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case i, ok := <-srcCh:
                if !ok {
                    return
                }
                i++
                succResultCh <- i
            case <-pulse.C:
                fmt.Println("\tsend heartbeat start")
                heartBeatCh <- struct{}{}
                fmt.Println("\tsend heartbeat end")
            }
        }
    }()
    return heartBeatCh, succResultCh
}

func main() {
    fmt.Println("Start")
    ctx := context.Background()
    srcCh := make(chan int, 1)
    heartBeatCh, succResultCh := SuccWorker(ctx, srcCh)

    // kick
    go func() {
        <-time.After(3 * time.Second)
        srcCh <- 0
    }()

mainloop:
    for {
        select {
        case <-heartBeatCh:
            fmt.Println("HeartBeat: ", time.Now().Format("15:04:05"))
        case i, ok := <-succResultCh:
            if !ok {
                break mainloop
            }
            <-time.After(100 * time.Millisecond)
            fmt.Println("succResult: ", i)
            fmt.Println("send srcCh start")
            srcCh <- i
            fmt.Println("send srcCh end")
        }
    }
    fmt.Println("Finish")
}

最後に

selectループ同士で通信する時はなるべくselectループを止めないのと、基本的にはバッファありチャンネルを利用する、ことで全てではないだろうがデッドロックの可能性を少なくすることができるように見えた。

0
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
abcb2

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
0
Help us understand the problem. What is going on with this article?