LoginSignup
22
15

More than 3 years have passed since last update.

Goのhttp.Requestのキャンセルの仕組みを理解する

Last updated at Posted at 2020-08-04

はじめに

Goのhttp.Request構造体のctxdocによると、以下の場合にcancelされます。

For outgoing client requests, the context controls cancellation.
For incoming server requests, the context is canceled when the client's connection closes, the request is canceled (with HTTP/2), or when the ServeHTTP method returns.

クライアントがタイムアウトを設定していた場合にもタイムアウト時間経過後にコネクションがcloseされるため、サーバ側実装でcontextを参照することでタイムアウトしたことも気づけることになります。

Request.ctxやその子孫関係にあるcontext(contextは親がキャンセルされると子に伝播されます)をサーバ側で参照することで無駄なリソースの利用を防ぐことができます。
例えば、conn.ExecContext(ctx, ...)にてsqlを実行していた場合、引数にcontextを渡すことで、sqlが実行中であってもクライアントが設定したHTTPのタイムアウトのタイミングで実行中のクエリがキャンセルされ、DBの負荷を抑えることができます。

ただ、厳密には実行中のクエリまでキャンセルされるかはdriverの実装次第かと思いますが、著名なドライバでは対応していると思って良さそうです。

postgresの一番著名なドライバであるlib/pqにはソースでいうとこのあたりで実装されているようで、contextのキャンセルをwatchしているgoルーチンを新規に作りキャンセル判定した場合にはキャンセルのリクエストをDBサーバに送っていることがわかります。

今回確認したいこと

http.Request.ctxを参照する有用性がわかったところで今回、確認したいのはhttp.Request構造体のctxはいつだれがどのようにキャンセルしているかです。

僕らサーバサイドエンジニアの仕事は極論をいうと以下のインターフェースを実装することであるといっていいでしょう。


type Handler interface {
    ServeHTTP(ResponseWriter, *Request)
}

ポートをバインドし、Acceptし、適切にヘッダー等を読み込みといったHTTPサーバとして必要な細かい部分はよしなにGoの標準ライブラリが担ってくれるので、僕らはHandlerインターフェースの「リクエストボディを読み込んでレスポンスボディを書き出す」という仕事に集中できます。

逆にいうとServeHTTPの実行中の処理はライブラリ側ではなく、アプリ側の実装にのみ存在するので、普通に考えれば、リクエストボディを読みこみおわったとアプリ側が判断し、それ以降コネクションがcloseされないかを監視するgoルーチンをつくる処理を自前で書かないといけないはずです。

このあたりの仕組みどうなっているんだ?とおもったのが今回の動機です。

なお、HTTP2やHTTPSは対象外とします(単純にそちらまでソースを読めていないからで同様の仕組みはあるはず)。

httpの実装

早速実装を確認していきます。せっかくですのでかなりざっくりと全体像も確認していきます。

ソースにインラインでコメントを書いていきます。元のコードを上書きは一切してませんが見通しをよくしたいためかなりの処理を消してます。

まず、大事な箇所は以下です。
https://github.com/golang/go/blob/f92337422ef2ca27464c198bb3426d2dc4661653/src/net/http/server.go#L2907-L2971


for {
   // lはnet.ListenerでAcceptによりクライアントからのリクエストを待ち受けている
   // 新しいリクエストがきたらrwがそのコネクションとして返る
   rw, err := l.Accept() 
   if err != nil {
       // Temporary Errorの場合は適切にsleepしcontinue
   }
   // cはserversideのhttp connectionを表現します
   // c.rwcはrwを保持します。おそらくrwcはreadwriteconnectionの略でしょう
   c := srv.newConn(rw)
   c.setState(c.rwc, StateNew) // before Serve can return
   go c.serve(connCtx)
}

かなり簡潔にしたのが上記です。Acceptにより新しいリクエストを無限loopで待ち受け、コネクションをもとにc.serveに新しいgoルーチンでcallしていることがわかります。アプリを実装する際、当然リクエストごとに実行するgoルーチンは異なりますがそれはここでgoルーチンを起動しているからです。

go c.serve(connCtx)で呼ばれている処理を抜粋すると以下です。
https://github.com/golang/go/blob/f92337422ef2ca27464c198bb3426d2dc4661653/src/net/http/server.go#L1840-L1955

// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
    c.remoteAddr = c.rwc.RemoteAddr().String()
    ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
    defer func() {
        // deferにてbufferリングしていたwriteをflushしたりリソースを開放します
        // またc.rwc(Acceptで取得したコネクション)をcloseします
        c.close()
        c.setState(c.rwc, StateClosed)
    }()

    // TLS関連処理
    if tlsConn, ok := c.rwc.(*tls.Conn); ok {
        ...
    }

    // HTTP/1.x from here on.

    ctx, cancelCtx := context.WithCancel(ctx)
    // このcancelCtx関数が今回テーマにしているcancelを行う関数です
    c.cancelCtx = cancelCtx
    defer cancelCtx()

    // c.rはコネクションのreadを管理します
    c.r = &connReader{conn: c}
    // bufferリングしているだけ
    c.bufr = newBufioReader(c.r)
    c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)

    for {
        // request/responseを初期化し、いろいろな処理をします
        // 後述
        w, err := c.readRequest(ctx)

        // errの処理や様々な処理
        req := w.req
        // readしていないリクエストボディがまだ存在する場合、EOFになった際にstartBackgroundReadするよう登録します
        // req.Body.onHitEOFに登録され、req.Bodyをreadする実装において、EOFに到達したらonHitEOF関数を呼ぶようになってます。
        // 存在しなければすぐにstartBackgroundReadを実行します(処理内容は後述)
        if requestBodyRemains(req.Body) {
            registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
        } else {
            w.conn.r.startBackgroundRead()
        }
        // アプリ側が実装した処理がよばれます
        serverHandler{c.server}.ServeHTTP(w, w.req)
        w.cancelCtx()
        // 主に以下3つの処理を行います
        // - c.r.abortPendingReadをよびます(後述)
        // - bufferしていたデータをwriteします
        // - w.reqBody(req.Bodyと同じ).closeをよびます(全てのデータを読んでいない場合にioutil.Discardに捨てるなどしていてファイルを実際にcloseはしていない)
        w.finishRequest()

        // keep-alive処理
        // コネクションを使いまわすかどうか判断する
        // 使いまわさないのであればreturnされ、defer内の処理によりコネクションがcloseされ、使いまわせるならばfor文内のまま
        // ただ、ずっと使い回すのも無駄(クライアントがすでにcloseしているかもしれない)なのでPeekでtimeoutを設定します
        // 本題と関係ないですが、この実装はなるほどと感じた
        if d := c.server.idleTimeout(); d != 0 {
            c.rwc.SetReadDeadline(time.Now().Add(d))
            if _, err := c.bufr.Peek(4); err != nil {
                return
            }
        }
        // リセットして次のリクエストの処理へ
        c.rwc.SetReadDeadline(time.Time{})
    }
}

c.readRequestで呼ばれる一連の処理は以下です。

// @net/http/server.go
// Read next request from connection.
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
    // 各種timeoutなどを設定します
    // ...

    // 後述
    req, err := readRequest(c.bufr, keepHostHeader)

    // req.ctxはc.cancelCtx関数をよぶとcancelされることがわかります
    // もちろんw.cancelCtxでもcancelされます
    ctx, cancelCtx := context.WithCancel(ctx)
    req.ctx = ctx

    w = &response{
        conn:          c,
        cancelCtx:     cancelCtx,
        req:           req,
        reqBody:       req.Body,
        handlerHeader: make(Header),
        // ...
    }
    w.cw.res = w
    w.w = newBufioWriterSize(&w.cw, bufferBeforeChunkingSize)
    return w, nil
}


// @net/http/request.go
// Headerを全て読み込みreq.Headerにつめこみます
// Bodyは一切readしていないのがポイントです
func readRequest(b *bufio.Reader, deleteHostHeader bool) (req *Request, err error) {
    tp := newTextprotoReader(b)
    req = new(Request)

    // First line: GET /index.html HTTP/1.0
    var s string
    if s, err = tp.ReadLine(); err != nil {
        return nil, err
    }
    // ...
    // ...
    if err != nil {
        return nil, err
    }
    req.Header = Header(mimeHeader)

    // 後述
    err = readTransfer(req, b)
    if err != nil {
        return nil, err
    }
    return req, nil
}

