17
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Go言語でスヌーズ機能付きリマインダーLINE botを実装した

Last updated at Posted at 2018-05-01

この度、スヌーズ機能つきのリマインダーLINE botをGo言語で実装しました。名前を「管理上手のうさちゃん」と言います

ソースコードはこちらにアップロードしています。https://github.com/kawasin73/usa-reminder

こんな感じで動きます。

sample.png

機能

  • 1日1回設定された時間に「飲んだ?」って聞いてくれる
  • 聞かれたら返信するまで、10分おきに「飲んだ??」って聞いてくれる(最大10回)
  • 設定した時間を変更できる
  • 設定した時間を削除できる

制約

herokuにデプロイしたいと思ったのですが、herokuは1プロセスごとに課金されます。

そのため、1プロセスで、HTTPアクセスのハンドリング、タイマーの設定、スヌーズ機能をやらないとお金がたくさんかかってしまいます。

Go言語は、マルチスレッドで実行できるため、HTTPサーバーの中で、スケジューラを実装して、1プロセスで全てできるようにしました。

結果的に状態をHTTPサーバーの中に持ち、タイマーの役割をします。そのため、1プロセスでしか実行できず冗長化はできません。

あと、スヌーズ中に再起動されると、スヌーズ情報が消えてしまいます。

設定された時間は、Redisに保存して消えないようにしました。

ソースコードの解説

大きく5つのレイヤーに分かれており、それぞれファイルが分かれています。

  • 初期化とハンドリング (main.go)
  • ユーザー情報 + LINEへのPUSHレイヤー (user.go)
  • スケジューラレイヤー (scheduler.go)
  • Redisへの永続化 + ユーザーリスト保存レイヤー (store.go)
  • HTTPハンドリングレイヤー (handler.go)

初期化とハンドリング

main.go
const (
	userPrefix = "user_"
	location   = "Asia/Tokyo"
	maxRetry   = 10
)

func init() {
	loc, err := time.LoadLocation(location)
	if err != nil {
		loc = time.FixedZone(location, 9*60*60)
	}
	time.Local = loc
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	wg := new(sync.WaitGroup)
	defer func() {
		cancel()
		wg.Wait()
	}()
	bot, err := linebot.New(
		os.Getenv("CHANNEL_SECRET"),
		os.Getenv("CHANNEL_TOKEN"),
	)
	if err != nil {
		log.Fatal(err)
	}
	redisUrl, err := url.Parse(os.Getenv("REDIS_URL"))
	if err != nil {
		log.Fatal("parse redis url : ", err)
	}
	redisPassword, _ := redisUrl.User.Password()
	redisDB := 0
	redisClient := redis.NewClient(&redis.Options{
		Addr:     redisUrl.Host,
		Password: redisPassword,
		DB:       redisDB,
	})

	scheduler := &Scheduler{bot: bot, chRemind: make(chan *User)}
	wg.Add(1)
	go scheduler.Reminder(ctx, wg)

	store := NewStore(ctx, redisClient, wg, scheduler)
	if err = store.Load(); err != nil {
		log.Fatal("load redis data : ", err)
	}

	h := &Handler{
		bot:   bot,
		store: store,
	}
	// Setup HTTP Server for receiving requests from LINE platform
	http.Handle("/callback", h)

	// This is just sample code.
	// For actual use, you must support HTTPS by using `ListenAndServeTLS`, a reverse proxy or something else.
	if err := http.ListenAndServe(":"+os.Getenv("PORT"), nil); err != nil {
		log.Fatal(err)
	}
}

初期化時にstore.Load()を実行して、保存された情報を復帰させています。
sync.WaitGroupを goroutine 生成の時には必ず登録して、goroutine のリークが起きないようにしています。実際は、シャットダウンのときは、goroutine がリークしていてもプロセスが終了するので必要ないかもしれませんが、お行儀がこっちの方がいいです。

ユーザー情報 + LINEへのPUSHレイヤー

user.go
type User struct {
	Id     string
	Hour   int
	Minute int

	ctx    context.Context
	cancel context.CancelFunc
	mu     sync.Mutex
	sent   int
}

func NewUser(ctx context.Context, id string, hour, minute int) *User {
	user := &User{
		Id:     id,
		Hour:   hour,
		Minute: minute,
	}
	user.ctx, user.cancel = context.WithCancel(ctx)
	return user
}

func (u *User) Data() string {
	return fmt.Sprintf("%d:%d", u.Hour, u.Minute)
}

func (u *User) ResetCount() (reset bool) {
	u.mu.Lock()
	reset = u.sent != 0
	u.sent = 0
	u.mu.Unlock()
	return
}

func (u *User) SendFirst(bot *linebot.Client) {
	u.mu.Lock()
	defer u.mu.Unlock()
	u.sent = 1
	_, err := bot.PushMessage(u.Id, linebot.NewTextMessage("飲んだ?")).Do()
	if err != nil {
		u.sent = 0
		log.Println(errors.Wrapf(err, "failed push message to (%v)", u.Id))
	}
}

