Help us understand the problem. What is going on with this article?

GolangでTCPサーバーにゼロダウンタイム再起動とGraceful Shutdownを実装する

More than 1 year has passed since last update.

6月からDMM.comラボの六本木オフィスでミドルウェアを作るエンジニアインターンをしている@kawasin73です。
DMM.comラボではluaで実装されたKVS(キーバリューストア)を利用しています。
これは、TCPの上で独自プロトコルで通信しており、URIのPathがKeyとなり最長共通接頭辞検索をするKVSで、社内でluaの皮を被ったC言語で実装されたものが運用されています。
この度、このKVSをGo言語で再実装することになり、設計は既存のミドルウェアを踏襲した形で DMM.com ラボの方が行い、実装は僕がすることになりました。
Go言語の実装手法(goroutine や channel等)については僕が学びながらそれについて都度相談するというスタイルで行なっています。
その開発記を連載しています。

第1回 GoでTCPサーバー上で独自プロトコルKey-Value Storeを実装する(知識編)
第2回 GolangでハイパフォーマンスなTCPサーバーを実装する(下準備編)
第3回 Go言語でTCPのEchoサーバーを実直に実装する
第4回 Go言語でシグナルハンドリングをするTCPのEchoサーバーを実装する
第5回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その1)
第6回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その2)

第7回の今回は、ゼロダウンタイムの再起動とGraceful Shutdownを実装していきます。

はじめに

開発環境は以下の通りです。

  • OS: macOS 10.12.5
  • Go version: 1.8.3
  • IDE: Gogland

今回開発した内容は、こちらのリポジトリtcp5ディレクトリで公開しています。

https://github.com/dmmlabo/tcpserver_go

Graceful Shutdownとは

Shutdownは、処理中のリクエストがあったとしても完了を待たずに即座にサーバーの終了を行います。そのため、クライアント側ではリクエストが正しく処理されたかどうかはわからなくなります。
一方、Graceful Shutdownでは、新しいリクエストの受付を停止し、処理中の全てのリクエストの完了後にサーバーの終了を行います。

HTTPサーバーのGraceful Shutdownライブラリはいくつかあるようですが、今回は、TCPサーバーを扱うので、Graceful Shutdownの処理も実装しました。

再起動

Listenするホスト名やログファイルのパスなどを外部ファイルや環境変数などで設定している時、再起動ができるようになると変更をゼロダウンタイムで反映することができるようになります。

再起動には2種類の手順があります。

  • Listenするホスト名とポート番号が変わらない場合
  • Listenするホスト名とポート番号が変わる場合

です。

Listenするホスト名とポート番号が変わらない場合、ListenerをCloseする必要はありません。
Configを再設定するだけで再起動の処理は終わります。

Listenするホスト名とポート番号が変わる場合は、Listenerを切り替える必要があります。

  1. 新しいホスト名とポート番号でServerを作りListen
  2. 稼働中の古いServerをGraceful Shutdown

このようにホスト名とポート番号が変わるか変わらないかで処理を変更することで設定値の変更をゼロダウンタイムで行うことができます。

なお、機能追加やバグの修正などでサーバーの処理を変更して新しいバイナリに変更する場合の再起動は、別の仕組みで対応する必要があります。
バイナリの変更を伴う再起動はこの章では扱いません。

Graceful Shutdown を実装する

main() と Server の処理

まずmain()と、ServerにGraceful Shutdownの処理を追加します。
コミットID: 5050fe3e49fa29c22404e8b525e4fce34438d732
処理としては、Shutdownの時と同じ処理です。

