Go
golang
TCP
echoサーバー

Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その1)

6月からDMM.comラボの六本木オフィスでミドルウェアを作るエンジニアインターンをしている@kawasin73です。
DMM.comラボではluaで実装されたKVS(キーバリューストア)を利用しています。
これは、TCPの上で独自プロトコルで通信しており、URIのPathがKeyとなり最長共通接頭辞検索をするKVSで、社内でluaの皮を被ったC言語で実装されたものが運用されています。
この度、このKVSをGo言語で再実装することになり、設計は既存のミドルウェアを踏襲した形で DMM.com ラボの方が行い、実装は僕がすることになりました。
Go言語の実装手法(goroutine や channel等)については僕が学びながらそれについて都度相談するというスタイルで行なっています。
その開発記を連載しています。

第1回 GoでTCPサーバー上で独自プロトコルKey-Value Storeを実装する(知識編)
第2回 GolangでハイパフォーマンスなTCPサーバーを実装する(下準備編)
第3回 Go言語でTCPのEchoサーバーを実直に実装する
第4回 Go言語でシグナルハンドリングをするTCPのEchoサーバーを実装する
第5回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その1)
第6回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その2)
第7回 GolangでTCPサーバーに再起動とGraceful Shutdownを実装する

前回までで、シグナルハンドリングを備えたechoサーバーの実装を行いました。
第5回の今回は、すべて関数で実装されているListenerとConnectionの処理をstructを使ってまとめ上げていきます。

はじめに

開発環境は以下の通りです。

  • OS: macOS 10.12.5
  • Go version: 1.8.3
  • IDE: Gogland

今回開発した内容は、こちらのリポジトリtcp3ディレクトリで公開しています。

https://github.com/dmmlabo/tcpserver_go

前回やり残したこと

前回は、SIGINTのシグナルハンドリングを実装しました。
その中ではAcceptTCP()のエラーハンドリングが途中で終わっていました。
今回はそこから再開していきたいと思います。

AcceptTCP()でエラーが発生したときはhandleListenerのgoroutineが終了します。
そして、そのエラーイベントをmain()goroutineと、handleConnectiongoroutineと、handleReadgoroutineに通知して、それぞれのgoroutineを終了させなければなりません。

+--------+      +----------------+             +------------------+  +------------+
| main() |      | handleListener |             | handleConnection |  | handleRead |
+---+----+      +--------+-------+             +--------+---------+  +------+-----+
    |                    |                              |                   |
    |           Error on AcceptTCP()                    |                   |
    |                    |                              |                   |
    |    notify cancel   |        notify cancel         |                   |
    | <------------------+----------------------------> |                   |
    |                                                   |   notify cancel   |
 Finish                                                 +-----------------> |
                                                        |                   |
                                                     Finish              Finish

handleConnectionからhandleReadへのキャンセルを通知する仕組みはすでにあるため、handleListenerから、main()handleConnectionへ、キャンセルイベントを通知する仕組みを追加する必要があります。

キャンセルイベントの通知には、前回と同様にcontext.Contextを用います。

見えてきた限界・・・。structでまとめる

main()handleListener と、 handleConnection で同じcontext.Contextにアクセスするためには、handleListenerhandleConnection の引数に渡して共有する必要があります。
ここで限界が見えてきます。
すでに、

  • func handleListener(l *net.TCPListener, serverCtx context.Context, wg *sync.WaitGroup, chClosed chan struct{})
  • func handleConnection(conn *net.TCPConn, serverCtx context.Context, wg *sync.WaitGroup)

と、それぞれのメソッドは多くの引数を持っており、これに新しいcontext.Contextを追加すると複雑になり分かりにくくなります。

これを解決するために、struct を利用することにします。
handleConnectionで利用するプロパティをServerstructに、handleConnectionで利用するプロパティをConnstructにまとめて保持することにします。

Server

handleListenerServerのメソッドにしてしまうことで、引数をなくすことができました。

type Server struct {
    addr      string
    listener  *net.TCPListener
    ctx       context.Context
    shutdown  context.CancelFunc
    Wg        sync.WaitGroup
    ChClosed  chan struct{}
}

func (s *Server) handleListener() {
    defer func() {
        s.listener.Close()
        close(s.ChClosed)
    }()
    for {
        conn, err := s.listener.AcceptTCP()
        if err != nil {
            // 省略 ...
        }

        wg.Add(1)
        go handleConnection(conn, s.ctx, &s.Wg)
    }
}

前回(tcp2)は、ListenTCPmain()の中で行なっていますが、Listenerの取り扱いは本来はServerの責務のはずです。
Listen()メソッドをServerに追加して、サーバーの起動処理をまとめます。
また、シャットダウン時の処理もListenerを触っているため、ShutDown()としてServerのメソッドにしてしまいます。

func (s *Server) Shutdown() {
    select {
    case <-s.ctx.Done():
        // already shutdown
    default:
        s.shutdown()
        s.listener.Close()
    }
}

func (s *Server) Listen() error {
    tcpAddr, err := net.ResolveTCPAddr("tcp", s.addr)
    if err != nil {
        return err
    }

    l, err := net.ListenTCP("tcp", tcpAddr)
    if err != nil {
        return err
    }
    s.listener = l

    go s.handleListener()
    return nil
}

Conn

handleConnectionhandleReadConnのメソッドにしました。これにより、handleConnectionhandleReadで共有するerrReadを引数で渡さなくてもよくなります。
Connは内部にServerのポインタを持つようにしました。wgserverCtxConnからアクセスすることができます。

type Conn struct {
    svr     *Server
    conn    *net.TCPConn
    readCtx context.Context
    errRead context.CancelFunc
}

func (c *Conn) handleConnection() {
    defer func() {
        c.conn.Close()
        c.svr.Wg.Done()
    }()

    go c.handleRead()

    select {
    case <-c.readCtx.Done():
    case <-c.svr.ctx.Done():
    }
}

役割の境界が見えてくる。 serverパッケージを作る

ここまで作ってきて、ぼんやりと境界が見えてきました。

  • TCPListenerや、TCPConnなどのnetパッケージにあるものを操作するサーバーの役割
  • シグナルハンドリングやサーバーの起動など、前者を使ってハンドリングする役割

前者をserverパッケージ、後者を引き続きmainパッケージとして分離します。

$ tree
.
├── main.go
└── server
    ├── conn.go
    └── server.go

1 directory, 3 files

Go言語におけるパッケージの分離は、フォルダを分けることで実現します。

サーバーのエラーによるキャンセルイベントを伝播する

ConnServerに分離したことで、新しいフィールドの追加が気軽に行えるようになりました。
ここで、本来やりたかったAcceptTCP()のエラーを全てのgoroutineに伝える処理を追加します。

  • Server struct に context.Context を追加
type Server struct {
    AcceptCtx context.Context
    errAccept context.CancelFunc
}
  • handleListener で、エラーの発生を通知する
func (s *Server) handleListener() {
    defer func() {
        s.listener.Close()
        close(s.ChClosed)
    }()
    for {
        conn, err := s.listener.AcceptTCP()
        if err != nil {
            if ne, ok := err.(net.Error); ok {
                if ne.Temporary() {
                    log.Println("AcceptTCP", err)
                    continue
                }
            }
            if listenerCloseError(err) {
                select {
                case <-s.ctx.Done():
                    return
                default:
                    // fallthrough
                }
            }

            log.Println("AcceptTCP", err)
            s.errAccept() // <- 追加
            return
        }

        c := newConn(s, conn)
        s.Wg.Add(1)
        go c.handleConnection()
    }
}
  • handleConnectionで、Listenerのエラーをキャッチ
func (c *Conn) handleConnection() {
    defer func() {
        c.conn.Close()
        c.svr.Wg.Done()
    }()

    go c.handleRead()

    select {
    case <-c.readCtx.Done():
    case <-c.svr.ctx.Done():
    case <-c.svr.AcceptCtx.Done(): // <- 追加
    }
}
  • main()Listenerのエラーをキャッチ
    select {
    case sig := <-sigChan:
        switch sig {
        case syscall.SIGINT:
            log.Println("Server Shutdown...")
            svr.Shutdown()

            svr.Wg.Wait()
            <-svr.ChClosed
            log.Println("Server Shutdown Completed")
        default:
            panic("unexpected signal has been received")
        }
    case <-svr.AcceptCtx.Done(): // <- 追加
        log.Println("Server Error Occurred")
        // wait until all connection closed
        svr.Wg.Wait()
        <-svr.ChClosed
        log.Println("Server Shutdown Completed")
    }

main() では、シグナルのイベントとサーバーエラーのイベントを同時に待ち受けます。これには、selectcaseを使います。

最後に

パッケージの分離と、ServerConnのstruct化、サーバーエラーによるシャットダウンを行いました。
次回は、Writeの非同期化に取り掛かります。