4
1

More than 5 years have passed since last update.

Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その2)

Last updated at Posted at 2017-08-22

6月からDMM.comラボの六本木オフィスでミドルウェアを作るエンジニアインターンをしている@kawasin73です。
DMM.comラボではluaで実装されたKVS(キーバリューストア)を利用しています。
これは、TCPの上で独自プロトコルで通信しており、URIのPathがKeyとなり最長共通接頭辞検索をするKVSで、社内でluaの皮を被ったC言語で実装されたものが運用されています。
この度、このKVSをGo言語で再実装することになり、設計は既存のミドルウェアを踏襲した形で DMM.com ラボの方が行い、実装は僕がすることになりました。
Go言語の実装手法(goroutine や channel等)については僕が学びながらそれについて都度相談するというスタイルで行なっています。
その開発記を連載しています。

第1回 GoでTCPサーバー上で独自プロトコルKey-Value Storeを実装する(知識編)
第2回 GolangでハイパフォーマンスなTCPサーバーを実装する(下準備編)
第3回 Go言語でTCPのEchoサーバーを実直に実装する
第4回 Go言語でシグナルハンドリングをするTCPのEchoサーバーを実装する
第5回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その1)
第6回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その2)
第7回 GolangでTCPサーバーに再起動とGraceful Shutdownを実装する

第6回の今回は、Writeの処理を非同期化します。

はじめに

開発環境は以下の通りです。

  • OS: macOS 10.12.5
  • Go version: 1.8.3
  • IDE: Gogland

今回開発した内容は、こちらのリポジトリtcp4ディレクトリで公開しています。

Writeを非同期化する

前回までの実装では、for文の中でReadしてその後にWriteする直列的な処理を行っていました。
Writeしている間は、Readできませんし、Readしている間はWriteできません。
そこで、Writeを別のgoroutineで行うことでWriteReadを並行して実行できるようにします。

Writeを非同期化するには2種類の方法があります。

  • 別goroutineでfor文でWriteを繰り返し行い、handleReadからバッファ付きchannelで読み込んだバイト列を渡して書き込む方法
     +------------+    +-------------+
     | handleRead |    | handleWrite |
     +-----+------+    +------+------+
           |                  |
     +----->                  |
     |     |                  |
     |   Read                 <----+
Loop |     |      channel     |    |
     |     +------------------>    |
     |     |                  |    | Loop
     +-----+                Write  |
                              |    |
                              +----+
  • handleReadからReadする度に、goroutineを起動しWriteする方法
+------------+
| handleRead |
+-----+------+
      |
      |
      |
    Read
      |
      |   goroutine   +------------+
      +---------------> handleEcho |
      |               +-----+------+
      |                     |
    Read                  Write
      |
      |   goroutine   +------------+
      +---------------> handleEcho |
      |               +-----+------+
      |                     |
      |                   Write
      |

今回は後者の「handleReadからReadする度に、goroutineを起動しWriteする方法」を採用します。
理由としては以下の通りになります。

今回は最終的にデータベースを開発する第一歩としてechoサーバーを作っています。
そのため将来的には、Writeをする前にはデータベースに関する処理を行うことになります。
データベースの処理はReadWriteより遅くなることが予想されます。
データベースの処理を多重化して高速化することを目論んでいるので、echoサーバーの実装のうちからWriteするgoroutineを多重化します。

net.TCPConnWriteの内部実装を見ると、func (fd *netFD) Write(p []byte)の中でロックを取得してからWriteしています。
そのため、複数のgoroutineから同時にWriteされても競合することなくWriteすることができます。

ただ後者はデメリットもあります。
channelを使うとReadされた順番の通りにWriteされますが、複数のgoroutineを起動する方法だとgoroutineの実行順序を保証することができません。
echoサーバーとしてはWriteされるものの順序が変わる可能性があるため失格です。
しかし、この後開発するデータベースのプロトコルでは、リクエストの順序とレスポンスの順序が一致しないことを前提として、RID(リクエストID)をリクエストとレスポンスのバイナリに含めることになっています。
そのため、今回はレスポンスの順序が変わる可能性があることを許容して実装します。

実装

conn.go
func (c *Conn) handleRead() {
    defer c.stopRead()

    buf := make([]byte, 4*1024)

    for {
        n, err := c.conn.Read(buf)
        if err != nil {
            if ne, ok := err.(net.Error); ok {
                switch {
                case ne.Temporary():
                    continue
                }
            }
            log.Println("Read", err)
            return
        }

        go c.handleEcho(buf[:n]) // <- 非同期化
    }
}

func (c *Conn) handleEcho(buf []byte) {
    // do something
    // 将来的にはここでデータベースに関する処理を行う

    // write
    for {
        n, err := c.conn.Write(buf)
        if err != nil {
            if nerr, ok := err.(net.Error); ok {
                if nerr.Temporary() {
                    buf = buf[n:]
                    continue
                }
            }
            log.Println("Write error", err)
            // write error
            c.stopRead()
        }
        return
    }
}

前回の実装で、errReadとしていたキャンセルメソッドは、わかりやすくするためにstopReadと名前を変えました。

