Go
Heroku
golang
Line

Go言語でスヌーズ機能付きリマインダーLINE botを実装した

この度、スヌーズ機能つきのリマインダーLINE botをGo言語で実装しました。

ソースコードはこちらにアップロードしています。https://github.com/kawasin73/usa-reminder

こんな感じで動きます。

sample.png

機能

  • 1日1回設定された時間に「飲んだ?」って聞いてくれる
  • 聞かれたら返信するまで、10分おきに「飲んだ??」って聞いてくれる(最大10回)
  • 設定した時間を変更できる
  • 設定した時間を削除できる

制約

herokuにデプロイしたいと思ったのですが、herokuは1プロセスごとに課金されます。

そのため、1プロセスで、HTTPアクセスのハンドリング、タイマーの設定、スヌーズ機能をやらないとお金がたくさんかかってしまいます。

Go言語は、マルチスレッドで実行できるため、HTTPサーバーの中で、スケジューラを実装して、1プロセスで全てできるようにしました。

結果的に状態をHTTPサーバーの中に持ち、タイマーの役割をします。そのため、1プロセスでしか実行できず冗長化はできません。

あと、スヌーズ中に再起動されると、スヌーズ情報が消えてしまいます。

設定された時間は、Redisに保存して消えないようにしました。

ソースコードの解説

大きく5つのレイヤーに分かれており、それぞれファイルが分かれています。

  • 初期化とハンドリング (main.go)
  • ユーザー情報 + LINEへのPUSHレイヤー (user.go)
  • スケジューラレイヤー (scheduler.go)
  • Redisへの永続化 + ユーザーリスト保存レイヤー (store.go)
  • HTTPハンドリングレイヤー (handler.go)

初期化とハンドリング

main.go
const (
    userPrefix = "user_"
    location   = "Asia/Tokyo"
    maxRetry   = 10
)

func init() {
    loc, err := time.LoadLocation(location)
    if err != nil {
        loc = time.FixedZone(location, 9*60*60)
    }
    time.Local = loc
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    wg := new(sync.WaitGroup)
    defer func() {
        cancel()
        wg.Wait()
    }()
    bot, err := linebot.New(
        os.Getenv("CHANNEL_SECRET"),
        os.Getenv("CHANNEL_TOKEN"),
    )
    if err != nil {
        log.Fatal(err)
    }
    redisUrl, err := url.Parse(os.Getenv("REDIS_URL"))
    if err != nil {
        log.Fatal("parse redis url : ", err)
    }
    redisPassword, _ := redisUrl.User.Password()
    redisDB := 0
    redisClient := redis.NewClient(&redis.Options{
        Addr:     redisUrl.Host,
        Password: redisPassword,
        DB:       redisDB,
    })

    scheduler := &Scheduler{bot: bot, chRemind: make(chan *User)}
    wg.Add(1)
    go scheduler.Reminder(ctx, wg)

    store := NewStore(ctx, redisClient, wg, scheduler)
    if err = store.Load(); err != nil {
        log.Fatal("load redis data : ", err)
    }

    h := &Handler{
        bot:   bot,
        store: store,
    }
    // Setup HTTP Server for receiving requests from LINE platform
    http.Handle("/callback", h)

    // This is just sample code.
    // For actual use, you must support HTTPS by using `ListenAndServeTLS`, a reverse proxy or something else.
    if err := http.ListenAndServe(":"+os.Getenv("PORT"), nil); err != nil {
        log.Fatal(err)
    }
}

初期化時にstore.Load()を実行して、保存された情報を復帰させています。
sync.WaitGroupを goroutine 生成の時には必ず登録して、goroutine のリークが起きないようにしています。実際は、シャットダウンのときは、goroutine がリークしていてもプロセスが終了するので必要ないかもしれませんが、お行儀がこっちの方がいいです。

ユーザー情報 + LINEへのPUSHレイヤー

user.go
type User struct {
    Id     string
    Hour   int
    Minute int

    ctx    context.Context
    cancel context.CancelFunc
    mu     sync.Mutex
    sent   int
}

func NewUser(ctx context.Context, id string, hour, minute int) *User {
    user := &User{
        Id:     id,
        Hour:   hour,
        Minute: minute,
    }
    user.ctx, user.cancel = context.WithCancel(ctx)
    return user
}

func (u *User) Data() string {
    return fmt.Sprintf("%d:%d", u.Hour, u.Minute)
}

func (u *User) ResetCount() (reset bool) {
    u.mu.Lock()
    reset = u.sent != 0
    u.sent = 0
    u.mu.Unlock()
    return
}

func (u *User) SendFirst(bot *linebot.Client) {
    u.mu.Lock()
    defer u.mu.Unlock()
    u.sent = 1
    _, err := bot.PushMessage(u.Id, linebot.NewTextMessage("飲んだ?")).Do()
    if err != nil {
        u.sent = 0
        log.Println(errors.Wrapf(err, "failed push message to (%v)", u.Id))
    }
}