// @net/http/transfer.go
// msg is *Request or *Response.
func readTransfer(msg interface{}, r *bufio.Reader) (err error) {
    t := &transferReader{RequestMethod: "GET"}

    // Unify input
    isResponse := false
    switch rr := msg.(type) {
    case *Response:
        // 省略
    case *Request:
        t.Header = rr.Header
        t.RequestMethod = rr.Method
        t.ProtoMajor = rr.ProtoMajor
        t.ProtoMinor = rr.ProtoMinor
        // ...
    }
    // Content-Lengthをもとに長さを返却します
    realLength, err := fixLength(isResponse, t.StatusCode, t.RequestMethod, t.Header, t.TransferEncoding)
    // Prepare body reader. ContentLength < 0 means chunked encoding
    // or close connection when finished, since multipart is not supported yet
    switch {
    case chunked(t.TransferEncoding):
        // HTTP1.1のChunked transfer encodingの場合は最後の0\r\n を読み込んだ時にEOFを返却するようなio.Readerをbody.srcに埋めます
        t.Body = &body{src: internal.NewChunkedReader(r), hdr: msg, r: r, closing: t.Close}
    }
    case realLength > 0:
        // 通常のリクエストの場合はrealLengthよみこんだらEOFを返却するio.Readerをbody.srcに埋めます    
        t.Body = &body{src: io.LimitReader(r, realLength), closing: t.Close}
    // ...
    }

    // Unify output
    switch rr := msg.(type) {
    case *Request:
        rr.Body = t.Body
        rr.ContentLength = t.ContentLength
        rr.TransferEncoding = t.TransferEncoding
        rr.Close = t.Close
        rr.Trailer = t.Trailer
    case *Response:
        // ...
    }

    return nil
}

これで全体像は把握できましたでしょうか。すぐに理解は難しい(自分も理解にかなり時間がかかりました)かもしれませんが、なんとなくイメージがわければ幸いです。

では、本題のコネクションがcloseされているかを監視する処理はどこにあるのかというと、気づいている方も多いかと思いますが、 serverHandler{c.server}.ServeHTTP(w, w.req)の直前に実行もしくはEOF到達時のcallbackされるよう登録されているstartBackgroundReadです。

こちらはほぼそのままソースをのせます。


func (cr *connReader) startBackgroundRead() {
    cr.lock()
    defer cr.unlock()
    if cr.inRead {
        panic("invalid concurrent Body.Read call")
    }
    if cr.hasByte {
        return
    }
    // inReadはread中である場合はtrueにする必要があります
    cr.inRead = true
    cr.conn.rwc.SetReadDeadline(time.Time{})
    // backgroundでコネクションを監視していることがわかります
    // 瞬時にコネクションcloseに気づけるのはこのためだとわかります!!
    go cr.backgroundRead()
}

func (cr *connReader) backgroundRead() {
    n, err := cr.conn.rwc.Read(cr.byteBuf[:])
    cr.lock()
    // 普通はHTTPの場合レスポンスを受け取る前に新たにリクエストを送ることはプロトコル上許されないので常にn=0になるはずです
    // HTTP/1.x pipeliningの場合は上記が許されているがコメントを読む限り以下のよう
    // - HTTP/1.x pipeliningの場合はコネクションclose時にcontextをキャンセルしない
    // - サーバがwriteしたときにエラーになり気づけるはずではある
    // - サーバが全くwriteしなかったら永遠にcontextがキャンセルされないがそれはHTTP/1.x pipeliningの仕様が悪い
    if n == 1 {
        cr.hasByte = true
        // コメント省略        
        // Fortunately, almost nothing uses HTTP/1.x pipelining.
        // Unfortunately, apt-get does, or sometimes does.
        // New Go 1.11 behavior: don't fire CloseNotify or cancel
        // contexts on pipelined requests. Shouldn't affect people, but
        // fixes cases like Issue 23921. This does mean that a client
        // closing their TCP connection after sending a pipelined
        // request won't cancel the context, but we'll catch that on any
        // write failure (in checkConnErrorWriter.Write).
        // If the server never writes, yes, there are still contrived
        // server & client behaviors where this fails to ever cancel the
        // context, but that's kinda why HTTP/1.x pipelining died
        // anyway.
    }
    if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() {
        // 他のgoroutineが意図的にreadをやめさせるためにabortPendingReadをcallした結果なので正常です
        // Ignore this error. It's the expected error from
        // another goroutine calling abortPendingRead.
    } else if err != nil {
        cr.handleReadError(err)
    }
    cr.aborted = false
    cr.inRead = false
    cr.unlock()
    cr.cond.Broadcast()
}

