6月からDMM.comラボの六本木オフィスでミドルウェアを作るエンジニアインターンをしている@kawasin73です。
DMM.comラボではluaで実装されたKVS(キーバリューストア)を利用しています。
これは、TCPの上で独自プロトコルで通信しており、URIのPathがKeyとなり最長共通接頭辞検索をするKVSで、社内でluaの皮を被ったC言語で実装されたものが運用されています。
この度、このKVSをGo言語で再実装することになり、設計は既存のミドルウェアを踏襲した形で DMM.com ラボの方が行い、実装は僕がすることになりました。
Go言語の実装手法(goroutine や channel等)については僕が学びながらそれについて都度相談するというスタイルで行なっています。
その開発記を連載しています。
第1回 GoでTCPサーバー上で独自プロトコルKey-Value Storeを実装する(知識編)
第2回 GolangでハイパフォーマンスなTCPサーバーを実装する(下準備編)
第3回 Go言語でTCPのEchoサーバーを実直に実装する
第4回 Go言語でシグナルハンドリングをするTCPのEchoサーバーを実装する
第5回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その1)
第6回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その2)
第7回 GolangでTCPサーバーに再起動とGraceful Shutdownを実装する
前回は、TCPのEchoサーバーを実直に実装するところまで進めました。
第4回の今回は、Graceful Shutdownを実現する第1歩として、シグナルハンドリングを組み込んでいきます。
はじめに
開発環境は以下の通りです。
- OS: macOS 10.12.5
- Go version: 1.8.3
- IDE: Gogland
今回開発した内容は、こちらのgithubリポジトリで公開しています。
シグナルのハンドリング
プロセスはシグナルを割り込みで受け付けることによって、処理の中断やシャットダウンを行います。
今回は、最終ゴールとして、このようなハンドリングを行うことにしました。
Signal | 処理 |
---|---|
SIGINT | Shutdown |
SIGTERM | Shutdown |
SIGQUIT | Graceful Shutdown |
SIGHUP | Restart |
その他 | 無視 |
また、SIGKILL
と SIGSTOP
は、言語レベルではハンドリングすることはできません。
そのうち、今回はSIGINT
でシャットダウンする処理を実装し、それ以外のシグナルについては無視するようにします。
Goでは、シグナルは以下のようにしてチャネルで受け取ります。
sigChan := make(chan os.Signal, 1)
// Ignore all signals
signal.Ignore()
signal.Notify(sigChan, syscall.SIGINT)
シグナルを受け取ると、sigChan
に受け取ったシグナルが通知されます。
キャンセル処理
<-sigChan
上記のコードで、シグナルを受け取ったかどうかを検出することができますが、シグナルを受け取るまでスレッドをブロックします。
そのため、シグナルを監視する処理は、handleListener()
を実行しているgoroutineとは別のgoroutineとして実行する必要があります。
前回は、main()
を実行するスレッドで、handleListener
を実行していました。
今回は、main()
を実行するスレッドではシグナルを監視し、handleListener
は別のgoroutineとして実行することにしました。
- シグナルのイベントを監視するgoroutine (
main()
) - Listenerで
Accept
しているgoroutine (handleListener
) - コネクションの処理を行っているgoroutine (
handleConnection
)
これらはそれぞれ別のgoroutineであり、シグナルを受け取ったらシャットダウンをするために、キャンセルイベントをそれぞれのgoroutineに通知する必要があります
context.Context
シャットダウンのイベントをgoroutineに通知してキャンセルする処理には、context.Context
を利用しました。
一般に、goroutine同士のメッセージングには、Goではchannel
(チャネル)が使われます。
しかし、channel
のメッセージングでは、1つのinput に対し、1つのoutputしか生まれず、一斉送信ができません。
コネクションは不定数あり、またいつ終了するかはわかりません。
1つ1つのgoroutineに終了イベントを送っている間にどれかコネクションが終了してしまうと、終了イベントのinput
がoutput
されずスレッドをブロックしてしまいます。
+--------+
| main() |
+---+----+
|
| +------------+ +------------+ +------------+
| | goroutine1 | | goroutine2 | | goroutine3 |
| +------+-----+ +------+-----+ +-------+----+
| | | |
| | | |
| | | |
| | | |
+------------> + | |
| chDone <- true | |
| | |
| | |
+------------------------------> + |
| chDone <- true |
| finished
|
+-----------------------------------------------> block!!!!!
chDone <- true
channel
のclose
を使うと一斉送信が実現できます。
しかし、同じchannel
を2度close
するとpanic
が発生するなど、生のchannel
をclose
するには細心の注意が必要です。
+--------+
| main() |
+---+----+
|
| +------------+ +------------+ +------------+
| | goroutine1 | | goroutine2 | | goroutine3 |
| +------+-----+ +------+-----+ +-------+----+
| | | |
| | | |
| | | |
| | | |
+------------> + +-------------> + +--------------> +
| close(chDone)
|
+ close(chDone)
panic!!!!!!
そこで、キャンセル処理によく使われるcontext.Context
を利用することにしました。
context.Context
は、channel
のclose
をWrapして、複数回キャンセルメソッドが呼ばれても、panic
を起こさないように実装されています。内部実装を見るとキャンセルメソッドを呼ぶ前に、ロックを取得した上でキャンセル済みかをチェックしているようです。
この実装が標準ライブラリで提供されているのは便利です。
+--------+
| main() |
+---+----+
|
| +------------+ +------------+ +------------+
| | goroutine1 | | goroutine2 | | goroutine3 |
| +------+-----+ +------+-----+ +-------+----+
| | | |
| | | |
| | | |
| | | |
+---------->ctx.Done()------->ctx.Done()-------->ctx.Done()
| cancelFunc()
|
| cancelFunc()
|
sync.WaitGroup
新しいクライアントが接続するたびに、handleConnection
のgoroutineは生成され実行されます。
キャンセルイベントが通知された後に、全てのhandleConnection
のgoroutineが終了したことを確認する必要があります。
それは、sync.WaitGroup
を使ってハンドリングします。
詳しくは、sync.WaitGroupの正しい使い方を参考にしてください。
実装
これらは、リポジトリのtcp2
ディレクトリにあります。
package main
import (
"context"
"log"
"net"
"os"
"os/signal"
"strings"
"sync"
"syscall"
)
const (
listenerCloseMatcher = "use of closed network connection"
)
func handleConnection(conn *net.TCPConn, serverCtx context.Context, wg *sync.WaitGroup) {
defer func() {
conn.Close()
wg.Done()
}()
readCtx, errRead := context.WithCancel(context.Background())
go handleRead(conn, errRead)
select {
case <-readCtx.Done():
case <-serverCtx.Done():
}
}
func handleRead(conn *net.TCPConn, errRead context.CancelFunc) {
defer errRead()
buf := make([]byte, 4*1024)
for {
n, err := conn.Read(buf)
if err != nil {
if ne, ok := err.(net.Error); ok {
switch {
case ne.Temporary():
continue
}
}
log.Println("Read", err)
return
}
n, err = conn.Write(buf[:n])
if err != nil {
log.Println("Write", err)
return
}
}
}
func handleListener(l *net.TCPListener, serverCtx context.Context, wg *sync.WaitGroup, chClosed chan struct{}) {
defer func() {
l.Close()
close(chClosed)
}()
for {
conn, err := l.AcceptTCP()
if err != nil {
if ne, ok := err.(net.Error); ok {
if ne.Temporary() {
log.Println("AcceptTCP", err)
continue
}
}
if listenerCloseError(err) {
select {
case <-serverCtx.Done():
return
default:
// fallthrough
}
}
log.Println("AcceptTCP", err)
return
}
wg.Add(1)
go handleConnection(conn, serverCtx, wg)
}
}
func listenerCloseError(err error) bool {
return strings.Contains(err.Error(), listenerCloseMatcher)
}
func main() {
tcpAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:12345")
if err != nil {
log.Println("ResolveTCPAddr", err)
return
}
l, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
log.Println("ListenTCP", err)
return
}
sigChan := make(chan os.Signal, 1)
// Ignore all signals
signal.Ignore()
signal.Notify(sigChan, syscall.SIGINT)
var wg sync.WaitGroup
chClosed := make(chan struct{})
serverCtx, shutdown := context.WithCancel(context.Background())
go handleListener(l, serverCtx, &wg, chClosed)
log.Println("Server Started")
s := <-sigChan
switch s {
case syscall.SIGINT:
log.Println("Server Shutdown...")
shutdown()
l.Close()
wg.Wait()
<-chClosed
log.Println("Server Shutdown Completed")
default:
panic("unexpected signal has been received")
}
}
解説
handleConnection と handleRead
main()
では、シグナル処理を監視するために、handleListener
をgoroutine化しました。
同様に、handleConnection
でも、シグナル処理から渡ってくるキャンセルイベントを監視する処理と、Read
とWrite
を行う処理を同時に行う必要があり、Read
とWrite
を行う処理をhandleRead
に切り出して、goroutine化しました。
handleConnection
発でコネクションを終了
SIGINT
シグナルを受け取った時の各コネクションの終了処理は以下の通りです。
+------------------+ +------------+
| handleConnection | | handleRead |
+--------+---------+ +------+-----+
| |
serverCtx.Done() |
| |
conn.Close()+------> Error on conn.Read(buf)
serverCtx.Done()
でキャンセルイベントが通知されると、handleConnection
でconn.Close()
が呼ばれます。
handleRead
内のRead()
メソッドで、エラーが発生しhandleRead
が終了します。
これにより、handleConnection
とhandleRead
の両方のgoroutineが終了し、コネクションの処理のキャンセルが完了します。
handleRead
発でコネクションを終了
一方で、クライアント側から一方的に接続を切断された場合など、handleRead
内のRead()
でエラーが発生した場合、handleConnection
のgoroutineも終了させないと、goroutineが永久に終了しないためリークしてしまいます。
handleRead
内でエラーが発生した場合は、handleConnection
にキャンセルイベントを通知する必要があります。
handleRead
からhandleConnection
へのキャンセルイベントの通知には、readCtx
を利用します。
+------------------+ +------------+
| handleConnection | | handleRead |
+--------+---------+ +------+-----+
| |
| |
| Error
| |
| |
readCtx.Done()<---------+ errRead()
|
|
conn.Close()
handleRead
内のRead()
でエラーが発生すると、errRead()
が呼ばれます。
handleConnection
内のreadCtx.Done()
で、キャンセルイベントを受け取り、handleConnection
goroutineが終了します。
AcceptTCP で発生したエラー
TCPListener
にエラーが発生した場合、handleListener
が終了しますが、その終了イベントがどこにも通知されません。
その部分のキャンセル処理については次回実装します。
Accept は Deadlineを設けない
main()
で検出されたシグナルは、context.Context
を通じてhandleListener
のgoroutineに通知されます。
AcceptTCP()
は処理をブロックしているので、シグナルが送られたかどうかをチェックするためには、AcceptTCP()
を終了してserverCtx.Done()
を確認する必要があります。
そのために、SetDeadline()
でタイムアウトを設定して、定期的にAcceptTCP()
を終了させ、serverCtx.Done()
を確認する方法があります。
しかし、この方法では定期的に検査の処理を進め、無駄にforループを回すことになりますし、定期的にAccept
システムコールが呼ばれるためコンテキストスイッチが大量に発生することになりパフォーマンスを劣化させます。
この問題に対しては、https://github.com/nsqio/nsq のアプローチを採用しました。
- https://github.com/nsqio/nsq/blob/f874512d88461342d236c16caccaba62aa8da3e5/internal/protocol/tcp_server.go#L26
- https://github.com/nsqio/nsq/blob/f874512d88461342d236c16caccaba62aa8da3e5/nsqd/nsqd.go#L459
を見ると、SetDeadline()
は使わず、外部からListener
をClose
することでAcceptTCP()
を終了させます。
Listener
がClose
することは、正常な処理であるため、その後でClose
のエラーを無視する方法を取っています。
Close
のエラーは、エラーメッセージの文字列の一致によって判別しています。
シャットダウンはどのように動くか
シャットダウンは以下のように動きます。
+--------+ +----------------+ +------------------+ +------------+
| main() | | handleListener | | handleConnection | | handleRead |
+---+----+ +--------+-------+ +--------+---------+ +------+-----+
| | | |
| | | |
SIGINT +-----> s := <+sigChan | | |
| | | |
shutdown()+-------------------------------------> serverCtx.Done() |
| | | |
| | conn.Close()+------> Error on conn.Read(buf)
| | |
l.Close()+-----> Error on l.AcceptTCP() |
| | |
wg.Wait() <----------------------------------------+ wg.Done()
| |
<-chClosed <------+ close(chClosed)
動かしてみる
以下のコマンドでechoサーバーが立ち上がります。
go run main.go
ターミナルを複数画面開いて、
telnet localhost 12345
とすると、telnetで接続できます。何か文字列を打ち込んで送信し(Enter)同じ文字列がレスポンスされることを確認しましょう。
サーバー側で、Ctrl + C
を押すと、SIGINT
が送信されます。
^C2017/07/18 18:32:42 Server Shutdown...
2017/07/18 18:32:42 Server Shutdown Completed
というログが表示され、シャットダウンが正常に行われたことを確認してください。
最後に
シグナル処理の実装を追加しました。
次回は、
- Listener内でのエラーのハンドリング
-
Server
とConn
のstruct
化
を行なっていきます。