Slice の内部に思いを馳せる

ここまでの状態で(コミットID: 33dcf05b194348b38c6ee668ece08c6dadb7be72)、一度動作確認をしてみます。

0 ~ 255 まで、1バイトずつ高速に送信します。(コードは、tcp4/cmd/sample/main.go)

func main() {
    c, err := net.Dial("tcp", "127.0.0.1:12345")
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    go write(c)

    buf := make([]byte, 100)
    for i := 0; i < 256; i++ {
        var n int
        n, err = c.Read(buf)
        fmt.Println(buf[:n])
    }
}

func write(c net.Conn) {
    for i := 0; i < 256; i++ {
        c.Write([]byte{byte(i)})
    }
}

結果は、この通りです。

[0 1 2 3 4 5 6 7 8 9 10 11 12 13]
[15 15] // <- 重複
[16 17 18 19 20 21 22 23]
[25] // <- 重複
[25] // <- 重複
[26]
~~~ 省略 ~~~
[255 251]
[255] // <- 重複
[255] // <- 重複
[255] // <- 重複

レスポンスが重複しているところが多くあります。なぜでしょうか?

ここで、Slice の内部実装に思いを馳せてみます。

これらの記事を読むと分かる通り、あるSliceから作成されたSliceは、内部では同じ配列のポインタを保持しています。
WriteReadが並行に実行され、Writeする前にbufが上書きされているために、レスポンスが正しく送られなかったのだとわかります。

ここまでを踏まえると

func (c *Conn) handleRead() {
    defer c.stopRead()

    buf := make([]byte, 4*1024)

    for {
        n, err := c.conn.Read(buf)
        if err != nil {
            if ne, ok := err.(net.Error); ok {
                switch {
                case ne.Temporary():
                    continue
                }
            }
            log.Println("Read", err)
            return
        }

        wBuf := make([]byte, n) // <- 別のSliceを作って
        copy(wBuf, buf[:n])     // <- コピー
        go c.handleEcho(wBuf)
    }
}

と、handleEchoに渡すSliceは書き換えられることがないように都度作成してコピーするようにします。
このように実装を修正して、もう一度tcp4/cmd/sample/main.goを実行すると、順番は一部入れ替わっているものの、0 ~ 255の全てのレスポンスが返ってきていることが確認できます。

Shutdown時にWriteを止める

Shutdown時にhandleEchoのgoroutineが複数実行中であった時、c.conn.Close()が呼ばれた後にWriteを複数回実行することになり、use of closed network connectionが余計に発生することになります。
余計なエラーのログを避けるため、Closeした後は、Writeしないように実装します。

// func newConn
ctxWrite, stopWrite := context.WithCancel(context.Background())

func (c *Conn) handleConnection() {
    defer func() {
        c.stopWrite() // <- 追加
        c.conn.Close()
        c.svr.Wg.Done()
    }()

    go c.handleRead()

    select {
    case <-c.ctxRead.Done():
    case <-c.svr.ctx.Done():
    case <-c.svr.AcceptCtx.Done():
    }
}

func (c *Conn) handleEcho(buf []byte) {
    // do something

    // write
    for {
        select {
        case <-c.ctxWrite.Done(): // <- 追加
            return
        default:
            n, err := c.conn.Write(buf)
            if err != nil {
                if nerr, ok := err.(net.Error); ok {
                    if nerr.Temporary() {
                        buf = buf[n:]
                        continue
                    }
                }
                log.Println("Write error", err)
                // write error
                c.stopRead()
            }
            return
        }
    }
}

しかし、この実装のselectdefaultでは不十分です。複数のhandleEchoが実行された時、複数のgoroutineでc.conn.Write(buf)の内部のロックで処理が止まり、c.ctxWrite.Done()が有効になった後もc.conn.Write(buf)の中で待ち続けてしまいます。
複数のロックを待ち受けるには、select caseを使ってchannelを待つ必要があります。
そのためには、c.conn.Write(buf)をチャネルを使ってロックをかける必要があります。
それには、GoConで発表してきたのでついでにruntime以下の知識をまとめていく #golang 内のスライドの27ページ目で紹介されている Semaphore的なchannelの使い方を用います。

// func newConn
sem := make(chan struct{}, 1)

func (c *Conn) handleEcho(buf []byte) {
    // do something

    // write
    select {
    case <-c.ctxWrite.Done():
        return
    case c.sem <- struct{}{}:
        defer func() { <-c.sem }()
        for {
            n, err := c.conn.Write(buf)
            if err != nil {
                if nerr, ok := err.(net.Error); ok {
                    if nerr.Temporary() {
                        buf = buf[n:]
                        continue
                    }
                }
                log.Println("Write error", err)
                // write error
                c.stopRead()
            }
            return
        }
    }
}

バッファーサイズが1のchannelを使って、必ずc.conn.Write(buf)を実行しているgoroutineが一つであることを保証しています。

最後に

ここまででWriteを非同期化しました。
次回は、Graceful ShutdownとRestartのシグナルハンドリングを実装します。

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1