1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ひろ亭Advent Calendar 2024

Day 18

Siwtch bot の通知を減らすために、DynamoDB Streams を活用してみよう

Posted at

こんにちは。
久しぶりに Qiita へ記事を書きます。

はじまり

今回は、 白"雪姫"さんが書かれた「SwitchBotの通知をAWSを使ってDiscordに飛ばしてみました」の文末にあった以下の記載に対して、何かできないかな?と思いまして、検証した結果をまとめました。

解錠・施錠時メッセージが発生しちゃうので,抑制する方法は無いかなぁとちょっと思っています。

Switch bot は状態変更が起きるたびにその変更を通知という形で、指定されたエンドポイントに飛ばすようです。明示はされていませんが、白"雪姫"さんの記事によれば、HTTP Lambda をエンドポイントとして構築し、受け付けているようです。これは、 Amazon API Gateway で受け付けていたとしても同様かと思います。

コードを見る限り、 Lambda 関数で施錠や解錠の状態変更を受け付けて、Lambda関数の環境変数に取り込み、現在の環境変数値と同じであれば通知をしないようにしているようですが、これだと、Lambda 関数の実行タイミングによっては環境変数が書き変わる前の状態で別ノードが起動してしまい、通知が重複してしまいます。実際、白"雪姫"さんの環境ではそのようになっているように見受けられます。

では、どうしたら?

まず考えてみたことは、 Amazon SQS の FIFO(先入先出)キューを使う方法。
確かに通知は減りそうなのですが、それでも重複を排除しきれなさそうでした。
Lambda の同時実行数を1にしてみるなど、なんやかんや検証してみましたが、通知が重複することに変わりはなかったです。

そこで、AWS Ambassadors が集まる Slack に以下のように投げてみました。

あるタイミングでJSONが2か所から飛んでくるんですが、飛んでくるときは1~5秒くらいの間にそれぞれ3発くらいきます。
このJSONの内容と現在のステータスを比較して通知するしないを判断したいのですが、1発だけ送りたいです。
現状は、 ステータスはLambdaの環境変数に登録しています。
JSONはいったん、Lambdaで受けて、SQS のFIFOに送信し、Lambda環境変数に記録しているステータスと比較しています。差異があれば通知。これでも、たまに2発飛んできたりします。
どう改善したらいいかしら?コストは限りなく抑えたいです。
・Systems Managerのパラメータストアにそのステータスを外出しする
・CloudWatchのカスタムメトリクスを使って、ステータスがAならアラート、Bに変化したら正常といった具合に通知する
ネックとなってるのは、おそらく、Lambdaが複数起動するときに環境変数上のステータス値の更新が間に合っていない。
CloudWatchがいいのかなー。どでしょ?

これを受けた、 AWS Ambassadors からの回答を要約すると、DynamoDB に状態情報を登録(PutItem/UpdateItem)し、DynamoDB Streams をトリガーとして 通知用の Lambda 関数を実行するというものでした。

というわけでやってみた

DynamoDB テーブルの設定

まずは、記録用の DynamoDB テーブルを作りました。

パーティションキー(PK) 要素1 要素2
position keyStatus battery

要素1、2については、初期データを入れておく必要があったので設定しました。

要素 内容 データ型 初期データ例
position SwitchBot のDeviceTypeが入る string(文字列型) WoLockPro
keyStatus 鍵の開閉状態(LOCKED/UNLOCKED) string(文字列型) LOCKED
battery N(数値型) 99

DynamoDB Streams の設定

表示タイプ(テーブルのデータが変更されるたびにストリーミングへ書き込まれる情報)を「新しいイメージ」としました。

「新しいイメージ」は、テーブル更新後の情報が Streams に書き込まれるものです。
ほかの設定値としては以下があります。

設定値 内容
キー属性のみ 変更された項目のキー属性のみ
新規イメージ 変更後に表示される項目全体
古いイメージ 変更前に表示されていた項目全体
新規イメージおよび古いイメージ 項目の新しいイメージと古いイメージの両方