func (u *User) SendRemind(bot *linebot.Client) (tryNext bool) {
	u.mu.Lock()
	defer u.mu.Unlock()
	if u.sent == 0 {
		// have received response
		return false
	}
	if u.sent > maxRetry {
		u.sent = 0
		return false
	}
	u.sent++
	_, err := bot.PushMessage(u.Id, linebot.NewTextMessage("飲んだ"+strings.Repeat("?", u.sent))).Do()
	if err != nil {
		log.Println(errors.Wrapf(err, "failed push message to (%v)", u.Id))
	}
	return true
}

func (u *User) Close() {
	u.cancel()
}

Userは最初はデータを保存するだけの場所にしようかと思っていたのですが、実装を進める上で、LINE APIへの送信と、送信数のカウントの機能も持たせることにしました。

ユーザーが設定時間を変えるたびに、新しいUserが作られ、古いものは、context.Contextがキャンセルされて捨てられます。

スケジューラレイヤー

scheduler.go
type Scheduler struct {
	bot      *linebot.Client
	chRemind chan *User
}

func (s *Scheduler) Watch(wg *sync.WaitGroup, u *User) {
	defer wg.Done()
	for {
		now := time.Now()
		t := time.Date(now.Year(), now.Month(), now.Day(), u.Hour, u.Minute, 0, 0, time.Local)
		if t.Before(now) {
			t = t.Add(time.Hour * 24)
		}
		select {
		case <-u.ctx.Done():
			return
		case <-time.After(t.Sub(time.Now())):
		}

		u.SendFirst(s.bot)

		select {
		case <-u.ctx.Done():
			return
		case s.chRemind <- u:
		}
	}
}

type Remind struct {
	u     *User
	timer <-chan time.Time
}

func (s *Scheduler) Reminder(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	queue := make([]Remind, 0)
	var remind Remind
	for {
		select {
		case <-ctx.Done():
			return
		case u := <-s.chRemind:
			r := Remind{u: u, timer: time.After(10 * time.Minute)}
			if remind.u == nil {
				remind = r
				continue
			}
			queue = append(queue, r)
		case <-remind.timer:
			if remind.u.SendRemind(s.bot) {
				// requeue
				queue = append(queue, Remind{u: remind.u, timer: time.After(10 * time.Minute)})
			}

			if len(queue) == 0 {
				remind.u, remind.timer = nil, nil
				continue
			}
			// deque
			remind, queue = queue[0], queue[1:]
		}
	}
}

ここが、このアプリの一番のキモです。

ユーザーごとに、func (s *Scheduler) Watch(wg *sync.WaitGroup, u *User) goroutine が1本ずつ立ち上がります。このスレッドの中で各ユーザーの設定時刻まで待ちます。
そして、設定時刻になって、リマインドメッセージを送ったら、chRemind を経由して、func (s *Scheduler) Reminder(ctx context.Context, wg *sync.WaitGroup)スレッドに送られて、スヌーズの処理を行います。

Scheduler の一番のキモは、func (s *Scheduler) Reminder(ctx context.Context, wg *sync.WaitGroup)です。これは、アプリの中で1本だけシングルスレッドで動いています。

chRemindから渡ってきたUserqueueに貯めていきます。Reminderスレッドはシングルスレッドなので、キューの操作にロックを取る必要がありません。キューは、スライスで表現します。

そして、先頭のUserが10分経つまで待ちます。

全てのユーザーの待ち時間が同じ10分であるため、先頭のユーザーが10分経った時にキューの中の全てのユーザーは10分経っていません。そのため、常に先頭のユーザーの残り時間を待っているだけで、スヌーズ機能が実現できるので、スヌーズ機能を1本のgoroutineに集約できます。

もし、スヌーズ待ちしているユーザーがいない場合は、Timerチャネルはnilになり、nilチャネルはブロックし続けるので通常状態と同じように処理をすることができます。

もし、ユーザーがリマインダーに返信をしてきた場合は、remind.u.SendRemind(s.bot)falseになり、スヌーズを中断します。

Redisへの永続化 + ユーザーリスト保存レイヤー

store.go
var (
	ErrNotFound = errors.New("user not found")
)

type Store struct {
	c    *redis.Client
	mu   sync.Mutex
	data map[string]*User
	sche *Scheduler

	ctx context.Context
	wg  *sync.WaitGroup
}

func NewStore(ctx context.Context, client *redis.Client, wg *sync.WaitGroup, sche *Scheduler) *Store {
	return &Store{
		c:    client,
		wg:   wg,
		sche: sche,
		ctx:  ctx,
	}
}

