#はじめに
今回はIoT Ruleでできることというテーマで、アラート通知とシステムの疎結合化の方法について書きました。具体的にはエッジデバイスからMQTT通信でIoT CoreにJSON形式のメッセージを定期的に送信する際、異常な値が検知された場合に誰かに通知したり、特定のサービスを止めたりする想定で、Ruleの解説をしていきます。
※この記事内で紹介するコードは動作確認していないものも含まれるのでご了承ください。
#共通部分-Ruleの作成
IoT Coreの画面から「ACT」→「ルール」→「作成」をクリック。
メッセージのフィルタリングにSQL構文が使えます。
例えば温度が40℃以上のときだけRuleを起動したい場合は、
select * from [topic名] where temperature > 40
というふうになります。またSQL構文を組み合わせれば
SELECT CASE state.reported.sound > 10 WHEN true THEN true ELSE false END AS state.desired.roomOccupancy \
FROM '$aws/things/0123e254c646a02a01/shadow/update/accepted' \
WHERE state.reported.sound <> Null
といったように細かい処理も可能です。
「アクションの追加」を押すと、
以下のような画面に行きます。大まかな注意点としてはS3に直接データを格納しようとするとJSONファイルを書き換えてしまってうまく格納ができないということと、単純にSNSにプッシュしてアラート通知をしようとすると無味乾燥なJSONファイルがEメールやSMSに届いてしまうことでしょうか(S3に格納する場合はKinesis Firehoseを仲介させる必要があります。ちなみに米のIoTエンジニアStephen BorsayがAWSに聞いたところ、仕様だから我慢してくれと言われたそうです)。
#Lambdaと繋げてアラート通知をする
##パターン①IoT Core → Lambda
ひとまずLambdaに送ってしまえばおおよそのことができてしまいます。**↓のように初めから用意されている関数を使えば自分で調べてコードを書く時間の節約にもなります。**唯一の弱点はコールドスタートで、早い反応時間が必要な場合は対策が必要です(Lambdaを常時起動する or EC2等他のサービスを使う)。
※Lambdaの起動には最大で100秒かかる
SNSの代わりにLambdaを使うのは理由があります。**Rule→SNSだと基本的にメッセージのプッシュしかできないのでメッセージの編集ができません。**例えば火災報知器が{FireAlert: TRUE, ...}といったメッセージで火災をIoT Coreに通知してきたとして、そのJSON形式のままで警備員に送るのは現実的ではありません。そこでLambdaの登場というわけです。
例としてLINEAPIを叩いてLINEで通知をしたい場合のコードは以下のような感じになります。
import request
def lambda_handler(event, context):
payload = {'message' : '通知したいメッセージを打ち込む'}
#tokenにはLINE APIで発行したトークンを入れる
headers = {'Authorization' : 'Bearer' + 'token'}
requests.post('https://notify.api.line.me/api/notify', data = payload, headers = headers)
return
参考:ICHIKEN Engineering
##パターン②IoT Core → Lambda → SNS
SNSではメッセージのプッシュしかできないということで、SMSやEMAILあてに送る場合も一度Lambdaをかませる必要があります。Lambdaにはいくつかブループリント(設計図)が用意されており、SNSへの送信は「設計図」で「sns」検索で出てくるstep-functions-send-to-snsが使えます。一応以下にそのままコードを貼ります。コード内でいうmessage
がRuleがLambdaに渡したメッセージになります。そこでmessage['topic']
というように、辞書参照的に値を引いてきているというわけです。
from __future__ import print_function
import json
import urllib
import boto3
print('Loading message function...')
def send_to_sns(message, context):
# This function receives JSON input with three fields: the ARN of an SNS topic,
# a string with the subject of the message, and a string with the body of the message.
# The message is then sent to the SNS topic.
#
# Example:
# {
# "topic": "arn:aws:sns:REGION:123456789012:MySNSTopic",
# "subject": "This is the subject of the message.",
# "message": "This is the body of the message."
# }
sns = boto3.client('sns')
sns.publish(
TopicArn=message['topic'],
Subject=message['subject'],
Message=message['body']
)
return ('Sent a message to an Amazon SNS topic.')
#SNS, SQSと繋げて疎結合なシステムを作る
SNSは図のように複数サービスに対してメッセージをPub/Sub方式でプッシュするのに使うのに対して、SQSは1対1であることとポール方式が採用されていることが特徴です。
SNS→Lambdaの繋ぎの部分でエラーを出すまでの試行回数やエラー時の動作を定義できるので、ミッションクリティカルなLambdaはそこで処理ができそうです。一方で時間的に猶予があるLambdaなどはSQSにメッセージをためておいて定期的に対象となるサービスからポールしていくという設計がfault toleranceの点で良さそうです。
##パターン③IoT Core → SQS(→ Lambda, IoT Analytics etc.)およびパターン④IoT Core → SNS(→ SQS → Lambda, IoT Analytics etc.)
どちらもコンソールから簡単に繋げるので説明は省きますが、どちらも繋ぐ対象となる「キュー(Queue)」と「SNSターゲット」が必要です。

##パターン⑤IoT Core → IoT Core
例えばサーモスタットシステムを作るときに「温度計の温度データが30以上だったら、クーラーの電源を入れる」というルールを作りたいとします。その場合、例えばRuleのSQL条件文を
select * from '$aws/things/thermometer1/shadow/update' where temperature > 30
として、クーラーデバイスに対してシャドウ機能を使って{switch: on}
を送ってやればクーラーのONができます(シャドウ機能についてはこちらのAWS公式ドキュメントを参照)。注意点としては↓のように$aws
で始まるトピックに再パブリッシュする際に頭にもう一つ$
をつけなければいけないことです。
Device Shadow トピックのその他の使用では
$
記号は 1 つであるにもかかわらず、この場合は 2 つ使用している理由 ルールエンジンアクションは、任意で置換テンプレートをサポートするためです。置換テンプレートによって、実行時に評価される式を定義できます。置換テンプレートは${ YOUR_EXPRESSION_HERE }
表記を使用するため、Device Shadow トピックの$aws
プレフィックスと競合します。正しい Device Shadow トピックを再発行アクションで使用するには、最初の$
記号を避ける必要があるため、$$aws/things/<<CLIENT_ID>>/shadow/update
のようになります。
参考:
https://edukit.workshop.aws/jp/smart-thermostat/data-transforms-and-routing.html
#おわりに
今回の記事執筆に関して一番参考にしたのが以下のYoutubeチャンネルです。英語ですが、AWSの各サービスに関して他のどのリソースよりもわかりやすい説明がされており、かなりお勧めです。