func (u *User) SendRemind(bot *linebot.Client) (tryNext bool) {
    u.mu.Lock()
    defer u.mu.Unlock()
    if u.sent == 0 {
        // have received response
        return false
    }
    if u.sent > maxRetry {
        u.sent = 0
        return false
    }
    u.sent++
    _, err := bot.PushMessage(u.Id, linebot.NewTextMessage("飲んだ"+strings.Repeat("?", u.sent))).Do()
    if err != nil {
        log.Println(errors.Wrapf(err, "failed push message to (%v)", u.Id))
    }
    return true
}

func (u *User) Close() {
    u.cancel()
}

Userは最初はデータを保存するだけの場所にしようかと思っていたのですが、実装を進める上で、LINE APIへの送信と、送信数のカウントの機能も持たせることにしました。

ユーザーが設定時間を変えるたびに、新しいUserが作られ、古いものは、context.Contextがキャンセルされて捨てられます。

スケジューラレイヤー

scheduler.go
type Scheduler struct {
    bot      *linebot.Client
    chRemind chan *User
}

func (s *Scheduler) Watch(wg *sync.WaitGroup, u *User) {
    defer wg.Done()
    for {
        now := time.Now()
        t := time.Date(now.Year(), now.Month(), now.Day(), u.Hour, u.Minute, 0, 0, time.Local)
        if t.Before(now) {
            t = t.Add(time.Hour * 24)
        }
        select {
        case <-u.ctx.Done():
            return
        case <-time.After(t.Sub(time.Now())):
        }

        u.SendFirst(s.bot)

        select {
        case <-u.ctx.Done():
            return
        case s.chRemind <- u:
        }
    }
}

type Remind struct {
    u     *User
    timer <-chan time.Time
}

func (s *Scheduler) Reminder(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    queue := make([]Remind, 0)
    var remind Remind
    for {
        select {
        case <-ctx.Done():
            return
        case u := <-s.chRemind:
            r := Remind{u: u, timer: time.After(10 * time.Minute)}
            if remind.u == nil {
                remind = r
                continue
            }
            queue = append(queue, r)
        case <-remind.timer:
            if remind.u.SendRemind(s.bot) {
                // requeue
                queue = append(queue, Remind{u: remind.u, timer: time.After(10 * time.Minute)})
            }

            if len(queue) == 0 {
                remind.u, remind.timer = nil, nil
                continue
            }
            // deque
            remind, queue = queue[0], queue[1:]
        }
    }
}

ここが、このアプリの一番のキモです。

ユーザーごとに、func (s *Scheduler) Watch(wg *sync.WaitGroup, u *User) goroutine が1本ずつ立ち上がります。このスレッドの中で各ユーザーの設定時刻まで待ちます。
そして、設定時刻になって、リマインドメッセージを送ったら、chRemind を経由して、func (s *Scheduler) Reminder(ctx context.Context, wg *sync.WaitGroup)スレッドに送られて、スヌーズの処理を行います。

Scheduler の一番のキモは、func (s *Scheduler) Reminder(ctx context.Context, wg *sync.WaitGroup)です。これは、アプリの中で1本だけシングルスレッドで動いています。

chRemindから渡ってきたUserqueueに貯めていきます。Reminderスレッドはシングルスレッドなので、キューの操作にロックを取る必要がありません。キューは、スライスで表現します。

そして、先頭のUserが10分経つまで待ちます。

全てのユーザーの待ち時間が同じ10分であるため、先頭のユーザーが10分経った時にキューの中の全てのユーザーは10分経っていません。そのため、常に先頭のユーザーの残り時間を待っているだけで、スヌーズ機能が実現できるので、スヌーズ機能を1本のgoroutineに集約できます。

もし、スヌーズ待ちしているユーザーがいない場合は、Timerチャネルはnilになり、nilチャネルはブロックし続けるので通常状態と同じように処理をすることができます。

もし、ユーザーがリマインダーに返信をしてきた場合は、remind.u.SendRemind(s.bot)falseになり、スヌーズを中断します。

Redisへの永続化 + ユーザーリスト保存レイヤー

store.go
var (
    ErrNotFound = errors.New("user not found")
)

type Store struct {
    c    *redis.Client
    mu   sync.Mutex
    data map[string]*User
    sche *Scheduler

    ctx context.Context
    wg  *sync.WaitGroup
}

func NewStore(ctx context.Context, client *redis.Client, wg *sync.WaitGroup, sche *Scheduler) *Store {
    return &Store{
        c:    client,
        wg:   wg,
        sche: sche,
        ctx:  ctx,
    }
}