また、DynamoDB Streams を設定するとともに「トリガー」の設定も行います。
後述の「DynamoDB Streams に変更内容が書き込まれたら実行する関数」を実行するようにします。

Lambda 関数

SwitchBot からの変更通知を受けて、DynamoDBに書き込む関数

import json
import boto3

table="keyStatus"
client = boto3.client("dynamodb")

def lambda_handler(event, context):
    message = json.loads(event["body"])
    deviceType = message["context"]["deviceType"]
    if deviceType == "WoHub2":
        print("鍵の開閉ではない:WoHub2")
        return

    keyState = message["context"]["lockState"]
    battery = message["context"]["battery"]

    response = client.get_item(
        TableName=table,
        Key={
            'position': {"S": deviceType}
        },
        ConsistentRead=True
    )

    lastKeyStatus = response['Item']['keyStatus']['S']

    if keyState == lastKeyStatus:
        print(f"同じ状態のため通知しない:{deviceType},{keyState}")
        return
    
    print(f"状態変更検知:{deviceType},{keyState}")

    response = client.update_item(
        TableName=table,
        Key={
            'position': {"S": deviceType}
        },
        UpdateExpression="SET keyStatus = :new_status, battery = :new_battery",
        ExpressionAttributeValues={
        ":new_status": {"S": keyState},
        ":new_battery": {"N": str(battery)},
        }
    )
    return

SwitchBot の通知を受けるために、関数 URL を有効化してあります。
SwitchBot から状態変更の通知を受けて起動します。受け取った通知(json)から、デバイスタイプや状態を取得します。

この時、デバイスタイプが対象のものではない場合(if deviceType == "WoHub2":のところ)、処理を終了します

次に、json から取得した状態と DynamoDB から取得した状態が異なれば、真の状態変更であると判断されたら、boto3 で update_item を実行します。
ちなみに、 DynamoDB から情報を取得する際には、強い整合性のある読み取り(ConsistentRead=True)を用いています。これは何かの弾みで更新中にデータ取得をするタイミングがあった際にも極力、重複排除できるようにするためのものです。

これで、変更した内容が DynamoDB Streams へ乗ることになります。

DynamoDB Streams に変更内容が書き込まれたら実行する関数

基本的に白"雪姫"さんが公開しているコードそのままですが、
イベントハンドラーに飛んでくる event には、DynamoDB Streamsからの情報が詰め込まれているので、取得するように変更しています。また、複数機器の同時更新もありえるため、for 文でぐるぐる回すようにしています。
前述の関数側で重複排除を極力できるようにしているため、こちらでは、 DynamoDB Streams のトリガーで起動し、渡された情報のみで処理するようにしました。

import json
import requests
import os

table = "keyStatus"
client = boto3.client("dynamodb")

topKey = "WoLockPro"
bottomKey = "WoLock"
url = os.getenv('URL')
userId = os.getenv('USER_ID')

headers ={"Content-Type": "application/json"}

def lambda_handler(event, context):
    for record in event['Records']:
        deviceType = record['dynamodb']["Keys"]["position"]["S"]
        keyStatus = record['dynamodb']["NewImage"]["keyStatus"]["S"]
        battery = record['dynamodb']["NewImage"]["battery"]["N"]

        if deviceType == topKey:
            keyType = "上の鍵"
        elif deviceType == bottomKey:
            keyType = "下の鍵"

        # 送信内容
        payload = {"content": f"<@{userId}> {keyType}の状態:{keyStatus}, 電池残量:{battery}"}
        print(payload)
        # POSTリクエストを送信
        requests.post(url, json=payload, headers=headers) 

まとめ

テスト実行であれば、重複なく Discord に通知を飛ばすことができたのと速度的にも Amazon SNS FIFO キューを使っていた時と比較しても早くなったので、一旦は満足です。

別パターンの Systems Manager パラメーターストアを使ったものも検討してみたいですが、整合性や速度面、動機処理の有無などを鑑みて、総合的にコスパが良ければ乗り移るもよいよいですね。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?