忘備録的に。推敲が足りないので読みづらいので心の広い人以外は読まないように。
goでは、Socket扱ったりいろんな通信をする複数のgoroutineが生えてるアクター的なStructを作ることが多いので、綺麗に全てのgoroutineを終了して何らかの終了処理を実施するための基本構成をまとめる。
まず停止処理に関係なくdeadlockを避けるために気をつけること
- socketでも待つしchannelでも待つしsync.Condとかでも待つというような複数のwaitが行われるようなgoroutineはdeadlockする可能性が高いのでそもそも作るべきじゃない。
- threadsafeじゃないオブジェクトを触るのは基本的に一つのgoroutineに収める
- channelにオブジェクトの操作を完全に任せられない場合は、Mutexを使うことになるが、Mutexでロックしている範囲を不用意に広くしたり、ロックの中で別のロックを取得するような事はしない。
- 排他制御で守らなければいけないリソースは、getter/setterを作って、ロックを書ける範囲を最小にしておくと良い
- ロックの中でcollectionのiterationなんかをしてると、大体気づかない内に別のロックを獲得していることがままあるので、そういうことはしない。
- lockとunlockで囲われた部分を関数に切り出さずにnarrativeに書いていると、コード上でどこでなにのロックをとっているのかが分かりにくくなって、予期せぬdeadlockを引き起こす。
目的
- どこかのgoroutineでエラーなり終了の通知が現れた場合に全てのgoroutineを停止し、全てのgoroutine停止後にfinalize処理を実施したい
- 外部から明示的に終了処理が呼ばれた場合に、全てのgoroutineを停止して、全てのgoroutine停止後にfinalize処理を実施したい
このような通信をするオブジェクトは、erlangのsupervisorのような物がないので、大体親オブジェクトがライフサイクルを管理する必要がある。その場合には、finalize処理としては、親のオブジェクトに俺ちゃんと死んだよ
と通知して自分を削除してもらうような処理を書くことになる。
設計の前提
- 基本的に全てのgoroutineは、selectで複数のchannelをlistenする以外のブロッキング処理は入れないようにする
- TCP Socketの場合はWrite時にBlockする可能性があるので、TCPConn.Writeは間にQueueやQueueを持ったGoroutineを入れて、TCPConn.Writeはlockで囲わなくて良いようにする
- 例外的にsocketからreadするようなgoroutineでは、socketのrecvでブロッキングが走る
実装方針
- 終了処理の契機は、外部からShutdownが叩かれるか、どれかのgoroutineが停止して抜けた際に自分でShutdownを呼ぶことで全て停止
- Shutdownは、複数呼ばれても一度しか実行されないようにsync.Onceで囲う
- goroutineの外からの止め方
- socketのrecvを抜けるにはnet.Connをcloseして、recvのerrを見てreturnする
- selectで、channelをlistenしている場合は、停止用のchannelにbroadcastしてそこをハンドリングしてreturnする
-
braodcastする方法は存在しないので、channelをcloseすれば良い。closeされたchannelからデータを読みだす場合は、2番目の返り値が必ずfalseを返すので、それをハンドリング
``` for{ select{ case _, ok <- closedChan: if !ok{ return } case ... } } ```
-
注意点としては、closeしたchannelにデータを詰めるとpanicするので、closeする可能性のあるchannelは基本的にcloseするためにのみ使う、ちゃんと管理できるならいいけど。
-
- どれかのgoroutineが止まった場合に全て止めるために、goroutineのdeferで、必ずShutdownを呼ぶようにする
- 全てのgoroutineが停止したことをハンドリングするためにsync.WaitGroupを用いる
- goroutineの先頭で、sync.WaitGroup.Add(1)を呼び、goroutineのdeferの中で、sync.WaitGroup.Doneを呼ぶ
- Shutdonwの中で、channelとsocketをcloseした後で、sync.WaitGroup.Waitすると、全てのgoroutineのdeferが呼ばれるまでwaitする。
- Waitを抜けた後に、finalizeを実施する
実装例
package hoge
import (
"net"
"sync"
"time"
)
type Hoge struct {
id int
conn net.Conn
shutdownChan chan struct{} // こいつはcloseするだけで絶対に書き込まない
HogeChan chan struct{}
heartbeatChan chan struct{}
shutdown bool
once sync.Once
wg sync.WaitGroup
}
func MakeHoge(id int, conn net.Conn) *Hoge {
h := Hoke{
id: id,
conn: conn,
shutdownChan: make(chan struct{}),
hogeChan: make(chan struct{}, 10),
heartbeatChan: make(chan struct{}, 10),
}
go h.readLoop()
go h.hogeLoop()
go h.hartbeat()
return &h
}
func (h *Hoge) Shutdown() {
h.once.Do(func() {
h.conn.Close()
close(h.shutdownChan)
h.wg.Wait()
do_finalize()
})
}
func (h *Hoge) readLoop() {
h.wg.Add(1)
defer func() {
h.wg.Done()
h.Shutdown()
}()
buf := make([]byte, 1024)
for {
l, err := h.conn.Read(buf)
if err != nil {
return
}
do_something(buf[:l])
}
}
func (h *Hoge) hogeLoop() {
h.wg.Add(1)
defer func() {
h.wg.Done()
h.Shutdown()
}()
for {
select {
case _ <- h.HogeChan:
do_something_hoge()
case _, ok := <-h.shutdownChan:
return
}
}
}
func (h *Hoge) hartbeat() {
h.wg.Add(1)
defer func() {
h.wg.Done()
h.Shutdown()
}()
ticker := time.NewTicker(10 * time.Second)
latest := time.Now().Unix()
for {
select {
case <-ticker.C:
if time.Now().Unix()-latest >= 10 {
// timeout
return
}
case _ <- h.heartbeatChan:
latest = time.Now().Unix()
case _, ok := <-h.shutdownChan:
return
}
}
}
もっといい方法ない?
chanをselectしている全てのgoroutineを止めるためにcloseするだけのchanを作って、chanのcloseを終了条件にしているけど、もっと良い方法ないだろうか。sync.CondはただのMutexなのでselectで受けられないしとりあえずこのやり方でやってそんなに問題出てない。