search
LoginSignup
2

More than 3 years have passed since last update.

posted at

[AWS] LINEWORKSでリマインダBOTを作ってみた(実装編)

LINEWORKS Advent Calendar 14日目です。

今回は、LINEWORKS Advent Calendar 7日目 で紹介したリマインダBOT の実装について紹介します。

[再掲]BOTの画面と全体構成

review.png

リマインダBOTは3つのLambdaで構成されており、Python3.7 で実装してます。

①. LINEWORKSから送信されるメッセージの処理およびSQSへの通知
②. テーブル内に保存されたイベントをポーリングおよびSQSへの通知
③. SQSから受信したメッセージをLINEWORKSサーバに通知

今回は、①に焦点を当てて紹介します。

状態遷移表とメッセージリスト

ユーザとのBOTのやり取りを状態遷移表で表現しました。リマインダBOTは、以下の4つのイベントを扱います。

  • ユーザ参加
    • ユーザがBOTを追加時に発生
  • テキスト入力
    • ユーザがBOTに対して任意のテキスト入力
  • イベント入力ボタン押下
    • BOT内のメニューに表示される「イベント登録」を押下
  • イベント出力ボタン押下
    • BOT内のメニューに表示される「イベント参照」を押下

table.png

それぞれのユーザイベントに対して4つの状態を管理します。
BOTは、ユーザイベントとBOTの状態に対応するメッセージをユーザに返答します。
メッセージの内容は、メッセージリストとして定義しておきます。

Lambdaの実装

では、本題のLambdaの実装です。

まずは、Lambda関数の全体の処理です。
リクエストボディの検証とメッセージのメインの処理を担う自作のon_event関数を呼び出します。
リクエストボディの検証は、ヘッダーのx-works-signatureの値に基づいて処理します。

"""
index.py
"""

import os
import json
from base64 import b64encode, b64decode
import hashlib
import hmac

import reminderbot

API_ID = os.environ.get("API_ID")


def validate(payload, signature):
    """
    x-works-signatureの検証
    """

    key = API_ID.encode("utf-8")
    payload = payload.encode("utf-8")

    encoded_body = hmac.new(key, payload, hashlib.sha256).digest()
    encoded_base64_body = b64encode(encoded_body).decode()

    return encoded_base64_body == signature


def handler(event, context):
    """
    main関数
    """

    # リクエストボディの検証
    if not validate(event["body"], event["headers"].get("x-works-signature")):
        return {
            "statusCode": 400,
            "body": "Bad Request",
            "headers": {
                "Content-Type": "application/json"
            }
        }

    body = json.loads(event["body"])

    # メッセージのメイン処理
    reminderbot.on_event(body)

    return {
        "statusCode": 200,
        "body": "OK",
        "headers": {"Content-Type": "application/json"}
    }

続いて、on_event関数についてです。
今回事前に定めた、4つの状態、4つのユーザイベント、メッセージリストを定数で定義しておきます。

"""
reminderbot.py
"""

import os

import json
import datetime
import dateutil.parser
from dateutil.relativedelta import relativedelta

import boto3
from boto3.dynamodb.conditions import Key, Attr

# 状態遷移表に基づき4つの状態を定義
STATUS_NO_USER = "no_user"
STATUS_WATING_FOR_BUTTON_PUSH = "status_waiting_for_button_push"
STATUS_WATING_FOR_NAME_INPUT = "status_waiting_for_name_input"
STATUS_WATING_FOR_TIME_INPUT = "status_waiting_for_time_input"

# メッセージリストに基づき定義
MESSAGE_LIST = [
    "こんにちは、リマインドボットだよ。メニューボタンを押してね。",
    "イベント名を入力してね",
    "メニューボタンを押してね。",
    "イベントの内容はこちら!",
    "イベント時間を入力してね。",
    "登録完了!",
    "エラーだよ。もう一度入力してね。",
]

# ユーザのイベントをpostbackイベントとして定義
# BOTのメニュー登録時は、以下のpostbackイベントの値と同じにすること
POSTBACK_START = "start"
POSTBACK_MESSAGE = "message"
POSTBACK_PUSH_PUT_EVENT_BUTTON = "push_put_event_button"
POSTBACK_PUSH_GET_EVENT_BUTTON = "push_get_event_button"

# ステータスを管理するテーブル
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("lineworks-sample-table")


def on_event(event):
    """
    botの全体のイベントの処理
    """

    account_id = event["source"]["accountId"]
    content = event["content"]

    postback =  content.get("postback") or "message"

    # ユーザの今の状態を確認
    response = table.get_item(
        Key={
            "Hash": "status_" + account_id,
            "Range": "-"
        }
    )

    status = STATUS_NO_USER
    message = None

    if response.get("Item") is not None:
        status = response.get("Item")["Status"]

    # 各ユーザイベント(postback)毎の分岐処理
    try:

        if postback == POSTBACK_START:
            message = on_join(account_id, status)

        elif postback == POSTBACK_MESSAGE:
            text = content["text"]
            message = on_message(account_id, status, text)

        elif postback == POSTBACK_PUSH_PUT_EVENT_BUTTON:
            message = on_pushed_put_event_button(account_id, status)

        elif postback == POSTBACK_PUSH_GET_EVENT_BUTTON:
            message = on_pushed_get_event_button(account_id, status)

    except Exception as e:
        print(e)
        message = MESSAGE_LIST[6]

    # SQSにメッセージ内容を通知
    sqs = boto3.resource("sqs")
    queue = sqs.get_queue_by_name(QueueName="lineworks-message-queue")

    queue.send_message(
        MessageBody=json.dumps(
            {
                "content": {
                    "type": "text",
                    "text": message,
                },
                "account_id": account_id,
            }
        ),
    )

    return True

