はじめに
前回に引き続き、GoのHTTP実装と絡めて書く。今日はChannel。
経緯はこちら
Channel
Goにはunbuffered channelとbuffered channelの2種類があります。
unbuffered channelの場合、send/recvどちらかがreadyになっていない場合ブロックされることから可能な限りbuffered channelを使うことが好まれ、GoのHTTP実装でもbuffered channelが多く使われています。
が、buffered channelの場合、send/recvの後に処理を続けても実際に相手側が処理をしているとは限らないため、注意しないといけない場合があります。
それをとても簡単に示した例が以下となります。
以下では、
resc: make(chan responseAndError)が利用されており、unbuffered channelであるため、期待通り足し算の結果を受け取る(簡単のため1を足しているだけなので、1~10までの値を任意の順に受け取る)ことができます。
仮にresc:  make(chan responseAndError, 1)にて、buffered channelに変更した場合、recvでselectしている
    select {
    case <-adder.close:
        return false
    case rc := <-adder.resc:
のどちらが処理続行されるかは不定であり、約半分くらい失敗を受け取ることになります。
これはspecに以下のようにあるように処理続行可能になったものが複数ある場合はランダムに選ばれるためです。
If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.
package main
import (
	"fmt"
	"sync"
	"time"
)
const debug = false
type responseAndError struct {
	response int
	err      error
}
type adder struct {
	// for cancel or timeout
	close chan struct{}
	resc  chan responseAndError
}
func newAdder() *adder {
	return &adder{
		close: make(chan struct{}),
		// must use unbuffered channel
		resc: make(chan responseAndError),
		// if use buffered channel, we would get FAILED log
		// resc:  make(chan responseAndError, 1),
	}
}
func (adder *adder) handle(a int, b int) bool {
	adder.add(a, b)
	time.Sleep(time.Second * 1)
	select {
	case <-adder.close:
		return false
	case rc := <-adder.resc:
		if debug {
			fmt.Printf("result: %d, err: %v", rc.response, rc.err)
		}
		return true
	}
}
func (adder *adder) add(a int, b int) {
	go func(a int, b int) {
		defer func() {
			close(adder.close)
		}()
		res := a + b
		adder.resc <- responseAndError{res, nil}
	}(a, b)
}
func main() {
	wg := &sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			adder := newAdder()
			ok := adder.handle(i, 1)
			if !ok {
				fmt.Println("================FAILED============")
			}
			wg.Done()
		}(i)
	}
	wg.Wait()
}
ソースはgithubにあげてます。
HTTP実装
上の例は実はHTTPクライアント実装を参考にしています。
具体的には、
recv側(handleメソッド)はPersistent Connectionがレスポンスを受け取るのを別goルーチンで待機する部分
send側(addメソッド)はPersistent Connectionが無限ループでreadをする処理
をエッセンスだけ取り出して書いてます。補足すると、Persistent Connectionが無限ループでreadをしている部分では、connectionが再利用されないと判断された場合は今回のサンプル同様にcloseチャネルをcloseすることで通知します。その時にコネクションも実際にcloseします。
unbuffered channelにしなくてはいけないのはコメントにも記載されており、サンプルと同じ理由です。以下引用します。rc.chは今回のサンプルのadder.rescに該当します。
			// Put the idle conn back into the pool before we send the response
			// so if they process it quickly and make another request, they'll
			// get this same conn. But we use the unbuffered channel 'rc'
			// to guarantee that persistConn.roundTrip got out of its select
			// potentially waiting for this persistConn to close.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				replaced && tryPutIdleConn(trace)
			if bodyWritable {
				closeErr = errCallerOwnsConn
			}
			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}
			// Now that they've read from the unbuffered channel, they're safely
			// out of the select that also waits on this goroutine to die, so
			// we're allowed to exit now if needed (if alive is false)
			testHookReadLoopBeforeNextRead()
			continue
気になるところ
上記サンプルでは、 time.Sleep(time.Second * 1)を入れることでselect時には
adder.close、adder.resc両方とも処理可能である状態を作り出しましたが、仮に両方ともブロックされる状態でselectに到達した場合はどうなるんでしょう。
気になっため、time.Sleep(time.Second * 1)をres := a + b直後に移動しました。その場合、buffered channelにしてもエラーになることはありませんでした。
つまり、buffered channelにsendした時、ブロックしていたselectではbuffered channelがrecv可能と判断され選択されていたことになります。
10回ではなく10000000回実行しても同様であり、試しにGosched()を仕込ませたりしましたが結果は変わりませんでした。
unbuffered channelはsend/recv待ちでブロックされているものは対象のchannelに対してrecv/sendされれば必ずselectにて選択されます。今回のサンプルではbuffered channelも同様でした。
が、specに書いてませんし、調べた限り記載を見つけられなかったので、保証されない挙動と思っていた方がいいと思います。
いつかGoのスケジューラの実装も読むつもりなのでその時に確認しようと思いますが、どなたかご存知の方いたら教えて頂けると幸いです。