概要
- io.Pipe関数を使う際には、並行化(別goroutineでの実行)とio.PipeWriterのCloseを意識する必要がある。
- そうでなければdeadlockが起こる。
- 並行化が必要な理由は、io.PipeWriterとio.PipeReaderが内部でバッファなしのチャネルを共有しているため。
- io.PipeWriterのCloseが必要な理由は、安全にio.PipeReaderのReadメソッドを呼び出すための補助関数を使用するため。
io.Pipe関数の使い方
公式ドキュメントのExampleでは、新たにgoroutineを生成し、その中で、io.PipeWriterへの書き込み操作とCloseを行なっている。
func main() {
r, w := io.Pipe()
go func() {
fmt.Fprint(w, "some text to be read\n")
w.Close()
}()
buf := new(bytes.Buffer)
buf.ReadFrom(r)
fmt.Print(buf.String())
}
このソースコードの通りにio.Pipe関数を使用すれば問題ないのだが、今回はなぜ新たなgoroutineの中でio.PipeWriterの書き込みを行う必要があるのか、また、io.PipeWriterをCloseする必要があるのかを深堀してみる。
io.Pipe関数の仕組み
まず、io.Pipe関数のソースコードを眺めてみる
func Pipe() (*PipeReader, *PipeWriter) {
p := &pipe{
wrCh: make(chan []byte),
rdCh: make(chan int),
done: make(chan struct{}),
}
return &PipeReader{p}, &PipeWriter{p}
}
io.Pipe関数は、参照先が同じであるpipe型のメンバを共有したio.PipeReaderとio.PipeWriterを返り値とする。io.PipeReaderとio.PipeWriterはどちらも*pipe型のメンバを持つ構造体である。
// A PipeWriter is the write half of a pipe.
type PipeWriter struct {
p *pipe
}
// A PipeReader is the read half of a pipe.
type PipeReader struct {
p *pipe
}
pipe構造体は以下のメンバをもつ。
// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
type pipe struct {
wrMu sync.Mutex // Serializes Write operations
wrCh chan []byte
rdCh chan int
once sync.Once // Protects closing done
done chan struct{}
rerr atomicError
werr atomicError
}
io.PipeWriterのWriteメソッドは、byteスライス型のchannelであるpipe.wrChにデータ内容を送信する。
func (w *PipeWriter) Write(data []byte) (n int, err error) {
return w.p.Write(data)
}
func (p *pipe) Write(b []byte) (n int, err error) {
select {
case <-p.done:
return 0, p.writeCloseError()
default:
p.wrMu.Lock()
defer p.wrMu.Unlock()
}
for once := true; once || len(b) > 0; once = false {
select {
case p.wrCh <- b: // ① 書き込みデータを送信
nw := <-p.rdCh
b = b[nw:]
n += nw
case <-p.done:
return n, p.writeCloseError()
}
}
return n, nil
}
io.PipeReaderのReadメソッドは、pipe.wrChへの書き込みを受信して引数として渡されたメモリにデータ内容をコピーする。これによってストリーム処理を実現している。
func (r *PipeReader) Read(data []byte) (n int, err error) {
return r.p.Read(data)
}
func (p *pipe) Read(b []byte) (n int, err error) {
select {
case <-p.done:
return 0, p.readCloseError()
default:
}
select {
case bw := <-p.wrCh: // ② 書き込みデータを受信
nr := copy(b, bw)
p.rdCh <- nr
return nr, nil
case <-p.done:
return 0, p.readCloseError()
}
}
io.PipeWriterのWriteメソッド呼び出しを並行化する理由
io.PipeWriterのWriteメソッドにおいて、pipe.wrChにbyteスライス型のデータを送信するが、p.wrChはバッファなしのchannelだということが重要である。
バッファなしのchannelは、同期的なchannelであり、送信データを受信側が直接受け取る必要がある。そのため、送信側は受信側が値を受け取るまでブロックされる。
参考:https://golang.org/doc/effective_go.html#channels
Receivers always block until there is data to receive. If the channel is unbuffered, the sender blocks until the receiver has received the value.
p.wrChはバッファなしchannelのため、p.wrChが受信状態でなければ、送信側はブロックした状態となる。p.wrChが受信状態となるのは、io.PipeReaderのReadメソッドが実行された時である。Writeメソッドをシングルスレッドで実施した場合、送信待ちで処理がブロックし(①書き込みデータ送信のcase文に入らない)、後続のReadメソッドは永久に呼ばれず、deadlockとなってしまう。そのため、Writeメソッド呼び出しをgoroutineで並行化する必要がある。読み込み側をgoroutine化しても構わないが、その場合は、少し注意が必要である。Readメソッドが実行される前にメインルーチンが終了してしまうことがあるため、メインルーチンでReadメソッドが呼ばれるように工夫する必要がある。
io.PipeWriterをCloseする理由
以下のソースコードは、io.PipeWriterをCloseしなくても、deadlockは起きない。
func main() {
r, w := io.Pipe()
go func() {
fmt.Fprint(w, "some text to be read\n")
}()
buf := make([]byte, 1024)
r.Read(buf)
fmt.Println(string(buf))
}
しかし、次のソースコードはdeadlockが起きる。こちらは、最初の方に引用したio.Pipe関数の公式ドキュメントのExampleのソースコードから、io.PipeWriterのCloseをコメントアウトしたものだ。
func main() {
r, w := io.Pipe()
go func() {
fmt.Fprint(w, "some text to be read\n")
// w.Close()
}()
buf := new(bytes.Buffer)
buf.ReadFrom(r)
fmt.Print(buf.String())
}
io.PipeReaderを読み込む際に、bytes.Buffer型が持つReadFromメソッドは、渡されたio.Readerからデータを読み込んでバッファに追加する。このReadFromメソッドの実装を見てみる。
func (b *Buffer) ReadFrom(r io.Reader) (n int64, err error) {
b.lastRead = opInvalid
for {
i := b.grow(MinRead)
m, e := r.Read(b.buf[i:cap(b.buf)])
if m < 0 {
panic(errNegativeRead)
}
b.buf = b.buf[:i+m]
n += int64(m)
if e == io.EOF {
return n, nil // e is EOF, so return nil explicitly
}
if e != nil {
return n, e
}
}
}
ReadFromメソッドは、EOFが来るまで、forルーブでReadを呼び出し続ける。io.PipeReaderのReadがEOFを返すのは、io.PipeWriterがCloseメソッドを呼び出したあとである。実際のEOFを返している実装については割愛。
// Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and EOF.
func (w *PipeWriter) Close() error {
return w.CloseWithError(nil)
}
そのため、io.PipeWriterがCloseしなければ、io.PipeReaderのReadの2回目の呼び出しで、p.wrChの受信を待つため、deadlockが起きてしまう。今回は、io.PipeReaderの読み込みに、bytes.BufferのReadFromメソッドを使用したが、ioutil.ReadAll関数を使う場合も、内部でbytes.Buffer.ReadFromメソッドが呼び出されている。
func ReadAll(r io.Reader) ([]byte, error) {
return readAll(r, bytes.MinRead)
}
func readAll(r io.Reader, capacity int64) (b []byte, err error) {
var buf bytes.Buffer
// If the buffer overflows, we will get bytes.ErrTooLarge.
// Return that as an error. Any other panic remains.
defer func() {
e := recover()
if e == nil {
return
}
if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
err = panicErr
} else {
panic(e)
}
}()
if int64(int(capacity)) == capacity {
buf.Grow(int(capacity))
}
_, err = buf.ReadFrom(r)
return buf.Bytes(), err
}
また、io.Copy関数を使う場合は、引数に渡すio.Readerインタフェースを満たす型がReadFromメソッドを持つかどうかで、読み込みの挙動が変わる。そのため、io.PipeReaderを読み込む際は、io.PipeWriterのWriteメソッドを呼び出した後は、Closeメソッドを呼び出すことで、安全に処理を実施することができる。
まとめ
io.Pipe関数で生成されるio.PipeWriterとio.PipeReaderがデータの送受信に用いるchannelがバッファなしのため、どちらかの処理をgoroutineによって並行化させる必要がある。また読み込みのための関数は、EOFが返るまでReadメソッドを呼び続けるため、io.PipeWriterをCloseしてio.PipeReaderがEOFを返すようにすることによって安全な読み込みを行うことができる。
参考にさせていただいた記事
http://ascii.jp/elem/000/001/252/1252961/
http://toc-dev.blogspot.jp/2012/09/go-channel.html