6月からDMM.comラボの六本木オフィスでミドルウェアを作るエンジニアインターンをしている@kawasin73です。
DMM.comラボではluaで実装されたKVS(キーバリューストア)を利用しています。
これは、TCPの上で独自プロトコルで通信しており、URIのPathがKeyとなり最長共通接頭辞検索をするKVSで、社内でluaの皮を被ったC言語で実装されたものが運用されています。
この度、このKVSをGo言語で再実装することになり、設計は既存のミドルウェアを踏襲した形で DMM.com ラボの方が行い、実装は僕がすることになりました。
Go言語の実装手法(goroutine や channel等)については僕が学びながらそれについて都度相談するというスタイルで行なっています。
その開発記を連載しています。
第1回 GoでTCPサーバー上で独自プロトコルKey-Value Storeを実装する(知識編)
第2回 GolangでハイパフォーマンスなTCPサーバーを実装する(下準備編)
第3回 Go言語でTCPのEchoサーバーを実直に実装する
第4回 Go言語でシグナルハンドリングをするTCPのEchoサーバーを実装する
第5回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その1)
第6回 Go言語でGraceful Shutdown可能なTCPのechoサーバーを実装する(その2)
第7回 GolangでTCPサーバーに再起動とGraceful Shutdownを実装する
第6回の今回は、Writeの処理を非同期化します。
はじめに
開発環境は以下の通りです。
- OS: macOS 10.12.5
- Go version: 1.8.3
- IDE: Gogland
今回開発した内容は、こちらのリポジトリのtcp4
ディレクトリで公開しています。
Writeを非同期化する
前回までの実装では、for
文の中でRead
してその後にWrite
する直列的な処理を行っていました。
Write
している間は、Read
できませんし、Read
している間はWrite
できません。
そこで、Write
を別のgoroutineで行うことでWrite
とRead
を並行して実行できるようにします。
Write
を非同期化するには2種類の方法があります。
- 別goroutineで
for
文でWrite
を繰り返し行い、handleRead
からバッファ付きchannelで読み込んだバイト列を渡して書き込む方法
+------------+ +-------------+
| handleRead | | handleWrite |
+-----+------+ +------+------+
| |
+-----> |
| | |
| Read <----+
Loop | | channel | |
| +------------------> |
| | | | Loop
+-----+ Write |
| |
+----+
-
handleRead
からRead
する度に、goroutineを起動しWrite
する方法
+------------+
| handleRead |
+-----+------+
|
|
|
Read
|
| goroutine +------------+
+---------------> handleEcho |
| +-----+------+
| |
Read Write
|
| goroutine +------------+
+---------------> handleEcho |
| +-----+------+
| |
| Write
|
今回は後者の「handleRead
からRead
する度に、goroutineを起動しWrite
する方法」を採用します。
理由としては以下の通りになります。
今回は最終的にデータベースを開発する第一歩としてechoサーバーを作っています。
そのため将来的には、Write
をする前にはデータベースに関する処理を行うことになります。
データベースの処理はRead
やWrite
より遅くなることが予想されます。
データベースの処理を多重化して高速化することを目論んでいるので、echoサーバーの実装のうちからWrite
するgoroutineを多重化します。
net.TCPConn
のWrite
の内部実装を見ると、func (fd *netFD) Write(p []byte)
の中でロックを取得してからWrite
しています。
そのため、複数のgoroutineから同時にWrite
されても競合することなくWrite
することができます。
ただ後者はデメリットもあります。
channel
を使うとRead
された順番の通りにWrite
されますが、複数のgoroutineを起動する方法だとgoroutine
の実行順序を保証することができません。
echoサーバーとしてはWrite
されるものの順序が変わる可能性があるため失格です。
しかし、この後開発するデータベースのプロトコルでは、リクエストの順序とレスポンスの順序が一致しないことを前提として、RID
(リクエストID)をリクエストとレスポンスのバイナリに含めることになっています。
そのため、今回はレスポンスの順序が変わる可能性があることを許容して実装します。
実装
func (c *Conn) handleRead() {
defer c.stopRead()
buf := make([]byte, 4*1024)
for {
n, err := c.conn.Read(buf)
if err != nil {
if ne, ok := err.(net.Error); ok {
switch {
case ne.Temporary():
continue
}
}
log.Println("Read", err)
return
}
go c.handleEcho(buf[:n]) // <- 非同期化
}
}
func (c *Conn) handleEcho(buf []byte) {
// do something
// 将来的にはここでデータベースに関する処理を行う
// write
for {
n, err := c.conn.Write(buf)
if err != nil {
if nerr, ok := err.(net.Error); ok {
if nerr.Temporary() {
buf = buf[n:]
continue
}
}
log.Println("Write error", err)
// write error
c.stopRead()
}
return
}
}
前回の実装で、errRead
としていたキャンセルメソッドは、わかりやすくするためにstopRead
と名前を変えました。
Slice の内部に思いを馳せる
ここまでの状態で(コミットID: 33dcf05b194348b38c6ee668ece08c6dadb7be72)、一度動作確認をしてみます。
0 ~ 255 まで、1バイトずつ高速に送信します。(コードは、tcp4/cmd/sample/main.go
)
func main() {
c, err := net.Dial("tcp", "127.0.0.1:12345")
if err != nil {
log.Fatal(err)
}
defer c.Close()
go write(c)
buf := make([]byte, 100)
for i := 0; i < 256; i++ {
var n int
n, err = c.Read(buf)
fmt.Println(buf[:n])
}
}
func write(c net.Conn) {
for i := 0; i < 256; i++ {
c.Write([]byte{byte(i)})
}
}
結果は、この通りです。
[0 1 2 3 4 5 6 7 8 9 10 11 12 13]
[15 15] // <- 重複
[16 17 18 19 20 21 22 23]
[25] // <- 重複
[25] // <- 重複
[26]
~~~ 省略 ~~~
[255 251]
[255] // <- 重複
[255] // <- 重複
[255] // <- 重複
レスポンスが重複しているところが多くあります。なぜでしょうか?
ここで、Slice の内部実装に思いを馳せてみます。
これらの記事を読むと分かる通り、あるSliceから作成されたSliceは、内部では同じ配列のポインタを保持しています。
Write
とRead
が並行に実行され、Write
する前にbuf
が上書きされているために、レスポンスが正しく送られなかったのだとわかります。
ここまでを踏まえると
func (c *Conn) handleRead() {
defer c.stopRead()
buf := make([]byte, 4*1024)
for {
n, err := c.conn.Read(buf)
if err != nil {
if ne, ok := err.(net.Error); ok {
switch {
case ne.Temporary():
continue
}
}
log.Println("Read", err)
return
}
wBuf := make([]byte, n) // <- 別のSliceを作って
copy(wBuf, buf[:n]) // <- コピー
go c.handleEcho(wBuf)
}
}
と、handleEcho
に渡すSliceは書き換えられることがないように都度作成してコピーするようにします。
このように実装を修正して、もう一度tcp4/cmd/sample/main.go
を実行すると、順番は一部入れ替わっているものの、0 ~ 255の全てのレスポンスが返ってきていることが確認できます。
Shutdown時にWriteを止める
Shutdown時にhandleEcho
のgoroutineが複数実行中であった時、c.conn.Close()
が呼ばれた後にWrite
を複数回実行することになり、use of closed network connection
が余計に発生することになります。
余計なエラーのログを避けるため、Close
した後は、Write
しないように実装します。
// func newConn
ctxWrite, stopWrite := context.WithCancel(context.Background())
func (c *Conn) handleConnection() {
defer func() {
c.stopWrite() // <- 追加
c.conn.Close()
c.svr.Wg.Done()
}()
go c.handleRead()
select {
case <-c.ctxRead.Done():
case <-c.svr.ctx.Done():
case <-c.svr.AcceptCtx.Done():
}
}
func (c *Conn) handleEcho(buf []byte) {
// do something
// write
for {
select {
case <-c.ctxWrite.Done(): // <- 追加
return
default:
n, err := c.conn.Write(buf)
if err != nil {
if nerr, ok := err.(net.Error); ok {
if nerr.Temporary() {
buf = buf[n:]
continue
}
}
log.Println("Write error", err)
// write error
c.stopRead()
}
return
}
}
}
しかし、この実装のselect
のdefault
では不十分です。複数のhandleEcho
が実行された時、複数のgoroutineでc.conn.Write(buf)
の内部のロックで処理が止まり、c.ctxWrite.Done()
が有効になった後もc.conn.Write(buf)
の中で待ち続けてしまいます。
複数のロックを待ち受けるには、select
case
を使ってchannel
を待つ必要があります。
そのためには、c.conn.Write(buf)
をチャネルを使ってロックをかける必要があります。
それには、GoConで発表してきたのでついでにruntime以下の知識をまとめていく #golang 内のスライドの27ページ目で紹介されている Semaphore的なchannelの使い方を用います。
// func newConn
sem := make(chan struct{}, 1)
func (c *Conn) handleEcho(buf []byte) {
// do something
// write
select {
case <-c.ctxWrite.Done():
return
case c.sem <- struct{}{}:
defer func() { <-c.sem }()
for {
n, err := c.conn.Write(buf)
if err != nil {
if nerr, ok := err.(net.Error); ok {
if nerr.Temporary() {
buf = buf[n:]
continue
}
}
log.Println("Write error", err)
// write error
c.stopRead()
}
return
}
}
}
バッファーサイズが1のchannelを使って、必ずc.conn.Write(buf)
を実行しているgoroutineが一つであることを保証しています。
最後に
ここまででWrite
を非同期化しました。
次回は、Graceful ShutdownとRestartのシグナルハンドリングを実装します。