func (s *Store) Load() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	keys, err := s.c.Keys(userPrefix + "*").Result()
	if err != nil {
		return errors.Wrap(err, "get all key")
	}
	users := make(map[string]*User, len(keys))

	for _, key := range keys {
		// TODO: MGET
		data, err := s.c.Get(key).Result()
		if err != nil {
			return errors.Wrap(err, "get all key")
		}
		times := strings.Split(data, ":")
		hour, err := strconv.Atoi(times[0])
		if err != nil {
			return errors.Wrap(err, "parse hour")
		}
		minute, err := strconv.Atoi(times[1])
		if err != nil {
			return errors.Wrap(err, "parse minute")
		}
		user := NewUser(s.ctx, key[len(userPrefix):], hour, minute)
		users[user.Id] = user
	}

	for _, user := range s.data {
		user.Close()
	}

	s.data = users

	for _, user := range users {
		s.wg.Add(1)
		go s.sche.Watch(s.wg, user)
	}

	return nil
}

func (s *Store) Set(userId string, hour, minute int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	user := NewUser(s.ctx, userId, hour, minute)
	_, err := s.c.Set(userPrefix+user.Id, user.Data(), 0).Result()
	if err != nil {
		return errors.Wrap(err, "set to redis")
	}
	if old, ok := s.data[user.Id]; ok {
		old.Close()
	}
	s.data[user.Id] = user
	s.wg.Add(1)
	go s.sche.Watch(s.wg, user)
	return nil
}

func (s *Store) Get(userId string) *User {
	s.mu.Lock()
	user, _ := s.data[userId]
	s.mu.Unlock()
	return user
}

func (s *Store) Del(userId string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	user, ok := s.data[userId]
	if !ok {
		return ErrNotFound
	}
	if _, err := s.c.Del(userPrefix + user.Id).Result(); err != nil {
		return errors.Wrapf(err, "del from redis")
	}
	user.Close()
	delete(s.data, userId)
	return nil
}

ほとんどそのままです。ユーザーは作成するたびに、Watchスレッドを立ち上げ、古いユーザーを削除します。

HTTPハンドリングレイヤー

handler.go
var timeMatcher = regexp.MustCompile("([0-9]+)時([0-9]+)分")

type Handler struct {
	bot   *linebot.Client
	store *Store
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	events, err := h.bot.ParseRequest(req)
	if err != nil {
		if err == linebot.ErrInvalidSignature {
			w.WriteHeader(400)
		} else {
			w.WriteHeader(500)
		}
		return
	}
	for _, event := range events {
		if event.Type == linebot.EventTypeMessage {
			switch message := event.Message.(type) {
			case *linebot.TextMessage:
				if err = h.onTextMessageEvent(event, message); err != nil {
					log.Println(err)
				}
			}
		}
	}
}

func (h *Handler) onTextMessageEvent(event linebot.Event, msg *linebot.TextMessage) error {
	reply, _ := h.handleText(event.Source.UserID, msg.Text)
	if _, err := h.bot.ReplyMessage(event.ReplyToken, linebot.NewTextMessage(reply)).Do(); err != nil {
		return err
	}
	return nil
}

func (h *Handler) handleText(userId, text string) (string, error) {
	// TODO: create command
	if text == "設定教えて" {
		user := h.store.Get(userId)
		if user == nil {
			return "設定されてないですよ", nil
		}
		return fmt.Sprintf("%v時%v分に設定されています", user.Hour, user.Minute), nil
	}
	if text == "ばいばい" {
		err := h.store.Del(userId)
		if err == ErrNotFound {
			return "設定されてないですよ", nil
		} else if err != nil {
			return "設定の削除に失敗しました", err
		}
		return "設定を削除しました。 ばいばい", nil
	}
	m := timeMatcher.FindStringSubmatch(text)
	if len(m) == 3 {
		hour, err := strconv.Atoi(m[1])
		if err != nil {
			return "何時ですか?", errors.Wrap(err, "parse hour")
		}
		minute, err := strconv.Atoi(m[2])
		if err != nil {
			return "何分ですか?", errors.Wrap(err, "parse minute")
		}
		err = h.store.Set(userId, hour, minute)
		if err != nil {
			return "時間の設定に失敗しました", errors.Wrap(err, "set time to store")
		}
		return fmt.Sprintf("%v時%v分ですね。わかりました。", hour, minute), nil
	}
	user := h.store.Get(userId)
	if user == nil {
		return "時間を設定してください", nil
	}
	if user.ResetCount() {
		return "よくできました", nil
	} else {
		// TODO: send random message
		return text, nil
	}
}

これもそのままです。func (h *Handler) handleText(userId, text string) (string, error)の中で、メッセージをパースして、必要な処理を行っています。

ユーザーがリマインダーに返信してきたときは、user.ResetCount()が呼ばれます。

最後に

作ってる途中でバグってメンヘラみたいになったのでスクショを撮っておきました。

IMG_5145.PNG

17
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?