func (s *Store) Load() error {
    s.mu.Lock()
    defer s.mu.Unlock()

    keys, err := s.c.Keys(userPrefix + "*").Result()
    if err != nil {
        return errors.Wrap(err, "get all key")
    }
    users := make(map[string]*User, len(keys))

    for _, key := range keys {
        // TODO: MGET
        data, err := s.c.Get(key).Result()
        if err != nil {
            return errors.Wrap(err, "get all key")
        }
        times := strings.Split(data, ":")
        hour, err := strconv.Atoi(times[0])
        if err != nil {
            return errors.Wrap(err, "parse hour")
        }
        minute, err := strconv.Atoi(times[1])
        if err != nil {
            return errors.Wrap(err, "parse minute")
        }
        user := NewUser(s.ctx, key[len(userPrefix):], hour, minute)
        users[user.Id] = user
    }

    for _, user := range s.data {
        user.Close()
    }

    s.data = users

    for _, user := range users {
        s.wg.Add(1)
        go s.sche.Watch(s.wg, user)
    }

    return nil
}

func (s *Store) Set(userId string, hour, minute int) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    user := NewUser(s.ctx, userId, hour, minute)
    _, err := s.c.Set(userPrefix+user.Id, user.Data(), 0).Result()
    if err != nil {
        return errors.Wrap(err, "set to redis")
    }
    if old, ok := s.data[user.Id]; ok {
        old.Close()
    }
    s.data[user.Id] = user
    s.wg.Add(1)
    go s.sche.Watch(s.wg, user)
    return nil
}

func (s *Store) Get(userId string) *User {
    s.mu.Lock()
    user, _ := s.data[userId]
    s.mu.Unlock()
    return user
}

func (s *Store) Del(userId string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    user, ok := s.data[userId]
    if !ok {
        return ErrNotFound
    }
    if _, err := s.c.Del(userPrefix + user.Id).Result(); err != nil {
        return errors.Wrapf(err, "del from redis")
    }
    user.Close()
    delete(s.data, userId)
    return nil
}

ほとんどそのままです。ユーザーは作成するたびに、Watchスレッドを立ち上げ、古いユーザーを削除します。

HTTPハンドリングレイヤー

handler.go
var timeMatcher = regexp.MustCompile("([0-9]+)時([0-9]+)分")

type Handler struct {
    bot   *linebot.Client
    store *Store
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    events, err := h.bot.ParseRequest(req)
    if err != nil {
        if err == linebot.ErrInvalidSignature {
            w.WriteHeader(400)
        } else {
            w.WriteHeader(500)
        }
        return
    }
    for _, event := range events {
        if event.Type == linebot.EventTypeMessage {
            switch message := event.Message.(type) {
            case *linebot.TextMessage:
                if err = h.onTextMessageEvent(event, message); err != nil {
                    log.Println(err)
                }
            }
        }
    }
}

func (h *Handler) onTextMessageEvent(event linebot.Event, msg *linebot.TextMessage) error {
    reply, _ := h.handleText(event.Source.UserID, msg.Text)
    if _, err := h.bot.ReplyMessage(event.ReplyToken, linebot.NewTextMessage(reply)).Do(); err != nil {
        return err
    }
    return nil
}

func (h *Handler) handleText(userId, text string) (string, error) {
    // TODO: create command
    if text == "設定教えて" {
        user := h.store.Get(userId)
        if user == nil {
            return "設定されてないですよ", nil
        }
        return fmt.Sprintf("%v時%v分に設定されています", user.Hour, user.Minute), nil
    }
    if text == "ばいばい" {
        err := h.store.Del(userId)
        if err == ErrNotFound {
            return "設定されてないですよ", nil
        } else if err != nil {
            return "設定の削除に失敗しました", err
        }
        return "設定を削除しました。 ばいばい", nil
    }
    m := timeMatcher.FindStringSubmatch(text)
    if len(m) == 3 {
        hour, err := strconv.Atoi(m[1])
        if err != nil {
            return "何時ですか?", errors.Wrap(err, "parse hour")
        }
        minute, err := strconv.Atoi(m[2])
        if err != nil {
            return "何分ですか?", errors.Wrap(err, "parse minute")
        }
        err = h.store.Set(userId, hour, minute)
        if err != nil {
            return "時間の設定に失敗しました", errors.Wrap(err, "set time to store")
        }
        return fmt.Sprintf("%v時%v分ですね。わかりました。", hour, minute), nil
    }
    user := h.store.Get(userId)
    if user == nil {
        return "時間を設定してください", nil
    }
    if user.ResetCount() {
        return "よくできました", nil
    } else {
        // TODO: send random message
        return text, nil
    }
}

これもそのままです。func (h *Handler) handleText(userId, text string) (string, error)の中で、メッセージをパースして、必要な処理を行っています。

ユーザーがリマインダーに返信してきたときは、user.ResetCount()が呼ばれます。

最後に

作ってる途中でバグってメンヘラみたいになったのでスクショを撮っておきました。

IMG_5145.PNG