LoginSignup
13
7

More than 5 years have passed since last update.

Golangで確立したはずのTCPコネクションが確立できていなくて即エラーを起こす問題

Posted at

Golangで作ったTCPサーバーのベンチマークを計測している時に、これまたGolangで作ったベンチマークツールから不可解なエラーがでるので調査しました。
CPU8コアで動くTCPサーバーに対して、ベンチマークツールから400コネクションを立ち上げて負荷をかけていると

// これと
write tcp 10.0.0.2:41014->10.0.0.1:5000: write: broken pipe
// これ
EOF

このエラーが、net.Connから時々出てくるので原因を調査しました。

TCPサーバーの基礎については GoでTCPサーバー上で独自プロトコルKey-Value Storeを実装する(知識編)を参考にしてください。

echo サーバーで試す

まず、最も簡易なTCPエコーサーバーで同じエラーが再現するかを確かめました。

echoサーバーのコード

main.go
package main

import (
    "log"
    "net"
)

func echo_handler(conn net.Conn) {
    defer conn.Close()
    // io.Copy(conn, conn)

    buf := make([]byte, 1024*4)
    for {
        n, err := conn.Read(buf)
        if err != nil {
            log.Println("Read Err", err)
            return
        }
        var nn int
        nn, err = conn.Write(buf[:n])
        if nn != n || err != nil {
            log.Println("Write Err", err)
            return
        }
    }
}

func main() {
    psock, e := net.Listen("tcp", ":5000")
    if e != nil {
        log.Fatal(e)
        return
    }
    var count int
    for {
        conn, e := psock.Accept()
        if e != nil {
            log.Fatal(e)
            return
        }
        count++
        go echo_handler(conn)
        log.Println("accept:", count)
    }
}

ベンチマークツールのコードは長いので一番下に記載します。

実験環境

  • Cloud: Degital Ocean シンガポールリージョン
  • OS: Ubuntu16.04.3

ベンチサーバー

  • CPU: 16 Core
  • Memory: 48GB
  • Type: Standard

対象サーバー

  • CPU: 8 Core
  • Memory: 16GB
  • Type: Standard

実験内容

ulimit は、対象サーバー、ベンチサーバー共に、10000 に設定

$ ulimit -n 10000
$ ulimit -n
10000

100コネクション

