Go
golang
backlog
TCP

Golangで確立したはずのTCPコネクションが確立できていなくて即エラーを起こす問題

More than 1 year has passed since last update.

Golangで作ったTCPサーバーのベンチマークを計測している時に、これまたGolangで作ったベンチマークツールから不可解なエラーがでるので調査しました。
CPU8コアで動くTCPサーバーに対して、ベンチマークツールから400コネクションを立ち上げて負荷をかけていると

// これと
write tcp 10.0.0.2:41014->10.0.0.1:5000: write: broken pipe
// これ
EOF

このエラーが、net.Connから時々出てくるので原因を調査しました。

TCPサーバーの基礎については GoでTCPサーバー上で独自プロトコルKey-Value Storeを実装する(知識編)を参考にしてください。

echo サーバーで試す

まず、最も簡易なTCPエコーサーバーで同じエラーが再現するかを確かめました。

echoサーバーのコード

main.go
package main

import (
    "log"
    "net"
)

func echo_handler(conn net.Conn) {
    defer conn.Close()
    // io.Copy(conn, conn)

    buf := make([]byte, 1024*4)
    for {
        n, err := conn.Read(buf)
        if err != nil {
            log.Println("Read Err", err)
            return
        }
        var nn int
        nn, err = conn.Write(buf[:n])
        if nn != n || err != nil {
            log.Println("Write Err", err)
            return
        }
    }
}

func main() {
    psock, e := net.Listen("tcp", ":5000")
    if e != nil {
        log.Fatal(e)
        return
    }
    var count int
    for {
        conn, e := psock.Accept()
        if e != nil {
            log.Fatal(e)
            return
        }
        count++
        go echo_handler(conn)
        log.Println("accept:", count)
    }
}

ベンチマークツールのコードは長いので一番下に記載します。

実験環境

  • Cloud: Degital Ocean シンガポールリージョン
  • OS: Ubuntu16.04.3

ベンチサーバー

  • CPU: 16 Core
  • Memory: 48GB
  • Type: Standard

対象サーバー

  • CPU: 8 Core
  • Memory: 16GB
  • Type: Standard

実験内容

ulimit は、対象サーバー、ベンチサーバー共に、10000 に設定

$ ulimit -n 10000
$ ulimit -n
10000

100コネクション

