はじめに
Goのhttp.Request構造体のctx
はdocによると、以下の場合にcancelされます。
For outgoing client requests, the context controls cancellation.
For incoming server requests, the context is canceled when the client's connection closes, the request is canceled (with HTTP/2), or when the ServeHTTP method returns.
クライアントがタイムアウトを設定していた場合にもタイムアウト時間経過後にコネクションがcloseされるため、サーバ側実装でcontextを参照することでタイムアウトしたことも気づけることになります。
Request.ctx
やその子孫関係にあるcontext(contextは親がキャンセルされると子に伝播されます)をサーバ側で参照することで無駄なリソースの利用を防ぐことができます。
例えば、conn.ExecContext(ctx, ...)
にてsqlを実行していた場合、引数にcontextを渡すことで、sqlが実行中であってもクライアントが設定したHTTPのタイムアウトのタイミングで実行中のクエリがキャンセルされ、DBの負荷を抑えることができます。
ただ、厳密には実行中のクエリまでキャンセルされるかはdriverの実装次第かと思いますが、著名なドライバでは対応していると思って良さそうです。
postgresの一番著名なドライバであるlib/pq
にはソースでいうとこのあたりで実装されているようで、contextのキャンセルをwatchしているgoルーチンを新規に作りキャンセル判定した場合にはキャンセルのリクエストをDBサーバに送っていることがわかります。
今回確認したいこと
http.Request.ctx
を参照する有用性がわかったところで今回、確認したいのはhttp.Request構造体のctx
はいつだれがどのようにキャンセルしているかです。
僕らサーバサイドエンジニアの仕事は極論をいうと以下のインターフェースを実装することであるといっていいでしょう。
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
ポートをバインドし、Acceptし、適切にヘッダー等を読み込みといったHTTPサーバとして必要な細かい部分はよしなにGoの標準ライブラリが担ってくれるので、僕らはHandlerインターフェースの「リクエストボディを読み込んでレスポンスボディを書き出す」という仕事に集中できます。
逆にいうとServeHTTP
の実行中の処理はライブラリ側ではなく、アプリ側の実装にのみ存在するので、普通に考えれば、リクエストボディを読みこみおわったとアプリ側が判断し、それ以降コネクションがcloseされないかを監視するgoルーチンをつくる処理を自前で書かないといけないはずです。
このあたりの仕組みどうなっているんだ?とおもったのが今回の動機です。
なお、HTTP2やHTTPSは対象外とします(単純にそちらまでソースを読めていないからで同様の仕組みはあるはず)。
http
の実装
早速実装を確認していきます。せっかくですのでかなりざっくりと全体像も確認していきます。
ソースにインラインでコメントを書いていきます。元のコードを上書きは一切してませんが見通しをよくしたいためかなりの処理を消してます。
まず、大事な箇所は以下です。
https://github.com/golang/go/blob/f92337422ef2ca27464c198bb3426d2dc4661653/src/net/http/server.go#L2907-L2971
for {
// lはnet.ListenerでAcceptによりクライアントからのリクエストを待ち受けている
// 新しいリクエストがきたらrwがそのコネクションとして返る
rw, err := l.Accept()
if err != nil {
// Temporary Errorの場合は適切にsleepしcontinue
}
// cはserversideのhttp connectionを表現します
// c.rwcはrwを保持します。おそらくrwcはreadwriteconnectionの略でしょう
c := srv.newConn(rw)
c.setState(c.rwc, StateNew) // before Serve can return
go c.serve(connCtx)
}
かなり簡潔にしたのが上記です。Accept
により新しいリクエストを無限loopで待ち受け、コネクションをもとにc.serve
に新しいgoルーチンでcallしていることがわかります。アプリを実装する際、当然リクエストごとに実行するgoルーチンは異なりますがそれはここでgoルーチンを起動しているからです。
go c.serve(connCtx)
で呼ばれている処理を抜粋すると以下です。
https://github.com/golang/go/blob/f92337422ef2ca27464c198bb3426d2dc4661653/src/net/http/server.go#L1840-L1955
// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
c.remoteAddr = c.rwc.RemoteAddr().String()
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
defer func() {
// deferにてbufferリングしていたwriteをflushしたりリソースを開放します
// またc.rwc(Acceptで取得したコネクション)をcloseします
c.close()
c.setState(c.rwc, StateClosed)
}()
// TLS関連処理
if tlsConn, ok := c.rwc.(*tls.Conn); ok {
...
}
// HTTP/1.x from here on.
ctx, cancelCtx := context.WithCancel(ctx)
// このcancelCtx関数が今回テーマにしているcancelを行う関数です
c.cancelCtx = cancelCtx
defer cancelCtx()
// c.rはコネクションのreadを管理します
c.r = &connReader{conn: c}
// bufferリングしているだけ
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
for {
// request/responseを初期化し、いろいろな処理をします
// 後述
w, err := c.readRequest(ctx)
// errの処理や様々な処理
req := w.req
// readしていないリクエストボディがまだ存在する場合、EOFになった際にstartBackgroundReadするよう登録します
// req.Body.onHitEOFに登録され、req.Bodyをreadする実装において、EOFに到達したらonHitEOF関数を呼ぶようになってます。
// 存在しなければすぐにstartBackgroundReadを実行します(処理内容は後述)
if requestBodyRemains(req.Body) {
registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
} else {
w.conn.r.startBackgroundRead()
}
// アプリ側が実装した処理がよばれます
serverHandler{c.server}.ServeHTTP(w, w.req)
w.cancelCtx()
// 主に以下3つの処理を行います
// - c.r.abortPendingReadをよびます(後述)
// - bufferしていたデータをwriteします
// - w.reqBody(req.Bodyと同じ).closeをよびます(全てのデータを読んでいない場合にioutil.Discardに捨てるなどしていてファイルを実際にcloseはしていない)
w.finishRequest()
// keep-alive処理
// コネクションを使いまわすかどうか判断する
// 使いまわさないのであればreturnされ、defer内の処理によりコネクションがcloseされ、使いまわせるならばfor文内のまま
// ただ、ずっと使い回すのも無駄(クライアントがすでにcloseしているかもしれない)なのでPeekでtimeoutを設定します
// 本題と関係ないですが、この実装はなるほどと感じた
if d := c.server.idleTimeout(); d != 0 {
c.rwc.SetReadDeadline(time.Now().Add(d))
if _, err := c.bufr.Peek(4); err != nil {
return
}
}
// リセットして次のリクエストの処理へ
c.rwc.SetReadDeadline(time.Time{})
}
}
c.readRequest
で呼ばれる一連の処理は以下です。
// @net/http/server.go
// Read next request from connection.
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
// 各種timeoutなどを設定します
// ...
// 後述
req, err := readRequest(c.bufr, keepHostHeader)
// req.ctxはc.cancelCtx関数をよぶとcancelされることがわかります
// もちろんw.cancelCtxでもcancelされます
ctx, cancelCtx := context.WithCancel(ctx)
req.ctx = ctx
w = &response{
conn: c,
cancelCtx: cancelCtx,
req: req,
reqBody: req.Body,
handlerHeader: make(Header),
// ...
}
w.cw.res = w
w.w = newBufioWriterSize(&w.cw, bufferBeforeChunkingSize)
return w, nil
}
// @net/http/request.go
// Headerを全て読み込みreq.Headerにつめこみます
// Bodyは一切readしていないのがポイントです
func readRequest(b *bufio.Reader, deleteHostHeader bool) (req *Request, err error) {
tp := newTextprotoReader(b)
req = new(Request)
// First line: GET /index.html HTTP/1.0
var s string
if s, err = tp.ReadLine(); err != nil {
return nil, err
}
// ...
// ...
if err != nil {
return nil, err
}
req.Header = Header(mimeHeader)
// 後述
err = readTransfer(req, b)
if err != nil {
return nil, err
}
return req, nil
}
// @net/http/transfer.go
// msg is *Request or *Response.
func readTransfer(msg interface{}, r *bufio.Reader) (err error) {
t := &transferReader{RequestMethod: "GET"}
// Unify input
isResponse := false
switch rr := msg.(type) {
case *Response:
// 省略
case *Request:
t.Header = rr.Header
t.RequestMethod = rr.Method
t.ProtoMajor = rr.ProtoMajor
t.ProtoMinor = rr.ProtoMinor
// ...
}
// Content-Lengthをもとに長さを返却します
realLength, err := fixLength(isResponse, t.StatusCode, t.RequestMethod, t.Header, t.TransferEncoding)
// Prepare body reader. ContentLength < 0 means chunked encoding
// or close connection when finished, since multipart is not supported yet
switch {
case chunked(t.TransferEncoding):
// HTTP1.1のChunked transfer encodingの場合は最後の0\r\n を読み込んだ時にEOFを返却するようなio.Readerをbody.srcに埋めます
t.Body = &body{src: internal.NewChunkedReader(r), hdr: msg, r: r, closing: t.Close}
}
case realLength > 0:
// 通常のリクエストの場合はrealLengthよみこんだらEOFを返却するio.Readerをbody.srcに埋めます
t.Body = &body{src: io.LimitReader(r, realLength), closing: t.Close}
// ...
}
// Unify output
switch rr := msg.(type) {
case *Request:
rr.Body = t.Body
rr.ContentLength = t.ContentLength
rr.TransferEncoding = t.TransferEncoding
rr.Close = t.Close
rr.Trailer = t.Trailer
case *Response:
// ...
}
return nil
}
これで全体像は把握できましたでしょうか。すぐに理解は難しい(自分も理解にかなり時間がかかりました)かもしれませんが、なんとなくイメージがわければ幸いです。
では、本題のコネクションがcloseされているかを監視する処理はどこにあるのかというと、気づいている方も多いかと思いますが、 serverHandler{c.server}.ServeHTTP(w, w.req)
の直前に実行もしくはEOF到達時のcallbackされるよう登録されているstartBackgroundRead
です。
こちらはほぼそのままソースをのせます。
func (cr *connReader) startBackgroundRead() {
cr.lock()
defer cr.unlock()
if cr.inRead {
panic("invalid concurrent Body.Read call")
}
if cr.hasByte {
return
}
// inReadはread中である場合はtrueにする必要があります
cr.inRead = true
cr.conn.rwc.SetReadDeadline(time.Time{})
// backgroundでコネクションを監視していることがわかります
// 瞬時にコネクションcloseに気づけるのはこのためだとわかります!!
go cr.backgroundRead()
}
func (cr *connReader) backgroundRead() {
n, err := cr.conn.rwc.Read(cr.byteBuf[:])
cr.lock()
// 普通はHTTPの場合レスポンスを受け取る前に新たにリクエストを送ることはプロトコル上許されないので常にn=0になるはずです
// HTTP/1.x pipeliningの場合は上記が許されているがコメントを読む限り以下のよう
// - HTTP/1.x pipeliningの場合はコネクションclose時にcontextをキャンセルしない
// - サーバがwriteしたときにエラーになり気づけるはずではある
// - サーバが全くwriteしなかったら永遠にcontextがキャンセルされないがそれはHTTP/1.x pipeliningの仕様が悪い
if n == 1 {
cr.hasByte = true
// コメント省略
// Fortunately, almost nothing uses HTTP/1.x pipelining.
// Unfortunately, apt-get does, or sometimes does.
// New Go 1.11 behavior: don't fire CloseNotify or cancel
// contexts on pipelined requests. Shouldn't affect people, but
// fixes cases like Issue 23921. This does mean that a client
// closing their TCP connection after sending a pipelined
// request won't cancel the context, but we'll catch that on any
// write failure (in checkConnErrorWriter.Write).
// If the server never writes, yes, there are still contrived
// server & client behaviors where this fails to ever cancel the
// context, but that's kinda why HTTP/1.x pipelining died
// anyway.
}
if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() {
// 他のgoroutineが意図的にreadをやめさせるためにabortPendingReadをcallした結果なので正常です
// Ignore this error. It's the expected error from
// another goroutine calling abortPendingRead.
} else if err != nil {
cr.handleReadError(err)
}
cr.aborted = false
cr.inRead = false
cr.unlock()
cr.cond.Broadcast()
}
// 正常時のfinishrequestもよばれます
// コネクションをreadしている処理をabortさせたいために一時的にタイムアウト判定させ、その後戻しています
func (cr *connReader) abortPendingRead() {
cr.lock()
defer cr.unlock()
if !cr.inRead {
return
}
cr.aborted = true
cr.conn.rwc.SetReadDeadline(aLongTimeAgo)
for cr.inRead {
cr.cond.Wait()
}
cr.conn.rwc.SetReadDeadline(time.Time{})
}
// handleReadError is called whenever a Read from the client returns a
// non-nil error.
//
// The provided non-nil err is almost always io.EOF or a "use of
// closed network connection". In any case, the error is not
// particularly interesting, except perhaps for debugging during
// development. Any error means the connection is dead and we should
// down its context.
//
// It may be called from multiple goroutines.
func (cr *connReader) handleReadError(_ error) {
// 既出のとおり、cr.conn.cancelCtxをよぶとreq.ctxがcancelされます
cr.conn.cancelCtx()
cr.closeNotify()
}
1点だけ補足します。backgroundRead
はコネクションをreadしようとして正常時はずっとブロックされているはずで、それはabortPendingRead
のcr.conn.rwc.SetReadDeadline(aLongTimeAgo)
で直ちにブロック解除しreturnされるようなソースの書きっぷりですが、たしかにnet.Connのinterfaceのコメントに
// SetReadDeadline sets the deadline for future Read calls
// and any currently-blocked Read call.
// A zero value for t means Read will not time out.
SetReadDeadline(t time.Time) error
書いてあり、これからReadされる処理だけでなく、現在Readでブロックされている処理にもdeadlineを設定できるようです。
ソケットをReadしてブロックしているのだからOSレベルでblockしているのでそんな制御ができるのか??
と思うかもしれませんが、よくよくソースを追っていくとGoのソケットは内部的な実装は非同期で実装されていることがわかります。
このあたりは自分もほぼソース追えてませんが、このブログやそのリンクが参考になります。
要は、公開interfaceとしては同期I/Oであるが、Goのランタイムではepoll
などを利用した非同期I/Oのようです。Goすごいですね!!
要するに
だいぶ長くなり、だいぶ脱線もしてしまったので最後にまとめます。
どのようにconnectionのcloseを検知し、Request.ctx
をキャンセルしているかの答えは以下であることがわかりました。
リクエストボディが(アプリのロジックにより)EOFまで読まれたら、コネクションをReadする処理が新しいgoroutineで起動します。このReadが意図していないエラーを返却した場合、コネクションがcloseされたことに起因しているはずであり、request.ctxをcancelする実装になっています。これによって、ServeHTTP
を実装しているアプリ側で特に意識することなく、Request.contextを元にコネクションがcloseされていることに気づけることになります。
一歩引いて、tcpにおいてこのように通信相手がcloseしたかを瞬時に把握するのは一般的に可能なのかを考えてみましょう。
tcpプロトコルとして必要な要素は「通信相手が送信し終わったことがreadした内容でわかる」ことが条件になると思います。httpの場合はcontent-lengthというheaderの値や、もしくはChunked_transfer_encodingの場合は0\r\n
で終わらせるとするという取り決めで担保されてます。同じhttpでもHTTP/1.x pipeliningの場合は終わりがわからないため無理です。
では、言語的な制約はどうでしょう。相手がcloseしたことを瞬時に検知するためには事前にread
していなければいけません。(もちろんnon blockでreadをpollingすることも可能でしょうけどやりたくはありません)
そのため、そのread
を任意のタイミングでabortさせられるサポートが必要となります。Goの場合は既出のようにランタイムの実装が実は非同期IOで簡単にabortできるので負担なく実装することができてます。
キャンセルを伝播させたくないときには
クライアントによるタイムアウト等でrequest.ctx
がキャンセルされたことによってアプリのコンテキストがキャンセルされるのが好ましくないこともあるでしょう。
その場合は、単純にcontext.Background()
等で最初のcontextを作成すればいいですが、場合によっては利用しているライブラリがRequest.ctx
を参照しているケースもあるでしょう。
その場合は、middlewareの最初の処理で、Request.Clone(context.Background())
で上書きしてあげればいいと思います。