6月からDMM.comラボの六本木オフィスでミドルウェアを作るエンジニアインターンをしている@kawasin73です。
DMM.comラボではluaで実装されたKVS(キーバリューストア)を利用しています。
これは、TCPの上で独自プロトコルで通信しており、URIのPathがKeyとなり最長共通接頭辞検索をするKVSで、社内でluaの皮を被ったC言語で実装されたものが運用されています。
この度、このKVSをGo言語で再実装することになり、設計は既存のミドルウェアを踏襲した形で DMM.com ラボの方が行い、実装は僕がすることになりました。
Go言語の実装手法(goroutine や channel等)については僕が学びながらそれについて都度相談するというスタイルで行なっています。
その開発記を連載しています。
第1回 GoでTCPサーバー上で独自プロトコルKey-Value Storeを実装する(知識編)
第2回 GolangでハイパフォーマンスなTCPサーバーを実装する(下準備編)
第3回 Go言語でTCPのEchoサーバーを実直に実装する
第4回 Go言語でシグナルハンドリングをするTCPのEchoサーバーを実装する
第5回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その1)
第6回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その2)
第7回の今回は、ゼロダウンタイムの再起動とGraceful Shutdownを実装していきます。
はじめに
開発環境は以下の通りです。
- OS: macOS 10.12.5
- Go version: 1.8.3
- IDE: Gogland
今回開発した内容は、こちらのリポジトリのtcp5
ディレクトリで公開しています。
Graceful Shutdownとは
Shutdownは、処理中のリクエストがあったとしても完了を待たずに即座にサーバーの終了を行います。そのため、クライアント側ではリクエストが正しく処理されたかどうかはわからなくなります。
一方、Graceful Shutdownでは、新しいリクエストの受付を停止し、処理中の全てのリクエストの完了後にサーバーの終了を行います。
HTTPサーバーのGraceful Shutdownライブラリはいくつかあるようですが、今回は、TCPサーバーを扱うので、Graceful Shutdownの処理も実装しました。
再起動
Listenするホスト名やログファイルのパスなどを外部ファイルや環境変数などで設定している時、再起動ができるようになると変更をゼロダウンタイムで反映することができるようになります。
再起動には2種類の手順があります。
- Listenするホスト名とポート番号が変わらない場合
- Listenするホスト名とポート番号が変わる場合
です。
Listenするホスト名とポート番号が変わらない場合、ListenerをClose
する必要はありません。
Configを再設定するだけで再起動の処理は終わります。
Listenするホスト名とポート番号が変わる場合は、Listenerを切り替える必要があります。
- 新しいホスト名とポート番号で
Server
を作りListen - 稼働中の古い
Server
をGraceful Shutdown
このようにホスト名とポート番号が変わるか変わらないかで処理を変更することで設定値の変更をゼロダウンタイムで行うことができます。
なお、機能追加やバグの修正などでサーバーの処理を変更して新しいバイナリに変更する場合の再起動は、別の仕組みで対応する必要があります。
バイナリの変更を伴う再起動はこの章では扱いません。
Graceful Shutdown を実装する
main() と Server の処理
まずmain()
と、Server
にGraceful Shutdownの処理を追加します。
コミットID: 5050fe3e49fa29c22404e8b525e4fce34438d732
処理としては、Shutdownの時と同じ処理です。
func main() {
sigChan := make(chan os.Signal, 1)
// Ignore all signals
signal.Ignore()
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
svr := server.NewServer(context.Background(), "127.0.0.1:12345")
err := svr.Listen()
if err != nil {
log.Fatal("Listen()", err)
}
log.Println("Server Started")
select {
case sig := <-sigChan:
switch sig {
case syscall.SIGINT, syscall.SIGTERM:
log.Println("Server Shutdown...")
svr.Shutdown()
svr.Wg.Wait()
<-svr.ChClosed
log.Println("Server Shutdown Completed")
case syscall.SIGQUIT: // <- ここを追加
log.Println("Server Graceful Shutdown...")
svr.GracefulShutdown()
svr.Wg.Wait()
<-svr.ChClosed
log.Println("Server Graceful Shutdown Completed")
default:
panic("unexpected signal has been received")
}
case <-svr.AcceptCtx.Done():
log.Println("Server Error Occurred")
svr.Wg.Wait()
<-svr.ChClosed
log.Println("Server Shutdown Completed")
}
}
func (s *Server) GracefulShutdown() {
select {
case <-s.ctxGraceful.Done():
// already shutdown
default:
s.gshutdown()
s.listener.Close()
}
}
Conn の処理
次にConn
の処理を追加します。
シグナルを受け取った時にRead
を終了し、すでにRead
しているもののWrite
が全て終了するのを待って、コネクションを終了します。
Write
はgoroutineで非同期化されているので、どこまでWrite
が終了しているかを知る必要があります。
それには、sync.WaitGroup
を使います。
コミットID: d26bb371f357c79fc175b3b244726d0f4d609c38
type Conn struct {
// ~~ 省略 ~~
wg sync.WaitGroup // <- 追加
}
func (c *Conn) handleConnection() {
defer func() {
c.stopWrite()
c.conn.Close()
c.svr.Wg.Done()
}()
go c.handleRead()
select {
case <-c.ctxRead.Done():
case <-c.svr.ctxShutdown.Done():
case <-c.svr.AcceptCtx.Done():
case <-c.svr.ctxGraceful.Done(): // <- 追加
c.conn.CloseRead()
c.wg.Wait()
}
}
func (c *Conn) handleRead() {
defer c.stopRead()
buf := make([]byte, 4*1024)
for {
n, err := c.conn.Read(buf)
if err != nil {
if ne, ok := err.(net.Error); ok {
switch {
case ne.Temporary():
continue
}
}
log.Println("Read", err)
return
}
wBuf := make([]byte, n)
copy(wBuf, buf[:n])
c.wg.Add(1) // <- 追加
go c.handleEcho(wBuf)
}
}
func (c *Conn) handleEcho(buf []byte) {
defer c.wg.Done() // <- 追加
// do something
// write
select {
case <-c.ctxWrite.Done():
return
case c.sem <- struct{}{}:
defer func() { <-c.sem }()
for {
n, err := c.conn.Write(buf)
if err != nil {
if nerr, ok := err.(net.Error); ok {
if nerr.Temporary() {
buf = buf[n:]
continue
}
}
log.Println("Write error", err)
// write error
c.stopRead()
c.stopWrite() // <- 追加
}
return
}
}
}
Graceful ShutdownがされたらhandleConnection
の中で
c.conn.CloseRead()
c.wg.Wait()
をします。これ以降のRead
を終了し、Write
が全て終わるまで待ちます。
再起動 を実装する
続いて再起動を実装します。
コミットID: dc60ea17967ee3498233e42748b2f1f4a151e231
contexts の追加
Listenするホスト名とポート番号が変わらない再起動をする時、ListenerはCloseしないで、すでに接続済みのコネクションをGraceful Shutdownします。
Serverを再利用したいですがctxGraceful
は一度しか使えません。
そこで新しくcontexts
を導入して、再起動のたびにcontexts
を差し替えてServerを再利用しつつGraceful Shutdownを実現することにしました。
type contexts struct {
ctxShutdown context.Context
shutdown context.CancelFunc
ctxGraceful context.Context
gshutdown context.CancelFunc
}
そして、Conn
に*Server
と*contexts
を持つようにします。
type Conn struct {
svr *Server
ctx *contexts // <- 追加
// ~~ 省略 ~~
}
Restart の追加とhandleListener
内でのcontexts
の変更通知
func NewServer(parent context.Context, addr string) *Server {
ctx := newContext(parent)
acceptCtx, errAccept := context.WithCancel(context.Background())
chClosed := make(chan struct{})
chCtx := make(chan *contexts, 1) // <- buffer が 1
return &Server{
addr: addr,
AcceptCtx: acceptCtx,
errAccept: errAccept,
ChClosed: chClosed,
ctx: ctx,
chCtx: chCtx,
}
}
func (s *Server) Restart(parent context.Context, addr string) (*Server, error) {
if addr == s.addr {
// update contexts. not close listener
prevCtx := s.ctx
s.ctx = newContext(parent)
select {
case <-s.chCtx:
// clear s.chCtx if previous contexts have not been popped
default:
}
s.chCtx <- s.ctx
prevCtx.gshutdown()
return s, nil
} else {
// create new listener
nextServer := NewServer(parent, addr)
err := nextServer.Listen()
if err != nil {
return nil, err
}
s.GracefulShutdown()
s.Wg.Wait()
<-s.ChClosed
return nextServer, nil
}
}
func (s *Server) handleListener() {
defer func() {
s.listener.Close()
close(s.ChClosed)
}()
ctx := s.ctx // <- ctx をローカル変数にする
for {
conn, err := s.listener.AcceptTCP()
select {
case ctx = <-s.chCtx:
// update ctx if changed
default:
}
if err != nil {
if ne, ok := err.(net.Error); ok {
if ne.Temporary() {
log.Println("AcceptTCP", err)
continue
}
}
if listenerCloseError(err) {
select {
case <-ctx.ctxShutdown.Done():
return
case <-ctx.ctxGraceful.Done():
return
default:
// fallthrough
}
}
log.Println("AcceptTCP", err)
s.errAccept()
return
}
c := newConn(s, ctx, conn)
s.Wg.Add(1)
go c.handleConnection()
}
}
Restart()
について説明します。
新しいホスト名とポート番号が前のものと違う場合は、新しいServerを作成してListen
した後に、古いServerをGraceful Shutdownします。
新しいホスト名とポート番号が同じ場合は、それ以降にAccept
したコネクションには新しいcontexts
を設定するようにし、それ以前のコネクションはGraceful Shutdownさせます。
contexts
が変更されたことを、handleListener()
のgoroutineに通知するために、バッファーサイズが1のchannelを利用しました。
select {
case ctx = <-s.chCtx:
// update ctx if changed
default:
}
これによって、もしcontexts
が変わっていたら更新して、変わっていない場合は以前のcontexts
を使うという処理ができます。
main.go
func main() {
chSig := make(chan os.Signal, 1)
// Ignore all signals
signal.Ignore()
signal.Notify(chSig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)
host := loadConf()
svr := server.NewServer(context.Background(), host)
err := svr.Listen()
if err != nil {
log.Fatal("Listen()", err)
}
log.Println("Server Started")
for {
select {
case sig := <-chSig:
switch sig {
case syscall.SIGINT, syscall.SIGTERM:
log.Println("Server Shutdown...")
svr.Shutdown()
svr.Wg.Wait()
<-svr.ChClosed
log.Println("Server Shutdown Completed")
case syscall.SIGQUIT:
log.Println("Server Graceful Shutdown...")
svr.GracefulShutdown()
svr.Wg.Wait()
<-svr.ChClosed
log.Println("Server Graceful Shutdown Completed")
case syscall.SIGHUP: // <- 追加
log.Println("Server Restarting...")
host = loadConf()
svr, err = svr.Restart(context.Background(), host)
if err != nil {
log.Fatal(err)
}
log.Println("Server Restarted")
continue
default:
panic("unexpected signal has been received")
}
case <-svr.AcceptCtx.Done():
log.Println("Server Error Occurred")
svr.Wg.Wait()
<-svr.ChClosed
log.Println("Server Shutdown Completed")
}
return
}
}
var first = true
func loadConf() string {
// TODO: load config from file or env
if first {
first = false
return "127.0.0.1:12345"
} else {
return "127.0.0.1:12346"
}
}
syscall.SIGHUP
を受け取った時に再起動します。
再起動を実現するためにはループを使うので、for
文を使います。
再起動の時は、continue
をして処理を継続します。
それ以外のShutdown, Graceful Shutdown, サーバーエラーの場合は、return
をしてループから抜けています。
func loadConf() string
でホスト名をロードするようにしていますが、ホスト名のローディングに細工をしています。
それにより、再起動をすると以下のような挙動になります。
- 立ち上げた時は
127.0.0.1:12345
をListen
します。 - 一度再起動すると、
127.0.0.1:12346
にポート番号を変更します。 - 2回目以降は、再起動してもホスト名とポート番号は変更しません。
Shutdown と Graceful Shutdown と 再起動 を手動テスト
Graceful Shutdownと再起動の実装が完了しました。
ここからは正しく動いているかを確かめていきます。
ただし、echoサーバーはRead
してからWrite
するまでが高速なため、Graceful ShutdownとShutdownの違いを見分けることが難しいです。
そのため、以下のようにWrite
をするまでに5秒スリープさせてRead
してからWrite
するまでを遅延させます。
func (c *Conn) handleEcho(buf []byte) {
defer c.wg.Done()
// do something
time.Sleep(5 * time.Second) // <- 追加
// write
select {
case <-c.ctxWrite.Done():
return
case c.sem <- struct{}{}:
defer func() { <-c.sem }()
for {
n, err := c.conn.Write(buf)
if err != nil {
if nerr, ok := err.(net.Error); ok {
if nerr.Temporary() {
buf = buf[n:]
continue
}
}
log.Println("Write error", err)
// write error
c.stopRead()
c.stopWrite()
}
return
}
}
}
Signal の送信
macOSでは、以下のようにしてアクティビティモニタからGUIでシグナルを送信することができます。
- サーバーのプロセスを選択し、アクティビティモニタのステータスバーにある「表示」->「シグナルをプロセスに送信」を選びます。
-
go run main.go
でサーバーを立ち上げた場合は、main
というプロセス名になっています。
- 送るシグナルの種類を選択して、「送信」ボタンを押します。
これで自由にシグナルを送ることができます。
telnet で接続
以下のコマンドでechoサーバーが立ち上がります。
go run main.go
ターミナルを複数画面開いて、
telnet localhost 12345
とすると、接続できます。何か文字列を打ち込んで送信し(Enter)同じ文字列が5秒後にレスポンスされることを確認しましょう。
実際にやってみる
telnet で複数のリクエストをレスポンスを待たずに送信し、すぐにアクティビティモニターでシグナルを送ってみてください。
-
SIGINT
,SIGTERM
では、シグナルを送った時点でtelnetの接続が切断されサーバーが終了します。 -
SIGQUIT
では、全てのレスポンスが返ってきた後に接続が切断されます。また、SIGQUIT
を送った後に送信したリクエストは無視されています。 -
SIGHUP
では、telnetの接続がGraceful Shutdownのように切断されます。また、もう一度telnetで127.0.0.1:12346
に接続すると再接続することができます。またSIGHUP
を送ると接続が切断されますが、また127.0.0.1:12346
に接続することができます。
最後に
これで、Graceful Shutdownと再起動可能なTCPのechoサーバーの実装は完了です。