メリークリスマス!
概要
非同期にログのようなものをサービスから送信するようなclientを作るとき、「サーバーをどう落とすか」というのはとても難しい問題だと思います。
特にお金がかかわるようなログだと送り漏れがないようにする必要がありますし「もう同期でいいんじゃね」と頭を抱えてしまいます。
非同期なのでバッファのような場所にデータがたまっているので、これを掃除してから落とす必要があるのが厄介の源です。
要件
clientは、あるjson型のログを別のRESTのシステムに投げるようなユースケースを想定していますが、ここではそこは端折ってio.Writerに書いています。
clientは、Sendで送られたメッセージをキャッシュし、
- キャッシュがあるサイズに達したら
- 定期的に
- 最後にサーバーをShutdownするときに
の3つの条件でflushします。
解
解1:同期的に送信する
まずシンプルな解として同期的なものを用意しておきます。非同期処理はこれと同じ事を実現する必要があります。
type V1Client struct {
w io.Writer
}
func NewV1Clienet(w io.Writer) *V1Client {
return &V1Client{w}
}
func (c *V1Client) Send(message string) error {
_, err := fmt.Fprintln(c.w, message)
return err
}
これでなぜダメだったかといいますと、メッセージを束ねる必要があったためです。
解2:戦いはここから始まった
type V2Client struct {
w io.Writer
q chan string
logs []string
max int
mu *sync.Mutex
itvl time.Duration
}
func NewV2Client(w io.Writer, bufsize int, itvl time.Duration) *V2Client {
return &V2Client{
w: w,
logs: make([]string, bufsize)[:0],
mu: &sync.Mutex{},
max: bufsize,
itvl: itvl,
}
}
func (c *V2Client) Send(ctx context.Context, message string) {
c.mu.Lock()
defer c.mu.Unlock()
c.logs = append(c.logs, message)
if len(c.logs) >= c.max {
c.flush(ctx)
}
}
func (c *V2Client) Start(ctx context.Context) {
go func() {
for {
time.Sleep(c.itvl)
c.mu.Lock()
if err := c.flush(ctx); err != nil {
fmt.Printf("error: %+v\n", err)
}
c.mu.Unlock()
select {
case <-ctx.Done():
return
}
}
}()
}
func (c *V2Client) flush(ctx context.Context) error {
defer func() {
c.logs = c.logs[:0]
}()
for _, m := range c.logs {
if _, err := fmt.Fprintln(c.w, m); err != nil {
return err
}
}
return nil
}
非同期バージョンの最初の実装はこんな感じでした。ほかの言語のイメージで実装するとこんな感じになるのではないかと思います。
この実装には様々な問題点があります。
- context cancelすることでプログラムを落とす、その時にlogsの中に残ったログの掃除をしていない
- logsという配列を共有メモリのようにgoroutine間のデータのやり取りに使っている
- logsがオーバーフローしたとき、同期的にログを送信してしまっている
主にこの3点でしょうか。ほかにもいろいろあるとは思いますが。
解3:修正版
type V3Client struct {
logs []*string // log buffer
q chan string // channel to send log
max int // max size of logs
itvl time.Duration // log send interval
w io.Writer // l2pc backup logger
exit chan struct{}
once *sync.Once
}
func NewV3Client(writer io.Writer, buffsize int, interval time.Duration) *V3Client {
client := &V3Client{
logs: make([]string, buffsize)[:0],
q: make(chan string),
max: buffsize,
itvl: interval,
w: writer,
once: &sync.Once{},
}
return client
}
func (c *V3Client) Send(message string) {
c.q <- message
}
func (c *V3Client) Start(ctx context.Context) {
if c.exit != nil {
fmt.Println("client is already started.")
return
}
c.exit = make(chan struct{})
go func() {
t := time.NewTicker(c.itvl)
defer t.Stop()
for {
select {
case m := <-c.q:
if err := c.tryFlush(ctx, m); err != nil {
fmt.Println("log send error: " + err.Error())
}
case <-t.C:
if err := c.flush(ctx); err != nil {
fmt.Println("log send error: " + err.Error())
}
case <-c.exit:
return
}
}
}()
}
func (c *V3Client) Close(ctx context.Context) {
c.once.Do(func() {
c.exit <- struct{}{}
if err := c.flushAll(ctx); err != nil {
fmt.Println("log send error" + err.Error())
}
})
}
func (c *V3Client) tryFlush(ctx context.Context, msg string) error {
c.logs = append(c.logs, msg)
if len(c.logs) >= c.max {
return c.flush(ctx)
}
return nil
}
func (c *V3Client) flushAll(ctx context.Context) error {
close(c.q)
all := c.logs
for l := range c.q {
all = append(all, l)
}
for {
if len(all) < c.max {
c.logs = all
return c.flush(ctx)
}
c.logs, all = all[:c.max], all[c.max:]
if err := c.flush(ctx); err != nil {
return err
}
}
}
func (c *V3Client) flush(ctx context.Context) error {
defer func() {
c.logs = c.logs[:0]
}()
for _, l := range c.logs {
if _, err := fmt.Fprintln(c.w, l); err != nil {
return err
}
}
return nil
}}
各メソッドはそれぞれ
client | 説明 |
---|---|
client#Start | clientの非同期送信を開始する。goroutine部。 |
client#Send | メッセージを送る(入力側)。業務ロジックが呼び出す。 |
client$Close | 終了処理開始および、終了処理がおわるまで待機 |
client#flush | 実送信部 |
client#tryFlush | メッセージを受け取り必要であればflushする |
client#flushAll | 終了処理 |
です。#2と比較して特に違うのは「ノンブロッキング」だという事だと思います。
ほかの言語だと、こういったプログラムには必ずといっていいほどLockが顔を出します。ですが上にはありません。
- SleepではなくTickerを使っている
- logsをgoroutineで共有せず、goroutine間の受け渡しにqというchanを使っている
ためです。
また、終了処理のために専用のexitというchを導入しました。なのでselect/caseで3つのchannelを待つコードになっています。
学んだこと
- goroutine間を渡すデータはlockして共有メモリで渡すのではなくchannelで渡す
- 一定時間置きに起動する処理はSleepを使わずTickerを使う
-
defer cl.Close()
で終了処理を開始しつつかつ終了処理の終了まで待たせる - select/caseで複数のchannelを待つとき、同時に複数のchannelにデータが入ったときどのchannelが起動するかはrandom
func main(){
ctx := context.Background()
cl := NewV3Client(...)
cl.Start(ctx)
defer cl.Close(ctx)
// start server with client
}
最終的なmainはこのようになりました。
割と普通なインターフェースに収まったように思います。
他の言語に慣れていると、「ノンブロッキングに無限forループを回す」というのはなかなか違和感があるのですが、確かにPythonのGILのように、過剰な範囲で足止めをしてしまい、パフォーマンスの足を引っ張る要因になりやすいのはわかる気がします。
一旦ここまでで。