main.go
func main() {
    sigChan := make(chan os.Signal, 1)
    // Ignore all signals
    signal.Ignore()
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

    svr := server.NewServer(context.Background(), "127.0.0.1:12345")

    err := svr.Listen()

    if err != nil {
        log.Fatal("Listen()", err)
    }

    log.Println("Server Started")

    select {
    case sig := <-sigChan:
        switch sig {
        case syscall.SIGINT, syscall.SIGTERM:
            log.Println("Server Shutdown...")
            svr.Shutdown()

            svr.Wg.Wait()
            <-svr.ChClosed
            log.Println("Server Shutdown Completed")
        case syscall.SIGQUIT: // <- ここを追加
            log.Println("Server Graceful Shutdown...")
            svr.GracefulShutdown()

            svr.Wg.Wait()
            <-svr.ChClosed
            log.Println("Server Graceful Shutdown Completed")
        default:
            panic("unexpected signal has been received")
        }
    case <-svr.AcceptCtx.Done():
        log.Println("Server Error Occurred")
        svr.Wg.Wait()
        <-svr.ChClosed
        log.Println("Server Shutdown Completed")
    }
}
server.go
func (s *Server) GracefulShutdown() {
    select {
    case <-s.ctxGraceful.Done():
        // already shutdown
    default:
        s.gshutdown()
        s.listener.Close()
    }
}

Conn の処理

次にConnの処理を追加します。
シグナルを受け取った時にReadを終了し、すでにReadしているもののWriteが全て終了するのを待って、コネクションを終了します。
Writeはgoroutineで非同期化されているので、どこまでWriteが終了しているかを知る必要があります。
それには、sync.WaitGroupを使います。

コミットID: d26bb371f357c79fc175b3b244726d0f4d609c38

conn.go
type Conn struct {
    // ~~ 省略 ~~
    wg        sync.WaitGroup // <- 追加
}

func (c *Conn) handleConnection() {
    defer func() {
        c.stopWrite()
        c.conn.Close()
        c.svr.Wg.Done()
    }()

    go c.handleRead()

    select {
    case <-c.ctxRead.Done():
    case <-c.svr.ctxShutdown.Done():
    case <-c.svr.AcceptCtx.Done():
    case <-c.svr.ctxGraceful.Done(): // <- 追加
        c.conn.CloseRead()
        c.wg.Wait()
    }
}

func (c *Conn) handleRead() {
    defer c.stopRead()

    buf := make([]byte, 4*1024)

    for {
        n, err := c.conn.Read(buf)
        if err != nil {
            if ne, ok := err.(net.Error); ok {
                switch {
                case ne.Temporary():
                    continue
                }
            }
            log.Println("Read", err)
            return
        }

        wBuf := make([]byte, n)
        copy(wBuf, buf[:n])
        c.wg.Add(1) // <- 追加
        go c.handleEcho(wBuf)
    }
}

func (c *Conn) handleEcho(buf []byte) {
    defer c.wg.Done() // <- 追加
    // do something

    // write
    select {
    case <-c.ctxWrite.Done():
        return
    case c.sem <- struct{}{}:
        defer func() { <-c.sem }()
        for {
            n, err := c.conn.Write(buf)
            if err != nil {
                if nerr, ok := err.(net.Error); ok {
                    if nerr.Temporary() {
                        buf = buf[n:]
                        continue
                    }
                }
                log.Println("Write error", err)
                // write error
                c.stopRead()
                c.stopWrite() // <- 追加
            }
            return
        }
    }
}

Graceful ShutdownがされたらhandleConnectionの中で

c.conn.CloseRead()
c.wg.Wait()

をします。これ以降のReadを終了し、Writeが全て終わるまで待ちます。

再起動 を実装する

続いて再起動を実装します。

コミットID: dc60ea17967ee3498233e42748b2f1f4a151e231

contexts の追加

Listenするホスト名とポート番号が変わらない再起動をする時、ListenerはCloseしないで、すでに接続済みのコネクションをGraceful Shutdownします。
Serverを再利用したいですがctxGracefulは一度しか使えません。
そこで新しくcontextsを導入して、再起動のたびにcontextsを差し替えてServerを再利用しつつGraceful Shutdownを実現することにしました。

server/server.go
type contexts struct {
    ctxShutdown context.Context
    shutdown    context.CancelFunc
    ctxGraceful context.Context
    gshutdown   context.CancelFunc
}

そして、Conn*Server*contextsを持つようにします。

server/conn.go
type Conn struct {
    svr       *Server
    ctx       *contexts // <- 追加
    // ~~ 省略 ~~
}