// 正常時のfinishrequestもよばれます
// コネクションをreadしている処理をabortさせたいために一時的にタイムアウト判定させ、その後戻しています
func (cr *connReader) abortPendingRead() {
    cr.lock()
    defer cr.unlock()
    if !cr.inRead {
        return
    }
    cr.aborted = true
    cr.conn.rwc.SetReadDeadline(aLongTimeAgo)
    for cr.inRead {
        cr.cond.Wait()
    }
    cr.conn.rwc.SetReadDeadline(time.Time{})
}

// handleReadError is called whenever a Read from the client returns a
// non-nil error.
//
// The provided non-nil err is almost always io.EOF or a "use of
// closed network connection". In any case, the error is not
// particularly interesting, except perhaps for debugging during
// development. Any error means the connection is dead and we should
// down its context.
//
// It may be called from multiple goroutines.
func (cr *connReader) handleReadError(_ error) {
    // 既出のとおり、cr.conn.cancelCtxをよぶとreq.ctxがcancelされます
    cr.conn.cancelCtx()
    cr.closeNotify()
}

1点だけ補足します。backgroundReadはコネクションをreadしようとして正常時はずっとブロックされているはずで、それはabortPendingReadcr.conn.rwc.SetReadDeadline(aLongTimeAgo)で直ちにブロック解除しreturnされるようなソースの書きっぷりですが、たしかにnet.Connのinterfaceのコメントに


    // SetReadDeadline sets the deadline for future Read calls
    // and any currently-blocked Read call.
    // A zero value for t means Read will not time out.
    SetReadDeadline(t time.Time) error

書いてあり、これからReadされる処理だけでなく、現在Readでブロックされている処理にもdeadlineを設定できるようです。
ソケットをReadしてブロックしているのだからOSレベルでblockしているのでそんな制御ができるのか??
と思うかもしれませんが、よくよくソースを追っていくとGoのソケットは内部的な実装は非同期で実装されていることがわかります。

このあたりは自分もほぼソース追えてませんが、このブログやそのリンクが参考になります。

要は、公開interfaceとしては同期I/Oであるが、Goのランタイムではepollなどを利用した非同期I/Oのようです。Goすごいですね!!

要するに

だいぶ長くなり、だいぶ脱線もしてしまったので最後にまとめます。

どのようにconnectionのcloseを検知し、Request.ctxをキャンセルしているかの答えは以下であることがわかりました。

リクエストボディが(アプリのロジックにより)EOFまで読まれたら、コネクションをReadする処理が新しいgoroutineで起動します。このReadが意図していないエラーを返却した場合、コネクションがcloseされたことに起因しているはずであり、request.ctxをcancelする実装になっています。これによって、ServeHTTPを実装しているアプリ側で特に意識することなく、Request.contextを元にコネクションがcloseされていることに気づけることになります。

一歩引いて、tcpにおいてこのように通信相手がcloseしたかを瞬時に把握するのは一般的に可能なのかを考えてみましょう。

tcpプロトコルとして必要な要素は「通信相手が送信し終わったことがreadした内容でわかる」ことが条件になると思います。httpの場合はcontent-lengthというheaderの値や、もしくはChunked_transfer_encodingの場合は0\r\nで終わらせるとするという取り決めで担保されてます。同じhttpでもHTTP/1.x pipeliningの場合は終わりがわからないため無理です。

では、言語的な制約はどうでしょう。相手がcloseしたことを瞬時に検知するためには事前にreadしていなければいけません。(もちろんnon blockでreadをpollingすることも可能でしょうけどやりたくはありません)
そのため、そのreadを任意のタイミングでabortさせられるサポートが必要となります。Goの場合は既出のようにランタイムの実装が実は非同期IOで簡単にabortできるので負担なく実装することができてます。

キャンセルを伝播させたくないときには

クライアントによるタイムアウト等でrequest.ctxがキャンセルされたことによってアプリのコンテキストがキャンセルされるのが好ましくないこともあるでしょう。

その場合は、単純にcontext.Background()等で最初のcontextを作成すればいいですが、場合によっては利用しているライブラリRequest.ctxを参照しているケースもあるでしょう。

その場合は、middlewareの最初の処理で、Request.Clone(context.Background())で上書きしてあげればいいと思います。

22
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
22
15