$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:29:02 Initializing...
2017/09/05 04:29:02 benchmark started
2017/09/05 04:29:32 finished writing
2017/09/05 04:29:33 Total Requests: 1552393
2017/09/05 04:29:33 Total Secounds: 30.662238499 s
2017/09/05 04:29:33 Throughput: 50628.82150794792 [#/sec]

400コネクション

$ ./bin/echo-bench -c 400 -l 30 -s 1400 10.0.0.1
2017/09/05 04:30:10 Initializing...
2017/09/05 04:30:10 benchmark started
2017/09/05 04:30:10 workerWrite write tcp 10.0.0.2:56708->10.0.0.1:5000: write: broken pipe
2017/09/05 04:30:10 workerRead EOF
2017/09/05 04:30:10 workerRead EOF
2017/09/05 04:30:10 workerWrite write tcp 10.0.0.1:56644->10.0.0.1:5000: write: broken pipe
2017/09/05 04:30:10 workerRead EOF
....
# 以下接続が切断された旨のエラーの嵐

broken pipeのエラーが出たり出なかったりします。

100 + 100 + 100 + 100 コネクション

ベンチサーバーから同時に4本のベンチプロセスを立てて、それぞれ100コネクションずつでベンチマークをかける。

$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:32:33 Initializing...
2017/09/05 04:32:33 benchmark started
2017/09/05 04:33:03 finished writing
2017/09/05 04:33:07 Total Requests: 327683
2017/09/05 04:33:07 Total Secounds: 34.661332186 s
2017/09/05 04:33:07 Throughput: 9453.848981960187 [#/sec]
$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:32:34 Initializing...
2017/09/05 04:32:37 benchmark started
2017/09/05 04:33:07 finished writing
2017/09/05 04:33:07 Total Requests: 495080
2017/09/05 04:33:07 Total Secounds: 30.399221267 s
2017/09/05 04:33:07 Throughput: 16285.943500053934 [#/sec]
$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:32:30 Initializing...
2017/09/05 04:32:30 benchmark started
2017/09/05 04:33:00 finished writing
2017/09/05 04:33:05 Total Requests: 413615
2017/09/05 04:33:05 Total Secounds: 35.157604061 s
2017/09/05 04:33:05 Throughput: 11764.595769448897 [#/sec]
$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:32:31 Initializing...
2017/09/05 04:32:32 benchmark started
2017/09/05 04:33:02 finished writing
2017/09/05 04:33:06 Total Requests: 349029
2017/09/05 04:33:06 Total Secounds: 33.497632363 s
2017/09/05 04:33:06 Throughput: 10419.512526071005 [#/sec]

エラーは出ない。

気づいたこと

このように、400コネクションを同時接続した時にエラーが発生しました。
400コネクションを100コネクションずつに分けてアクセスしてもエラーは発生しません。(4枚のターミナルにそれぞれベンチスクリプトを貼り付けて順々に、ほぼ1秒以内に実行しました。)

ここである事実に気づきました。ベンチマークからは400コネクションを作成したはずなのですが、サーバー側のログを見ると200回程度しかAcceptを実行していません。

$ netstat -alnp tcp | grep ":5000" | grep ESTABLISHED | wc -l

と、サーバー側のコネクション数を計測しても、400には達していませんでした。

サーバー側でAcceptされていないのに、クライアント側では、net.Connが作成されコネクションが確立したことになっている?
このことを確かめるために次の実験をしました。

acceptしないTCPサーバー

サーバーのコード

main.go
package main

import (
    "log"
    "net"
    "time"
)

func main() {
    _, e := net.Listen("tcp", ":5000")
    if e != nil {
        log.Fatal(e)
        return
    }
    log.Println("no accept")
    time.Sleep(3600 * time.Second)
}

ListenするけどAcceptしないサーバーです。

クライアントコード

一番下に記載しているechoサーバーのベンチマークツールのfunc (w *worker) worker() を以下のように改変しました。

main.go
var cowo int32

func (w *worker) worker() {
    _, err := net.Dial("tcp", w.b.conf.laddr)
    if err != nil {
        panic(err)
    }
    r := atomic.AddInt32(&cowo, 1)
    log.Println("connected", r)
}

サーバと確立したコネクションの数をログに出します。

実験して見る

クライアントコードを実行すると、サーバー側でacceptされていないにもかかわらず、クライアント側でコネクションが生成されていました。

$ netstat -alnp tcp | grep ":5000" | grep ESTABLISHED | wc -l

をして、確立済みのコネクションの数を計測すると

サーバー側で129、クライアント側で259と計測できました。

サーバー側で、コネクションの一覧を調べると

$ netstat -alnp tcp | grep ":5000"
tcp6       0      0 :::5000                 :::*                    LISTEN      20743/echoserver
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48302     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48200     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48168     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48320     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48508     SYN_RECV    -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48454     SYN_RECV    -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48244     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48272     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48182     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48260     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48520     SYN_RECV    -

と、ESTABLISHED に混じって、SYN_RECVが確認できます。

backlog について

ここで調査を進めてこの記事に行き当たりました。

listen backlog 【3.6】

ESTABLISHEDになって、acceptされていないコネクションは、backlogに入ります。backlogがいっぱいになると、syn backlogに接続要求が貯まるようになっているみたいです。

結論

もともとSOMAXCONNは、128でした。

$ cat /proc/sys/net/core/somaxconn
128

backlog を増やすために、

sysctl -w net.core.somaxconn=1024

とすると最初の、broken pipeエラーが出る事象は解決しました。

しかし、追加実験でAcceptされていないクライアント側のコネクションにWriteしてもReadしてもエラーは起こりませんでした。
なぜ、broken pipeのエラーが出るのかは謎のままです。

echoサーバーのベンチマークツール

main.go
package main

import (
    "context"
    "log"
    "net"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/urfave/cli"
)

func main() {
    app := cli.NewApp()

    app.Usage = "Benchmark for echo server"

    app.Flags = requestFlag

    app.Action = request
    app.Run(os.Args)
}

func request(c *cli.Context) error {
    conf := loadEchoConfig(c)
    b := newBench(conf)
    return b.request()
}

var requestFlag = []cli.Flag{
    cli.IntFlag{
        Name:  "c",
        Value: 2,
        Usage: "number of clients",
    },
    cli.IntFlag{
        Name:  "port, p",
        Value: 5000,
        Usage: "port number",
    },
    cli.IntFlag{
        Name:  "length, l",
        Value: 10,
        Usage: "length(s) of request time seconds.",
    },
    cli.IntFlag{
        Name:  "size, s",
        Value: 6,
        Usage: "size(bytes) of binary size of 1 request.",
    },
    cli.BoolTFlag{
        Name:  "vv",
        Usage: "show verbose log",
    },
}

type echoConfig struct {
    laddr   string
    workers int
    l       int
    s       int
    v       bool
}

func loadEchoConfig(c *cli.Context) *echoConfig {
    host := "127.0.0.1"
    if len(c.Args()) > 0 {
        host = c.Args().First()
    }
    return &echoConfig{
        laddr:   host + ":" + strconv.Itoa(c.Int("p")),
        workers: c.Int("c"),
        l:       c.Int("l"),
        s:       c.Int("s"),
        v:       c.Bool("vv"),
    }
}

type bench struct {
    conf     *echoConfig
    ctx      context.Context
    cancel   context.CancelFunc
    wgInit   sync.WaitGroup
    wgFinish sync.WaitGroup
    chCount  chan uint64
    reqsGet  [][]byte
}

func newBench(conf *echoConfig) *bench {
    ctx, cancel := context.WithCancel(context.Background())
    chCount := make(chan uint64)
    return &bench{
        conf:    conf,
        ctx:     ctx,
        cancel:  cancel,
        chCount: chCount,
    }
}

func (b *bench) request() error {
    if b.conf.v {
        log.Println("Initializing...")
    }

    for i := 0; i < b.conf.workers; i++ {
        b.wgInit.Add(1)
        b.wgFinish.Add(1)
        w := newWorker(b)
        go w.worker()
    }

    b.wgInit.Wait()
    start := time.Now()
    if b.conf.v {
        log.Println("benchmark started")
    }

    <-time.After(time.Duration(b.conf.l) * time.Second)
    b.cancel()

    if b.conf.v {
        log.Println("finished writing")
    }
    b.wgFinish.Wait()

    sub := time.Now().Sub(start).Seconds()
    var count uint64
    for i := 0; i < b.conf.workers; i++ {
        count += <-b.chCount
    }
    var throughput float64
    if sub > 0 {
        throughput = float64(count) / sub
    }
    log.Printf("Total Requests: %d", count)
    log.Printf("Total Secounds: %g s", sub)
    log.Printf("Throughput: %g [#/sec]\n", throughput)
    return nil
}

type worker struct {
    b         *bench
    wgRequest sync.WaitGroup
    ctxErr    context.Context
    cancelErr context.CancelFunc
    ch        chan struct{}
}

func newWorker(b *bench) *worker {
    ctxErr, cancelErr := context.WithCancel(context.Background())
    return &worker{
        b:         b,
        ctxErr:    ctxErr,
        cancelErr: cancelErr,
        //ch:        make(chan struct{}, 10000),
    }
}

func (w *worker) worker() {
    conn, err := net.Dial("tcp", w.b.conf.laddr)
    if err != nil {
        panic(err)
    }
    var count uint64
    defer func() {
        //close(w.ch)
        w.wgRequest.Wait()
        w.b.wgFinish.Done()

        w.b.chCount <- count
        conn.Close()
    }()
    go w.workerRead(conn)

    oBuf := make([]byte, w.b.conf.s)

    // initialization finished
    w.b.wgInit.Done()
    w.b.wgInit.Wait()

    for {
        select {
        case <-w.b.ctx.Done():
            return
        case <-w.ctxErr.Done():
            // err from w.workerRead()
            count = 0
            return
        default:
            buf := oBuf
            for {
                var n int
                //w.ch <- struct{}{}
                w.wgRequest.Add(1)
                n, err = conn.Write(buf)
                if err != nil {
                    if err, ok := err.(net.Error); ok && err.Temporary() {
                        buf = buf[n:]
                        continue
                    }
                    log.Println("workerWrite", err)
                    w.cancelErr()
                    count = 0
                    return
                }
                break
            }
            count++
        }
    }
}

const matcherClose = "use of closed network connection"

func (w *worker) workerRead(conn net.Conn) {
    reqSize := w.b.conf.s
    buf := make([]byte, reqSize)
    var nn int
    for {
        //<-w.ch
        n, err := conn.Read(buf)
        if err != nil {
            if strings.Contains(err.Error(), matcherClose) {
                return
            }
            if err, ok := err.(net.Error); !(ok && err.Temporary()) {
                log.Println("workerRead", err)
                w.cancelErr()
                return
            }
        }
        nn += n
        for {
            if nn < reqSize {
                break
            }
            nn -= reqSize
            w.wgRequest.Done()
        }
    }
}