server.go
go service.LastAccessUpdater.Run(ctx)
// このdeferでsignal受けた後同期的に溜まっているものがあったらupdateするようにする
defer service.LastAccessUpdater.Flush(ctx)
handler/auth.go
// bufferに追加
h.LastAcccessUpdater.Add(user.UserID)
service/last_access_updater.go
package service
type LastAccessUpdater interface {
Run(context.Context)
Add(string)
Flush()
}
type LastAccessUpdaterImpl struct {
sync.Mutex
loginManager service.LoginManager
buffer []string
}
func NewLastAccessUpdater(l service.LoginManager) *LastAccessUpdaterImpl {
return &LastAccessUpdaterImpl{
loginManager: l,
buffer: make([]string),
}
}
func (b *LastAccessUpdaterImpl) Add(userID string) {
b.Lock()
defer b.Unlock()
b.buffer = append(b.buffer, userID)
}
func (b *LastAccessUpdaterImpl) Run(ctx context.Context) {
interval := 5*time.Second
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
b.Flush(ctx)
case <-ctx.Done():
b.Flush(ctx)
return
}
}
}
func (b *LastAccessUpdaterImpl) resetBuffer() map[TeamChannelViewerUpdateBufferKey]int64 {
b.Lock()
defer b.Unlock()
buf := b.buffer
b.buffer = make([]string)
return buf
}
func (b *LastAccessUpdaterImpl) Flush(ctx context.Context) {
buf := b.resetBuffer()
for _, userID := range buf {
// update処理
}
}
非同期で実行する処理があった際bufferに入れ、それをgo routineで定期的に処理するようにしている
デプロイ等でapiサーバーが死んだ時も処理されるよう、signalを受け取ってflushするようにしている