最後に、各イベントごとの処理の実装です。
それぞれのイベントの中で、各状態ごとの分岐処理を状態遷移表に基づいて実装しています。
重複する処理はまとめています。

def on_join(account_id, status):
    """
    bot追加時のイベントの処理
    """

    # ステータスに応じた分岐処理
    if status == STATUS_NO_USER:

        table.put_item(
            Item={
                "Hash": "status_" + account_id,
                "Range": "-",
                "Status": STATUS_WATING_FOR_BUTTON_PUSH,
            }
        )
        return MESSAGE_LIST[0]

    else:

        table.delete_item(
            Key={
                "Hash": "status_" + account_id,
                "Range": "-"
            }
        )

        table.put_item(
            Item={
                "Hash": "status_" + account_id,
                "Range": "-",
                "Status": STATUS_WATING_FOR_BUTTON_PUSH,
            }
        )

        return MESSAGE_LIST[0]

def on_message(account_id, status, text):
    """
    テキスト入力時のイベントの処理
    """

    if status == STATUS_WATING_FOR_BUTTON_PUSH:

        table.put_item(
            Item={
                "Hash": "status_" + account_id,
                "Range": "-",
                "Status": STATUS_WATING_FOR_BUTTON_PUSH,
            }
        )
        return MESSAGE_LIST[2]

    elif status == STATUS_WATING_FOR_NAME_INPUT:

        table.update_item(
            Key={
                "Hash": "status_" + account_id,
                "Range": "-",
            },
            UpdateExpression="set #st = :s, Title = :t",
            ExpressionAttributeNames = {
                "#st": "Status" # Statusは予約語なので#stに置き換える
            },
            ExpressionAttributeValues={
                ":s": STATUS_WATING_FOR_TIME_INPUT,
                ":t": text,
            },
        )
        return MESSAGE_LIST[4]

    elif status == STATUS_WATING_FOR_TIME_INPUT:

        # dateutil.parserで日付は変換
        time_dt = dateutil.parser.parse(text)
        time = time_dt.strftime("%Y/%m/%d %H:%M:%S")

        response = table.get_item(
            Key={
                "Hash": "status_" + account_id,
                "Range": "-",
            }
        )

        table.put_item(
            Item={
                "Hash": "event_" + account_id,
                "Range": time,
                "Title": response["Item"]["Title"],
                # utc -> 日本時間変換のため、9時間の差分をとる
                # utc -> 当初の予定 + 1h後に削除するように設定
                "ExpireTime": int((time_dt - relativedelta(hours=9) + relativedelta(hours=1)).timestamp()),
                "SentFlag": False
            }
        ),

        table.put_item(
            Item={
                "Hash": "status_" + account_id,
                "Range": "-",
                "Status": STATUS_WATING_FOR_BUTTON_PUSH,
            }
        )

        return MESSAGE_LIST[5]

def on_pushed_put_event_button(account_id, status):
    """
    「イベント登録」ボタン押下時のイベントの処理
    """

    if status == STATUS_WATING_FOR_BUTTON_PUSH:

        table.put_item(
            Item={
                "Hash": "status_" + account_id,
                "Range": "-",
                "Status": STATUS_WATING_FOR_NAME_INPUT,
            }
        )
        return MESSAGE_LIST[1]

    elif status == STATUS_WATING_FOR_NAME_INPUT:

        return MESSAGE_LIST[1]

    elif status == STATUS_WATING_FOR_TIME_INPUT:

        table.put_item(
            Item={
                "Hash": "status_" + account_id,
                "Range": "-",
                "Status": STATUS_WATING_FOR_NAME_INPUT,
            }
        )
        return MESSAGE_LIST[1]

def on_pushed_get_event_button(account_id, status):
    """
    「イベント参照」ボタン押下時のイベントの処理
    """

    current_jst_time = (datetime.datetime.utcnow() + relativedelta(hours=9)).strftime("%Y/%m/%d %H:%M:%S")

    # event取得処理 
    response = table.query(
        KeyConditionExpression=Key("Hash").eq("event_" + account_id) & Key("Range").gt(current_jst_time)
    )

    items = response["Items"] or []

    message = MESSAGE_LIST[3]

    if len(items) == 0:
        message += "\n-----"
        message += "\nなし"
        message += "\n-----"

    for item in items:

        message += "\n-----"
        message += "\n タイトル: {title}".format(title=item["Title"]) 
        message += "\n 日時: {time}".format(time=item["Range"]) 
        message += "\n-----"

    return message

まとめ

状態遷移表を作成することで、各イベント時にどのような処理を実装すべきか、
どのメッセージを返すべきか、が明確になるので迷いなく実装することができました。

今回は、シンプルなアプリだったので状態やイベントの数も少ないですが、
より複雑な処理をBOTにさせようとすると状態遷移表がより役に立ってくるかと思います。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
2