6月からDMM.comラボの六本木オフィスでミドルウェアを作るエンジニアインターンをしている@kawasin73です。
DMM.comラボではluaで実装されたKVS(キーバリューストア)を利用しています。
これは、TCPの上で独自プロトコルで通信しており、URIのPathがKeyとなり最長共通接頭辞検索をするKVSで、社内でluaの皮を被ったC言語で実装されたものが運用されています。
この度、このKVSをGo言語で再実装することになり、設計は既存のミドルウェアを踏襲した形で DMM.com ラボの方が行い、実装は僕がすることになりました。
Go言語の実装手法(goroutine や channel等)については僕が学びながらそれについて都度相談するというスタイルで行なっています。
その開発記を連載しています。
第1回 GoでTCPサーバー上で独自プロトコルKey-Value Storeを実装する(知識編)
第2回 GolangでハイパフォーマンスなTCPサーバーを実装する(下準備編)
第3回 Go言語でTCPのEchoサーバーを実直に実装する
第4回 Go言語でシグナルハンドリングをするTCPのEchoサーバーを実装する
第5回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その1)
第6回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その2)
第7回 GolangでTCPサーバーに再起動とGraceful Shutdownを実装する
前回までで、シグナルハンドリングを備えたechoサーバーの実装を行いました。
第5回の今回は、すべて関数で実装されているListenerとConnectionの処理をstruct
を使ってまとめ上げていきます。
はじめに
開発環境は以下の通りです。
- OS: macOS 10.12.5
- Go version: 1.8.3
- IDE: Gogland
今回開発した内容は、こちらのリポジトリのtcp3
ディレクトリで公開しています。
前回やり残したこと
前回は、SIGINT
のシグナルハンドリングを実装しました。
その中ではAcceptTCP()
のエラーハンドリングが途中で終わっていました。
今回はそこから再開していきたいと思います。
AcceptTCP()
でエラーが発生したときはhandleListener
のgoroutineが終了します。
そして、そのエラーイベントをmain()
goroutineと、handleConnection
goroutineと、handleRead
goroutineに通知して、それぞれのgoroutineを終了させなければなりません。
+--------+ +----------------+ +------------------+ +------------+
| main() | | handleListener | | handleConnection | | handleRead |
+---+----+ +--------+-------+ +--------+---------+ +------+-----+
| | | |
| Error on AcceptTCP() | |
| | | |
| notify cancel | notify cancel | |
| <------------------+----------------------------> | |
| | notify cancel |
Finish +-----------------> |
| |
Finish Finish
handleConnection
からhandleRead
へのキャンセルを通知する仕組みはすでにあるため、handleListener
から、main()
とhandleConnection
へ、キャンセルイベントを通知する仕組みを追加する必要があります。
キャンセルイベントの通知には、前回と同様にcontext.Context
を用います。
見えてきた限界・・・。structでまとめる
main()
と handleListener
と、 handleConnection
で同じcontext.Context
にアクセスするためには、handleListener
とhandleConnection
の引数に渡して共有する必要があります。
ここで限界が見えてきます。
すでに、
func handleListener(l *net.TCPListener, serverCtx context.Context, wg *sync.WaitGroup, chClosed chan struct{})
func handleConnection(conn *net.TCPConn, serverCtx context.Context, wg *sync.WaitGroup)
と、それぞれのメソッドは多くの引数を持っており、これに新しいcontext.Context
を追加すると複雑になり分かりにくくなります。
これを解決するために、struct
を利用することにします。
handleConnection
で利用するプロパティをServer
structに、handleConnection
で利用するプロパティをConn
structにまとめて保持することにします。
Server
handleListener
をServer
のメソッドにしてしまうことで、引数をなくすことができました。
type Server struct {
addr string
listener *net.TCPListener
ctx context.Context
shutdown context.CancelFunc
Wg sync.WaitGroup
ChClosed chan struct{}
}
func (s *Server) handleListener() {
defer func() {
s.listener.Close()
close(s.ChClosed)
}()
for {
conn, err := s.listener.AcceptTCP()
if err != nil {
// 省略 ...
}
wg.Add(1)
go handleConnection(conn, s.ctx, &s.Wg)
}
}
前回(tcp2)は、ListenTCP
をmain()
の中で行なっていますが、Listener
の取り扱いは本来はServer
の責務のはずです。
Listen()
メソッドをServer
に追加して、サーバーの起動処理をまとめます。
また、シャットダウン時の処理もListener
を触っているため、ShutDown()
としてServer
のメソッドにしてしまいます。
func (s *Server) Shutdown() {
select {
case <-s.ctx.Done():
// already shutdown
default:
s.shutdown()
s.listener.Close()
}
}
func (s *Server) Listen() error {
tcpAddr, err := net.ResolveTCPAddr("tcp", s.addr)
if err != nil {
return err
}
l, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
return err
}
s.listener = l
go s.handleListener()
return nil
}
Conn
handleConnection
とhandleRead
をConn
のメソッドにしました。これにより、handleConnection
とhandleRead
で共有するerrRead
を引数で渡さなくてもよくなります。
Conn
は内部にServer
のポインタを持つようにしました。wg
やserverCtx
にConn
からアクセスすることができます。
type Conn struct {
svr *Server
conn *net.TCPConn
readCtx context.Context
errRead context.CancelFunc
}
func (c *Conn) handleConnection() {
defer func() {
c.conn.Close()
c.svr.Wg.Done()
}()
go c.handleRead()
select {
case <-c.readCtx.Done():
case <-c.svr.ctx.Done():
}
}
役割の境界が見えてくる。 serverパッケージを作る
ここまで作ってきて、ぼんやりと境界が見えてきました。
-
TCPListener
や、TCPConn
などのnet
パッケージにあるものを操作するサーバーの役割 - シグナルハンドリングやサーバーの起動など、前者を使ってハンドリングする役割
前者をserver
パッケージ、後者を引き続きmain
パッケージとして分離します。
$ tree
.
├── main.go
└── server
├── conn.go
└── server.go
1 directory, 3 files
Go言語におけるパッケージの分離は、フォルダを分けることで実現します。
サーバーのエラーによるキャンセルイベントを伝播する
Conn
とServer
に分離したことで、新しいフィールドの追加が気軽に行えるようになりました。
ここで、本来やりたかったAcceptTCP()
のエラーを全てのgoroutineに伝える処理を追加します。
-
Server
struct にcontext.Context
を追加
type Server struct {
AcceptCtx context.Context
errAccept context.CancelFunc
}
-
handleListener
で、エラーの発生を通知する
func (s *Server) handleListener() {
defer func() {
s.listener.Close()
close(s.ChClosed)
}()
for {
conn, err := s.listener.AcceptTCP()
if err != nil {
if ne, ok := err.(net.Error); ok {
if ne.Temporary() {
log.Println("AcceptTCP", err)
continue
}
}
if listenerCloseError(err) {
select {
case <-s.ctx.Done():
return
default:
// fallthrough
}
}
log.Println("AcceptTCP", err)
s.errAccept() // <- 追加
return
}
c := newConn(s, conn)
s.Wg.Add(1)
go c.handleConnection()
}
}
-
handleConnection
で、Listener
のエラーをキャッチ
func (c *Conn) handleConnection() {
defer func() {
c.conn.Close()
c.svr.Wg.Done()
}()
go c.handleRead()
select {
case <-c.readCtx.Done():
case <-c.svr.ctx.Done():
case <-c.svr.AcceptCtx.Done(): // <- 追加
}
}
-
main()
でListener
のエラーをキャッチ
select {
case sig := <-sigChan:
switch sig {
case syscall.SIGINT:
log.Println("Server Shutdown...")
svr.Shutdown()
svr.Wg.Wait()
<-svr.ChClosed
log.Println("Server Shutdown Completed")
default:
panic("unexpected signal has been received")
}
case <-svr.AcceptCtx.Done(): // <- 追加
log.Println("Server Error Occurred")
// wait until all connection closed
svr.Wg.Wait()
<-svr.ChClosed
log.Println("Server Shutdown Completed")
}
main()
では、シグナルのイベントとサーバーエラーのイベントを同時に待ち受けます。これには、select
とcase
を使います。
最後に
パッケージの分離と、Server
とConn
のstruct化、サーバーエラーによるシャットダウンを行いました。
次回は、Writeの非同期化に取り掛かります。