6月からDMM.comラボの六本木オフィスでミドルウェアを作るエンジニアインターンをしている@kawasin73です。
DMM.comラボではluaで実装されたKVS(キーバリューストア)を利用しています。
これは、TCPの上で独自プロトコルで通信しており、URIのPathがKeyとなり最長共通接頭辞検索をするKVSで、社内でluaの皮を被ったC言語で実装されたものが運用されています。
この度、このKVSをGo言語で再実装することになり、設計は既存のミドルウェアを踏襲した形で DMM.com ラボの方が行い、実装は僕がすることになりました。
Go言語の実装手法(goroutine や channel等)については僕が学びながらそれについて都度相談するというスタイルで行なっています。
その開発記を連載しています。
第1回 GoでTCPサーバー上で独自プロトコルKey-Value Storeを実装する(知識編)
第2回 GolangでハイパフォーマンスなTCPサーバーを実装する(下準備編)
第3回 Go言語でTCPのEchoサーバーを実直に実装する
第4回 Go言語でシグナルハンドリングをするTCPのEchoサーバーを実装する
第5回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その1)
第6回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その2)
第7回 GolangでTCPサーバーに再起動とGraceful Shutdownを実装する
前回は、TCPのEchoサーバーを実直に実装するところまで進めました。
第4回の今回は、Graceful Shutdownを実現する第1歩として、シグナルハンドリングを組み込んでいきます。
はじめに
開発環境は以下の通りです。
- OS: macOS 10.12.5
- Go version: 1.8.3
- IDE: Gogland
今回開発した内容は、こちらのgithubリポジトリで公開しています。
シグナルのハンドリング
プロセスはシグナルを割り込みで受け付けることによって、処理の中断やシャットダウンを行います。
今回は、最終ゴールとして、このようなハンドリングを行うことにしました。
| Signal | 処理 |
|---|---|
| SIGINT | Shutdown |
| SIGTERM | Shutdown |
| SIGQUIT | Graceful Shutdown |
| SIGHUP | Restart |
| その他 | 無視 |
また、SIGKILL と SIGSTOP は、言語レベルではハンドリングすることはできません。
そのうち、今回はSIGINTでシャットダウンする処理を実装し、それ以外のシグナルについては無視するようにします。
Goでは、シグナルは以下のようにしてチャネルで受け取ります。
sigChan := make(chan os.Signal, 1)
// Ignore all signals
signal.Ignore()
signal.Notify(sigChan, syscall.SIGINT)
シグナルを受け取ると、sigChanに受け取ったシグナルが通知されます。
キャンセル処理
<-sigChan
上記のコードで、シグナルを受け取ったかどうかを検出することができますが、シグナルを受け取るまでスレッドをブロックします。
そのため、シグナルを監視する処理は、handleListener()を実行しているgoroutineとは別のgoroutineとして実行する必要があります。
前回は、main()を実行するスレッドで、handleListenerを実行していました。
今回は、main()を実行するスレッドではシグナルを監視し、handleListenerは別のgoroutineとして実行することにしました。
- シグナルのイベントを監視するgoroutine (
main()) - Listenerで
Acceptしているgoroutine (handleListener) - コネクションの処理を行っているgoroutine (
handleConnection)
これらはそれぞれ別のgoroutineであり、シグナルを受け取ったらシャットダウンをするために、キャンセルイベントをそれぞれのgoroutineに通知する必要があります
context.Context
シャットダウンのイベントをgoroutineに通知してキャンセルする処理には、context.Contextを利用しました。
一般に、goroutine同士のメッセージングには、Goではchannel(チャネル)が使われます。
しかし、channelのメッセージングでは、1つのinput に対し、1つのoutputしか生まれず、一斉送信ができません。
コネクションは不定数あり、またいつ終了するかはわかりません。
1つ1つのgoroutineに終了イベントを送っている間にどれかコネクションが終了してしまうと、終了イベントのinputがoutputされずスレッドをブロックしてしまいます。
+--------+
| main() |
+---+----+
|
| +------------+ +------------+ +------------+
| | goroutine1 | | goroutine2 | | goroutine3 |
| +------+-----+ +------+-----+ +-------+----+
| | | |
| | | |
| | | |
| | | |
+------------> + | |
| chDone <- true | |
| | |
| | |
+------------------------------> + |
| chDone <- true |
| finished
|
+-----------------------------------------------> block!!!!!
chDone <- true
channelのcloseを使うと一斉送信が実現できます。
しかし、同じchannelを2度closeするとpanicが発生するなど、生のchannelをcloseするには細心の注意が必要です。
+--------+
| main() |
+---+----+
|
| +------------+ +------------+ +------------+
| | goroutine1 | | goroutine2 | | goroutine3 |
| +------+-----+ +------+-----+ +-------+----+
| | | |
| | | |
| | | |
| | | |
+------------> + +-------------> + +--------------> +
| close(chDone)
|
+ close(chDone)
panic!!!!!!
そこで、キャンセル処理によく使われるcontext.Contextを利用することにしました。
context.Contextは、channelのcloseをWrapして、複数回キャンセルメソッドが呼ばれても、panicを起こさないように実装されています。内部実装を見るとキャンセルメソッドを呼ぶ前に、ロックを取得した上でキャンセル済みかをチェックしているようです。
この実装が標準ライブラリで提供されているのは便利です。
+--------+
| main() |
+---+----+
|
| +------------+ +------------+ +------------+
| | goroutine1 | | goroutine2 | | goroutine3 |
| +------+-----+ +------+-----+ +-------+----+
| | | |
| | | |
| | | |
| | | |
+---------->ctx.Done()------->ctx.Done()-------->ctx.Done()
| cancelFunc()
|
| cancelFunc()
|
sync.WaitGroup
新しいクライアントが接続するたびに、handleConnectionのgoroutineは生成され実行されます。
キャンセルイベントが通知された後に、全てのhandleConnectionのgoroutineが終了したことを確認する必要があります。
それは、sync.WaitGroupを使ってハンドリングします。
詳しくは、sync.WaitGroupの正しい使い方を参考にしてください。
実装
これらは、リポジトリのtcp2ディレクトリにあります。
package main
import (
"context"
"log"
"net"
"os"
"os/signal"
"strings"
"sync"
"syscall"
)
const (
listenerCloseMatcher = "use of closed network connection"
)
func handleConnection(conn *net.TCPConn, serverCtx context.Context, wg *sync.WaitGroup) {
defer func() {
conn.Close()
wg.Done()
}()
readCtx, errRead := context.WithCancel(context.Background())
go handleRead(conn, errRead)
select {
case <-readCtx.Done():
case <-serverCtx.Done():
}
}
func handleRead(conn *net.TCPConn, errRead context.CancelFunc) {
defer errRead()
buf := make([]byte, 4*1024)
for {
n, err := conn.Read(buf)
if err != nil {
if ne, ok := err.(net.Error); ok {
switch {
case ne.Temporary():
continue
}
}
log.Println("Read", err)
return
}
n, err = conn.Write(buf[:n])
if err != nil {
log.Println("Write", err)
return
}
}
}
func handleListener(l *net.TCPListener, serverCtx context.Context, wg *sync.WaitGroup, chClosed chan struct{}) {
defer func() {
l.Close()
close(chClosed)
}()
for {
conn, err := l.AcceptTCP()
if err != nil {
if ne, ok := err.(net.Error); ok {
if ne.Temporary() {
log.Println("AcceptTCP", err)
continue
}
}
if listenerCloseError(err) {
select {
case <-serverCtx.Done():
return
default:
// fallthrough
}
}
log.Println("AcceptTCP", err)
return
}
wg.Add(1)
go handleConnection(conn, serverCtx, wg)
}
}
func listenerCloseError(err error) bool {
return strings.Contains(err.Error(), listenerCloseMatcher)
}
func main() {
tcpAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:12345")
if err != nil {
log.Println("ResolveTCPAddr", err)
return
}
l, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
log.Println("ListenTCP", err)
return
}
sigChan := make(chan os.Signal, 1)
// Ignore all signals
signal.Ignore()
signal.Notify(sigChan, syscall.SIGINT)
var wg sync.WaitGroup
chClosed := make(chan struct{})
serverCtx, shutdown := context.WithCancel(context.Background())
go handleListener(l, serverCtx, &wg, chClosed)
log.Println("Server Started")
s := <-sigChan
switch s {
case syscall.SIGINT:
log.Println("Server Shutdown...")
shutdown()
l.Close()
wg.Wait()
<-chClosed
log.Println("Server Shutdown Completed")
default:
panic("unexpected signal has been received")
}
}
解説
handleConnection と handleRead
main() では、シグナル処理を監視するために、handleListenerをgoroutine化しました。
同様に、handleConnectionでも、シグナル処理から渡ってくるキャンセルイベントを監視する処理と、ReadとWriteを行う処理を同時に行う必要があり、ReadとWriteを行う処理をhandleReadに切り出して、goroutine化しました。
handleConnection発でコネクションを終了
SIGINTシグナルを受け取った時の各コネクションの終了処理は以下の通りです。
+------------------+ +------------+
| handleConnection | | handleRead |
+--------+---------+ +------+-----+
| |
serverCtx.Done() |
| |
conn.Close()+------> Error on conn.Read(buf)
serverCtx.Done()でキャンセルイベントが通知されると、handleConnectionでconn.Close()が呼ばれます。
handleRead内のRead()メソッドで、エラーが発生しhandleReadが終了します。
これにより、handleConnectionとhandleReadの両方のgoroutineが終了し、コネクションの処理のキャンセルが完了します。
handleRead発でコネクションを終了
一方で、クライアント側から一方的に接続を切断された場合など、handleRead内のRead()でエラーが発生した場合、handleConnectionのgoroutineも終了させないと、goroutineが永久に終了しないためリークしてしまいます。
handleRead内でエラーが発生した場合は、handleConnectionにキャンセルイベントを通知する必要があります。
handleReadからhandleConnectionへのキャンセルイベントの通知には、readCtxを利用します。
+------------------+ +------------+
| handleConnection | | handleRead |
+--------+---------+ +------+-----+
| |
| |
| Error
| |
| |
readCtx.Done()<---------+ errRead()
|
|
conn.Close()
handleRead内のRead()でエラーが発生すると、errRead()が呼ばれます。
handleConnection内のreadCtx.Done()で、キャンセルイベントを受け取り、handleConnectiongoroutineが終了します。
AcceptTCP で発生したエラー
TCPListenerにエラーが発生した場合、handleListenerが終了しますが、その終了イベントがどこにも通知されません。
その部分のキャンセル処理については次回実装します。
Accept は Deadlineを設けない
main()で検出されたシグナルは、context.Contextを通じてhandleListenerのgoroutineに通知されます。
AcceptTCP()は処理をブロックしているので、シグナルが送られたかどうかをチェックするためには、AcceptTCP()を終了してserverCtx.Done()を確認する必要があります。
そのために、SetDeadline()でタイムアウトを設定して、定期的にAcceptTCP()を終了させ、serverCtx.Done()を確認する方法があります。
しかし、この方法では定期的に検査の処理を進め、無駄にforループを回すことになりますし、定期的にAcceptシステムコールが呼ばれるためコンテキストスイッチが大量に発生することになりパフォーマンスを劣化させます。
この問題に対しては、https://github.com/nsqio/nsq のアプローチを採用しました。
- https://github.com/nsqio/nsq/blob/f874512d88461342d236c16caccaba62aa8da3e5/internal/protocol/tcp_server.go#L26
- https://github.com/nsqio/nsq/blob/f874512d88461342d236c16caccaba62aa8da3e5/nsqd/nsqd.go#L459
を見ると、SetDeadline()は使わず、外部からListenerをCloseすることでAcceptTCP()を終了させます。
ListenerがCloseすることは、正常な処理であるため、その後でCloseのエラーを無視する方法を取っています。
Closeのエラーは、エラーメッセージの文字列の一致によって判別しています。
シャットダウンはどのように動くか
シャットダウンは以下のように動きます。
+--------+ +----------------+ +------------------+ +------------+
| main() | | handleListener | | handleConnection | | handleRead |
+---+----+ +--------+-------+ +--------+---------+ +------+-----+
| | | |
| | | |
SIGINT +-----> s := <+sigChan | | |
| | | |
shutdown()+-------------------------------------> serverCtx.Done() |
| | | |
| | conn.Close()+------> Error on conn.Read(buf)
| | |
l.Close()+-----> Error on l.AcceptTCP() |
| | |
wg.Wait() <----------------------------------------+ wg.Done()
| |
<-chClosed <------+ close(chClosed)
動かしてみる
以下のコマンドでechoサーバーが立ち上がります。
go run main.go
ターミナルを複数画面開いて、
telnet localhost 12345
とすると、telnetで接続できます。何か文字列を打ち込んで送信し(Enter)同じ文字列がレスポンスされることを確認しましょう。
サーバー側で、Ctrl + Cを押すと、SIGINTが送信されます。
^C2017/07/18 18:32:42 Server Shutdown...
2017/07/18 18:32:42 Server Shutdown Completed
というログが表示され、シャットダウンが正常に行われたことを確認してください。
最後に
シグナル処理の実装を追加しました。
次回は、
- Listener内でのエラーのハンドリング
-
ServerとConnのstruct化
を行なっていきます。