Restart の追加とhandleListener内でのcontextsの変更通知

server/server.go
func NewServer(parent context.Context, addr string) *Server {
    ctx := newContext(parent)
    acceptCtx, errAccept := context.WithCancel(context.Background())
    chClosed := make(chan struct{})
    chCtx := make(chan *contexts, 1) // <- buffer が 1
    return &Server{
        addr:      addr,
        AcceptCtx: acceptCtx,
        errAccept: errAccept,
        ChClosed:  chClosed,
        ctx:       ctx,
        chCtx:     chCtx,
    }
}

func (s *Server) Restart(parent context.Context, addr string) (*Server, error) {
    if addr == s.addr {
        // update contexts. not close listener
        prevCtx := s.ctx
        s.ctx = newContext(parent)
        select {
        case <-s.chCtx:
            // clear s.chCtx if previous contexts have not been popped
        default:
        }
        s.chCtx <- s.ctx
        prevCtx.gshutdown()
        return s, nil
    } else {
        // create new listener
        nextServer := NewServer(parent, addr)
        err := nextServer.Listen()
        if err != nil {
            return nil, err
        }
        s.GracefulShutdown()
        s.Wg.Wait()
        <-s.ChClosed
        return nextServer, nil
    }
}

func (s *Server) handleListener() {
    defer func() {
        s.listener.Close()
        close(s.ChClosed)
    }()

    ctx := s.ctx // <- ctx をローカル変数にする

    for {
        conn, err := s.listener.AcceptTCP()

        select {
        case ctx = <-s.chCtx:
            // update ctx if changed
        default:
        }

        if err != nil {
            if ne, ok := err.(net.Error); ok {
                if ne.Temporary() {
                    log.Println("AcceptTCP", err)
                    continue
                }
            }
            if listenerCloseError(err) {
                select {
                case <-ctx.ctxShutdown.Done():
                    return
                case <-ctx.ctxGraceful.Done():
                    return
                default:
                    // fallthrough
                }
            }

            log.Println("AcceptTCP", err)
            s.errAccept()
            return
        }

        c := newConn(s, ctx, conn)
        s.Wg.Add(1)
        go c.handleConnection()
    }
}

Restart() について説明します。
新しいホスト名とポート番号が前のものと違う場合は、新しいServerを作成してListenした後に、古いServerをGraceful Shutdownします。
新しいホスト名とポート番号が同じ場合は、それ以降にAcceptしたコネクションには新しいcontextsを設定するようにし、それ以前のコネクションはGraceful Shutdownさせます。

contextsが変更されたことを、handleListener()のgoroutineに通知するために、バッファーサイズが1のchannelを利用しました。

        select {
        case ctx = <-s.chCtx:
            // update ctx if changed
        default:
        }

これによって、もしcontextsが変わっていたら更新して、変わっていない場合は以前のcontextsを使うという処理ができます。

main.go

main.go
func main() {
    chSig := make(chan os.Signal, 1)
    // Ignore all signals
    signal.Ignore()
    signal.Notify(chSig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)

    host := loadConf()
    svr := server.NewServer(context.Background(), host)
    err := svr.Listen()

    if err != nil {
        log.Fatal("Listen()", err)
    }
    log.Println("Server Started")

    for {
        select {
        case sig := <-chSig:
            switch sig {
            case syscall.SIGINT, syscall.SIGTERM:
                log.Println("Server Shutdown...")
                svr.Shutdown()

                svr.Wg.Wait()
                <-svr.ChClosed
                log.Println("Server Shutdown Completed")
            case syscall.SIGQUIT:
                log.Println("Server Graceful Shutdown...")
                svr.GracefulShutdown()

                svr.Wg.Wait()
                <-svr.ChClosed
                log.Println("Server Graceful Shutdown Completed")
            case syscall.SIGHUP:   // <- 追加
                log.Println("Server Restarting...")

                host = loadConf()

                svr, err = svr.Restart(context.Background(), host)
                if err != nil {
                    log.Fatal(err)
                }
                log.Println("Server Restarted")
                continue
            default:
                panic("unexpected signal has been received")
            }
        case <-svr.AcceptCtx.Done():
            log.Println("Server Error Occurred")
            svr.Wg.Wait()
            <-svr.ChClosed
            log.Println("Server Shutdown Completed")
        }
        return
    }
}

