Golangで作ったTCPサーバーのベンチマークを計測している時に、これまたGolangで作ったベンチマークツールから不可解なエラーがでるので調査しました。
CPU8コアで動くTCPサーバーに対して、ベンチマークツールから400コネクションを立ち上げて負荷をかけていると
// これと
write tcp 10.0.0.2:41014->10.0.0.1:5000: write: broken pipe
// これ
EOF
このエラーが、net.Conn
から時々出てくるので原因を調査しました。
TCPサーバーの基礎については GoでTCPサーバー上で独自プロトコルKey-Value Storeを実装する(知識編)を参考にしてください。
echo サーバーで試す
まず、最も簡易なTCPエコーサーバーで同じエラーが再現するかを確かめました。
echoサーバーのコード
package main
import (
"log"
"net"
)
func echo_handler(conn net.Conn) {
defer conn.Close()
// io.Copy(conn, conn)
buf := make([]byte, 1024*4)
for {
n, err := conn.Read(buf)
if err != nil {
log.Println("Read Err", err)
return
}
var nn int
nn, err = conn.Write(buf[:n])
if nn != n || err != nil {
log.Println("Write Err", err)
return
}
}
}
func main() {
psock, e := net.Listen("tcp", ":5000")
if e != nil {
log.Fatal(e)
return
}
var count int
for {
conn, e := psock.Accept()
if e != nil {
log.Fatal(e)
return
}
count++
go echo_handler(conn)
log.Println("accept:", count)
}
}
ベンチマークツールのコードは長いので一番下に記載します。
実験環境
- Cloud: Degital Ocean シンガポールリージョン
- OS: Ubuntu16.04.3
ベンチサーバー
- CPU: 16 Core
- Memory: 48GB
- Type: Standard
対象サーバー
- CPU: 8 Core
- Memory: 16GB
- Type: Standard
実験内容
ulimit は、対象サーバー、ベンチサーバー共に、10000 に設定
$ ulimit -n 10000
$ ulimit -n
10000
100コネクション
$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:29:02 Initializing...
2017/09/05 04:29:02 benchmark started
2017/09/05 04:29:32 finished writing
2017/09/05 04:29:33 Total Requests: 1552393
2017/09/05 04:29:33 Total Secounds: 30.662238499 s
2017/09/05 04:29:33 Throughput: 50628.82150794792 [#/sec]
400コネクション
$ ./bin/echo-bench -c 400 -l 30 -s 1400 10.0.0.1
2017/09/05 04:30:10 Initializing...
2017/09/05 04:30:10 benchmark started
2017/09/05 04:30:10 workerWrite write tcp 10.0.0.2:56708->10.0.0.1:5000: write: broken pipe
2017/09/05 04:30:10 workerRead EOF
2017/09/05 04:30:10 workerRead EOF
2017/09/05 04:30:10 workerWrite write tcp 10.0.0.1:56644->10.0.0.1:5000: write: broken pipe
2017/09/05 04:30:10 workerRead EOF
....
# 以下接続が切断された旨のエラーの嵐
broken pipe
のエラーが出たり出なかったりします。
100 + 100 + 100 + 100 コネクション
ベンチサーバーから同時に4本のベンチプロセスを立てて、それぞれ100コネクションずつでベンチマークをかける。
$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:32:33 Initializing...
2017/09/05 04:32:33 benchmark started
2017/09/05 04:33:03 finished writing
2017/09/05 04:33:07 Total Requests: 327683
2017/09/05 04:33:07 Total Secounds: 34.661332186 s
2017/09/05 04:33:07 Throughput: 9453.848981960187 [#/sec]
$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:32:34 Initializing...
2017/09/05 04:32:37 benchmark started
2017/09/05 04:33:07 finished writing
2017/09/05 04:33:07 Total Requests: 495080
2017/09/05 04:33:07 Total Secounds: 30.399221267 s
2017/09/05 04:33:07 Throughput: 16285.943500053934 [#/sec]
$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:32:30 Initializing...
2017/09/05 04:32:30 benchmark started
2017/09/05 04:33:00 finished writing
2017/09/05 04:33:05 Total Requests: 413615
2017/09/05 04:33:05 Total Secounds: 35.157604061 s
2017/09/05 04:33:05 Throughput: 11764.595769448897 [#/sec]
$ ./bin/echo-bench -c 100 -l 30 -s 1400 10.0.0.1
2017/09/05 04:32:31 Initializing...
2017/09/05 04:32:32 benchmark started
2017/09/05 04:33:02 finished writing
2017/09/05 04:33:06 Total Requests: 349029
2017/09/05 04:33:06 Total Secounds: 33.497632363 s
2017/09/05 04:33:06 Throughput: 10419.512526071005 [#/sec]
エラーは出ない。
気づいたこと
このように、400コネクションを同時接続した時にエラーが発生しました。
400コネクションを100コネクションずつに分けてアクセスしてもエラーは発生しません。(4枚のターミナルにそれぞれベンチスクリプトを貼り付けて順々に、ほぼ1秒以内に実行しました。)
ここである事実に気づきました。ベンチマークからは400コネクションを作成したはずなのですが、サーバー側のログを見ると200回程度しかAccept
を実行していません。
$ netstat -alnp tcp | grep ":5000" | grep ESTABLISHED | wc -l
と、サーバー側のコネクション数を計測しても、400には達していませんでした。
サーバー側でAccept
されていないのに、クライアント側では、net.Conn
が作成されコネクションが確立したことになっている?
このことを確かめるために次の実験をしました。
acceptしないTCPサーバー
サーバーのコード
package main
import (
"log"
"net"
"time"
)
func main() {
_, e := net.Listen("tcp", ":5000")
if e != nil {
log.Fatal(e)
return
}
log.Println("no accept")
time.Sleep(3600 * time.Second)
}
ListenするけどAcceptしないサーバーです。
クライアントコード
一番下に記載しているechoサーバーのベンチマークツールのfunc (w *worker) worker()
を以下のように改変しました。
var cowo int32
func (w *worker) worker() {
_, err := net.Dial("tcp", w.b.conf.laddr)
if err != nil {
panic(err)
}
r := atomic.AddInt32(&cowo, 1)
log.Println("connected", r)
}
サーバと確立したコネクションの数をログに出します。
実験して見る
クライアントコードを実行すると、サーバー側でacceptされていないにもかかわらず、クライアント側でコネクションが生成されていました。
$ netstat -alnp tcp | grep ":5000" | grep ESTABLISHED | wc -l
をして、確立済みのコネクションの数を計測すると
サーバー側で129、クライアント側で259と計測できました。
サーバー側で、コネクションの一覧を調べると
$ netstat -alnp tcp | grep ":5000"
tcp6 0 0 :::5000 :::* LISTEN 20743/echoserver
tcp6 0 0 10.0.0.1:5000 10.0.0.2:48302 ESTABLISHED -
tcp6 0 0 10.0.0.1:5000 10.0.0.2:48200 ESTABLISHED -
tcp6 0 0 10.0.0.1:5000 10.0.0.2:48168 ESTABLISHED -
tcp6 0 0 10.0.0.1:5000 10.0.0.2:48320 ESTABLISHED -
tcp6 0 0 10.0.0.1:5000 10.0.0.2:48508 SYN_RECV -
tcp6 0 0 10.0.0.1:5000 10.0.0.2:48454 SYN_RECV -
tcp6 0 0 10.0.0.1:5000 10.0.0.2:48244 ESTABLISHED -
tcp6 0 0 10.0.0.1:5000 10.0.0.2:48272 ESTABLISHED -
tcp6 0 0 10.0.0.1:5000 10.0.0.2:48182 ESTABLISHED -
tcp6 0 0 10.0.0.1:5000 10.0.0.2:48260 ESTABLISHED -
tcp6 0 0 10.0.0.1:5000 10.0.0.2:48520 SYN_RECV -
と、ESTABLISHED
に混じって、SYN_RECV
が確認できます。
backlog について
ここで調査を進めてこの記事に行き当たりました。
ESTABLISHEDになって、acceptされていないコネクションは、backlogに入ります。backlogがいっぱいになると、syn backlogに接続要求が貯まるようになっているみたいです。
結論
もともとSOMAXCONNは、128でした。
$ cat /proc/sys/net/core/somaxconn
128
backlog を増やすために、
sysctl -w net.core.somaxconn=1024
とすると最初の、broken pipe
エラーが出る事象は解決しました。
しかし、追加実験でAccept
されていないクライアント側のコネクションにWrite
してもRead
してもエラーは起こりませんでした。
なぜ、broken pipe
のエラーが出るのかは謎のままです。
echoサーバーのベンチマークツール
package main
import (
"context"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/urfave/cli"
)
func main() {
app := cli.NewApp()
app.Usage = "Benchmark for echo server"
app.Flags = requestFlag
app.Action = request
app.Run(os.Args)
}
func request(c *cli.Context) error {
conf := loadEchoConfig(c)
b := newBench(conf)
return b.request()
}
var requestFlag = []cli.Flag{
cli.IntFlag{
Name: "c",
Value: 2,
Usage: "number of clients",
},
cli.IntFlag{
Name: "port, p",
Value: 5000,
Usage: "port number",
},
cli.IntFlag{
Name: "length, l",
Value: 10,
Usage: "length(s) of request time seconds.",
},
cli.IntFlag{
Name: "size, s",
Value: 6,
Usage: "size(bytes) of binary size of 1 request.",
},
cli.BoolTFlag{
Name: "vv",
Usage: "show verbose log",
},
}
type echoConfig struct {
laddr string
workers int
l int
s int
v bool
}
func loadEchoConfig(c *cli.Context) *echoConfig {
host := "127.0.0.1"
if len(c.Args()) > 0 {
host = c.Args().First()
}
return &echoConfig{
laddr: host + ":" + strconv.Itoa(c.Int("p")),
workers: c.Int("c"),
l: c.Int("l"),
s: c.Int("s"),
v: c.Bool("vv"),
}
}
type bench struct {
conf *echoConfig
ctx context.Context
cancel context.CancelFunc
wgInit sync.WaitGroup
wgFinish sync.WaitGroup
chCount chan uint64
reqsGet [][]byte
}
func newBench(conf *echoConfig) *bench {
ctx, cancel := context.WithCancel(context.Background())
chCount := make(chan uint64)
return &bench{
conf: conf,
ctx: ctx,
cancel: cancel,
chCount: chCount,
}
}
func (b *bench) request() error {
if b.conf.v {
log.Println("Initializing...")
}
for i := 0; i < b.conf.workers; i++ {
b.wgInit.Add(1)
b.wgFinish.Add(1)
w := newWorker(b)
go w.worker()
}
b.wgInit.Wait()
start := time.Now()
if b.conf.v {
log.Println("benchmark started")
}
<-time.After(time.Duration(b.conf.l) * time.Second)
b.cancel()
if b.conf.v {
log.Println("finished writing")
}
b.wgFinish.Wait()
sub := time.Now().Sub(start).Seconds()
var count uint64
for i := 0; i < b.conf.workers; i++ {
count += <-b.chCount
}
var throughput float64
if sub > 0 {
throughput = float64(count) / sub
}
log.Printf("Total Requests: %d", count)
log.Printf("Total Secounds: %g s", sub)
log.Printf("Throughput: %g [#/sec]\n", throughput)
return nil
}
type worker struct {
b *bench
wgRequest sync.WaitGroup
ctxErr context.Context
cancelErr context.CancelFunc
ch chan struct{}
}
func newWorker(b *bench) *worker {
ctxErr, cancelErr := context.WithCancel(context.Background())
return &worker{
b: b,
ctxErr: ctxErr,
cancelErr: cancelErr,
//ch: make(chan struct{}, 10000),
}
}
func (w *worker) worker() {
conn, err := net.Dial("tcp", w.b.conf.laddr)
if err != nil {
panic(err)
}
var count uint64
defer func() {
//close(w.ch)
w.wgRequest.Wait()
w.b.wgFinish.Done()
w.b.chCount <- count
conn.Close()
}()
go w.workerRead(conn)
oBuf := make([]byte, w.b.conf.s)
// initialization finished
w.b.wgInit.Done()
w.b.wgInit.Wait()
for {
select {
case <-w.b.ctx.Done():
return
case <-w.ctxErr.Done():
// err from w.workerRead()
count = 0
return
default:
buf := oBuf
for {
var n int
//w.ch <- struct{}{}
w.wgRequest.Add(1)
n, err = conn.Write(buf)
if err != nil {
if err, ok := err.(net.Error); ok && err.Temporary() {
buf = buf[n:]
continue
}
log.Println("workerWrite", err)
w.cancelErr()
count = 0
return
}
break
}
count++
}
}
}
const matcherClose = "use of closed network connection"
func (w *worker) workerRead(conn net.Conn) {
reqSize := w.b.conf.s
buf := make([]byte, reqSize)
var nn int
for {
//<-w.ch
n, err := conn.Read(buf)
if err != nil {
if strings.Contains(err.Error(), matcherClose) {
return
}
if err, ok := err.(net.Error); !(ok && err.Temporary()) {
log.Println("workerRead", err)
w.cancelErr()
return
}
}
nn += n
for {
if nn < reqSize {
break
}
nn -= reqSize
w.wgRequest.Done()
}
}
}