$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:29:02 Initializing...
2017/09/05 04:29:02 benchmark started
2017/09/05 04:29:32 finished writing
2017/09/05 04:29:33 Total Requests: 1552393
2017/09/05 04:29:33 Total Secounds: 30.662238499 s
2017/09/05 04:29:33 Throughput: 50628.82150794792 [#/sec]

400コネクション

$ ./bin/echo-bench -c 400 -l 30 -s 1400 10.0.0.1
2017/09/05 04:30:10 Initializing...
2017/09/05 04:30:10 benchmark started
2017/09/05 04:30:10 workerWrite write tcp 10.0.0.2:56708->10.0.0.1:5000: write: broken pipe
2017/09/05 04:30:10 workerRead EOF
2017/09/05 04:30:10 workerRead EOF
2017/09/05 04:30:10 workerWrite write tcp 10.0.0.1:56644->10.0.0.1:5000: write: broken pipe
2017/09/05 04:30:10 workerRead EOF
....
# 以下接続が切断された旨のエラーの嵐

broken pipeのエラーが出たり出なかったりします。

100 + 100 + 100 + 100 コネクション

ベンチサーバーから同時に4本のベンチプロセスを立てて、それぞれ100コネクションずつでベンチマークをかける。

$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:32:33 Initializing...
2017/09/05 04:32:33 benchmark started
2017/09/05 04:33:03 finished writing
2017/09/05 04:33:07 Total Requests: 327683
2017/09/05 04:33:07 Total Secounds: 34.661332186 s
2017/09/05 04:33:07 Throughput: 9453.848981960187 [#/sec]
$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:32:34 Initializing...
2017/09/05 04:32:37 benchmark started
2017/09/05 04:33:07 finished writing
2017/09/05 04:33:07 Total Requests: 495080
2017/09/05 04:33:07 Total Secounds: 30.399221267 s
2017/09/05 04:33:07 Throughput: 16285.943500053934 [#/sec]
$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:32:30 Initializing...
2017/09/05 04:32:30 benchmark started
2017/09/05 04:33:00 finished writing
2017/09/05 04:33:05 Total Requests: 413615
2017/09/05 04:33:05 Total Secounds: 35.157604061 s
2017/09/05 04:33:05 Throughput: 11764.595769448897 [#/sec]
$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:32:31 Initializing...
2017/09/05 04:32:32 benchmark started
2017/09/05 04:33:02 finished writing
2017/09/05 04:33:06 Total Requests: 349029
2017/09/05 04:33:06 Total Secounds: 33.497632363 s
2017/09/05 04:33:06 Throughput: 10419.512526071005 [#/sec]

エラーは出ない。

気づいたこと

このように、400コネクションを同時接続した時にエラーが発生しました。
400コネクションを100コネクションずつに分けてアクセスしてもエラーは発生しません。(4枚のターミナルにそれぞれベンチスクリプトを貼り付けて順々に、ほぼ1秒以内に実行しました。)

ここである事実に気づきました。ベンチマークからは400コネクションを作成したはずなのですが、サーバー側のログを見ると200回程度しかAcceptを実行していません。

$ netstat -alnp tcp | grep ":5000" | grep ESTABLISHED | wc -l

と、サーバー側のコネクション数を計測しても、400には達していませんでした。

サーバー側でAcceptされていないのに、クライアント側では、net.Connが作成されコネクションが確立したことになっている?
このことを確かめるために次の実験をしました。

acceptしないTCPサーバー

サーバーのコード

main.go
package main

import (
    "log"
    "net"
    "time"
)

func main() {
    _, e := net.Listen("tcp", ":5000")
    if e != nil {
        log.Fatal(e)
        return
    }
    log.Println("no accept")
    time.Sleep(3600 * time.Second)
}

ListenするけどAcceptしないサーバーです。

クライアントコード

一番下に記載しているechoサーバーのベンチマークツールのfunc (w *worker) worker() を以下のように改変しました。

main.go
var cowo int32

func (w *worker) worker() {
    _, err := net.Dial("tcp", w.b.conf.laddr)
    if err != nil {
        panic(err)
    }
    r := atomic.AddInt32(&cowo, 1)
    log.Println("connected", r)
}

サーバと確立したコネクションの数をログに出します。

実験して見る

クライアントコードを実行すると、サーバー側でacceptされていないにもかかわらず、クライアント側でコネクションが生成されていました。

$ netstat -alnp tcp | grep ":5000" | grep ESTABLISHED | wc -l

をして、確立済みのコネクションの数を計測すると

サーバー側で129、クライアント側で259と計測できました。

サーバー側で、コネクションの一覧を調べると

$ netstat -alnp tcp | grep ":5000"
tcp6       0      0 :::5000                 :::*                    LISTEN      20743/echoserver
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48302     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48200     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48168     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48320     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48508     SYN_RECV    -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48454     SYN_RECV    -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48244     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48272     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48182     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48260     ESTABLISHED -
tcp6       0      0 10.0.0.1:5000      10.0.0.2:48520     SYN_RECV    -

と、ESTABLISHED に混じって、SYN_RECVが確認できます。

backlog について

ここで調査を進めてこの記事に行き当たりました。

listen backlog 【3.6】

ESTABLISHEDになって、acceptされていないコネクションは、backlogに入ります。backlogがいっぱいになると、syn backlogに接続要求が貯まるようになっているみたいです。

結論

もともとSOMAXCONNは、128でした。

$ cat /proc/sys/net/core/somaxconn
128

backlog を増やすために、

sysctl -w net.core.somaxconn=1024

とすると最初の、broken pipeエラーが出る事象は解決しました。

しかし、追加実験でAcceptされていないクライアント側のコネクションにWriteしてもReadしてもエラーは起こりませんでした。
なぜ、broken pipeのエラーが出るのかは謎のままです。

echoサーバーのベンチマークツール

main.go
package main

import (
    "context"
    "log"
    "net"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/urfave/cli"
)

func main() {
    app := cli.NewApp()

    app.Usage = "Benchmark for echo server"

    app.Flags = requestFlag

    app.Action = request
    app.Run(os.Args)
}

func request(c *cli.Context) error {
    conf := loadEchoConfig(c)
    b := newBench(conf)
    return b.request()
}

var requestFlag = []cli.Flag{
    cli.IntFlag{
        Name:  "c",
        Value: 2,
        Usage: "number of clients",
    },
    cli.IntFlag{
        Name:  "port, p",
        Value: 5000,
        Usage: "port number",
    },
    cli.IntFlag{
        Name:  "length, l",
        Value: 10,
        Usage: "length(s) of request time seconds.",
    },
    cli.IntFlag{
        Name:  "size, s",
        Value: 6,
        Usage: "size(bytes) of binary size of 1 request.",
    },
    cli.BoolTFlag{
        Name:  "vv",
        Usage: "show verbose log",
    },
}

type echoConfig struct {
    laddr   string
    workers int
    l       int
    s       int
    v       bool
}

func loadEchoConfig(c *cli.Context) *echoConfig {
    host := "127.0.0.1"
    if len(c.Args()) > 0 {
        host = c.Args().First()
    }
    return &echoConfig{
        laddr:   host + ":" + strconv.Itoa(c.Int("p")),
        workers: c.Int("c"),
        l:       c.Int("l"),
        s:       c.Int("s"),
        v:       c.Bool("vv"),
    }
}

type bench struct {
    conf     *echoConfig
    ctx      context.Context
    cancel   context.CancelFunc
    wgInit   sync.WaitGroup
    wgFinish sync.WaitGroup
    chCount  chan uint64
    reqsGet  [][]byte
}

func newBench(conf *echoConfig) *bench {
    ctx, cancel := context.WithCancel(context.Background())
    chCount := make(chan uint64)
    return &bench{
        conf:    conf,
        ctx:     ctx,
        cancel:  cancel,
        chCount: chCount,
    }
}

func (b *bench) request() error {
    if b.conf.v {
        log.Println("Initializing...")
    }

    for i := 0; i < b.conf.workers; i++ {
        b.wgInit.Add(1)
        b.wgFinish.Add(1)
        w := newWorker(b)
        go w.worker()
    }

    b.wgInit.Wait()
    start := time.Now()
    if b.conf.v {
        log.Println("benchmark started")
    }

    <-time.After(time.Duration(b.conf.l) * time.Second)
    b.cancel()

    if b.conf.v {
        log.Println("finished writing")
    }
    b.wgFinish.Wait()

    sub := time.Now().Sub(start).Seconds()
    var count uint64
    for i := 0; i < b.conf.workers; i++ {
        count += <-b.chCount
    }
    var throughput float64
    if sub > 0 {
        throughput = float64(count) / sub
    }
    log.Printf("Total Requests: %d", count)
    log.Printf("Total Secounds: %g s", sub)
    log.Printf("Throughput: %g [#/sec]\n", throughput)
    return nil
}

type worker struct {
    b         *bench
    wgRequest sync.WaitGroup
    ctxErr    context.Context
    cancelErr context.CancelFunc
    ch        chan struct{}
}

func newWorker(b *bench) *worker {
    ctxErr, cancelErr := context.WithCancel(context.Background())
    return &worker{
        b:         b,
        ctxErr:    ctxErr,
        cancelErr: cancelErr,
        //ch:        make(chan struct{}, 10000),
    }
}

func (w *worker) worker() {
    conn, err := net.Dial("tcp", w.b.conf.laddr)
    if err != nil {
        panic(err)
    }
    var count uint64
    defer func() {
        //close(w.ch)
        w.wgRequest.Wait()
        w.b.wgFinish.Done()

        w.b.chCount <- count
        conn.Close()
    }()
    go w.workerRead(conn)

    oBuf := make([]byte, w.b.conf.s)

    // initialization finished
    w.b.wgInit.Done()
    w.b.wgInit.Wait()

    for {
        select {
        case <-w.b.ctx.Done():
            return
        case <-w.ctxErr.Done():
            // err from w.workerRead()
            count = 0
            return
        default:
            buf := oBuf
            for {
                var n int
                //w.ch <- struct{}{}
                w.wgRequest.Add(1)
                n, err = conn.Write(buf)
                if err != nil {
                    if err, ok := err.(net.Error); ok && err.Temporary() {
                        buf = buf[n:]
                        continue
                    }
                    log.Println("workerWrite", err)
                    w.cancelErr()
                    count = 0
                    return
                }
                break
            }
            count++
        }
    }
}

const matcherClose = "use of closed network connection"

func (w *worker) workerRead(conn net.Conn) {
    reqSize := w.b.conf.s
    buf := make([]byte, reqSize)
    var nn int
    for {
        //<-w.ch
        n, err := conn.Read(buf)
        if err != nil {
            if strings.Contains(err.Error(), matcherClose) {
                return
            }
            if err, ok := err.(net.Error); !(ok && err.Temporary()) {
                log.Println("workerRead", err)
                w.cancelErr()
                return
            }
        }
        nn += n
        for {
            if nn < reqSize {
                break
            }
            nn -= reqSize
            w.wgRequest.Done()
        }
    }
}

13
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
7