var first = true

func loadConf() string {
    // TODO: load config from file or env
    if first {
        first = false
        return "127.0.0.1:12345"
    } else {
        return "127.0.0.1:12346"
    }
}

syscall.SIGHUPを受け取った時に再起動します。
再起動を実現するためにはループを使うので、for文を使います。
再起動の時は、continueをして処理を継続します。
それ以外のShutdown, Graceful Shutdown, サーバーエラーの場合は、returnをしてループから抜けています。

func loadConf() stringでホスト名をロードするようにしていますが、ホスト名のローディングに細工をしています。
それにより、再起動をすると以下のような挙動になります。

  1. 立ち上げた時は127.0.0.1:12345Listenします。
  2. 一度再起動すると、127.0.0.1:12346にポート番号を変更します。
  3. 2回目以降は、再起動してもホスト名とポート番号は変更しません。

Shutdown と Graceful Shutdown と 再起動 を手動テスト

Graceful Shutdownと再起動の実装が完了しました。
ここからは正しく動いているかを確かめていきます。
ただし、echoサーバーはReadしてからWriteするまでが高速なため、Graceful ShutdownとShutdownの違いを見分けることが難しいです。
そのため、以下のようにWriteをするまでに5秒スリープさせてReadしてからWriteするまでを遅延させます。

conn.go
func (c *Conn) handleEcho(buf []byte) {
    defer c.wg.Done()
    // do something
    time.Sleep(5 * time.Second) // <- 追加

    // write
    select {
    case <-c.ctxWrite.Done():
        return
    case c.sem <- struct{}{}:
        defer func() { <-c.sem }()
        for {
            n, err := c.conn.Write(buf)
            if err != nil {
                if nerr, ok := err.(net.Error); ok {
                    if nerr.Temporary() {
                        buf = buf[n:]
                        continue
                    }
                }
                log.Println("Write error", err)
                // write error
                c.stopRead()
                c.stopWrite()
            }
            return
        }
    }
}

Signal の送信

macOSでは、以下のようにしてアクティビティモニタからGUIでシグナルを送信することができます。

アクティビティモニタ__自分のプロセス__と_Go言語でGraceful_ShutdownができるTCPのechoサーバーを実装する.png

  • サーバーのプロセスを選択し、アクティビティモニタのステータスバーにある「表示」->「シグナルをプロセスに送信」を選びます。
  • go run main.goでサーバーを立ち上げた場合は、mainというプロセス名になっています。

アクティビティモニタ__自分のプロセス_.png

  • 送るシグナルの種類を選択して、「送信」ボタンを押します。

これで自由にシグナルを送ることができます。

telnet で接続

以下のコマンドでechoサーバーが立ち上がります。

go run main.go

ターミナルを複数画面開いて、

telnet localhost 12345

とすると、接続できます。何か文字列を打ち込んで送信し(Enter)同じ文字列が5秒後にレスポンスされることを確認しましょう。

実際にやってみる

telnet で複数のリクエストをレスポンスを待たずに送信し、すぐにアクティビティモニターでシグナルを送ってみてください。

  • SIGINT, SIGTERMでは、シグナルを送った時点でtelnetの接続が切断されサーバーが終了します。
  • SIGQUITでは、全てのレスポンスが返ってきた後に接続が切断されます。また、SIGQUITを送った後に送信したリクエストは無視されています。
  • SIGHUPでは、telnetの接続がGraceful Shutdownのように切断されます。また、もう一度telnetで127.0.0.1:12346に接続すると再接続することができます。またSIGHUPを送ると接続が切断されますが、また127.0.0.1:12346に接続することができます。

最後に

これで、Graceful Shutdownと再起動可能なTCPのechoサーバーの実装は完了です。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした