はじめに
AWS IoT CoreにBasic Ingestの機能がリリースされていました。
データ送信だけをしたいユースケースで使うと、Message brokerに接続しないでルールエンジンを発火できるようです。
メリットは飛ばした、メッセージブローカーのコストが掛からないということで、データ送信のみを考えるユースケースで便利かと思います。
#使い方
topicの先頭に "$aws/rules/{実行したいrule名}を追加するだけです。
ルールエンジンの記述は、 $aws/rules/{rule名} を省略できます。
つまり、 data/{device_name}でデータを送信していた場合、
- 送信側のtopic
$aws/rules/{rule_name}/data/{device_name}
- ルールエンジンのSQL句の書き方
data/#
でよく、Device側でおくるtopic名のupdateができれば、AWS IoT側の設定変更不要です。
Basic Ingestトピックの最初の3つのレベル($ aws / rules / rule-name)は、トピックの8セグメント長制限または256文字合計制限の方にカウントされません。
また存在しないルール名をtopicに指定すると、RuleNotFoundのmetricが増える。
publicな開発ドキュメントはこちら
#テスト
Lambdaの設置
Basic Ingestが来たら、受けたメッセージを打ち返すLambdaを用意します。打ち返すtopicは"basic_ingest/got"とします。
roleはLambdabasicとAWS IoTへpublishできるroleが必要です。
from __future__ import print_function
import json
import boto3
iot = boto3.client('iot-data')
def lambda_handler(event, context):
try:
iot.publish(
topic = "basic_ingest/got",
qos = 0,
payload = json.dumps(event)
)
except Exception as e:
print("exception")
print(e.message)
return
AWS IoT Ruleエンジン
テストクライアント
classで隠蔽したPython Device SDKは省略しています。詳細はこちらを参照してください。
やっていることは単純に $aws/rules/basic_ingest(上で設定したルール名)/data/引数で指定したデバイス名
に無限ループでデータをpublishしています。
また、Shadowと同期していますが今回はあまり関係ないので無視してください。
github上の basicPubSub.pyであれば引数のtopicに使いたいtopicを指定することでほぼ同じことができます。
import sys
import time
import logging
import argparse
import IoT_common as IoT
import dependency as dc
BASIC_INGEST = "$aws/rules/basic_ingest/data/"
#-------------------------------------------------------------------------------
def init_log():
logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
logger.setLevel(logging.WARN)
logging.basicConfig()
return logger
#-------------------------------------------------------------------------------
def device_main():
try:
logger = init_log()
logger.info("start")
parser = argparse.ArgumentParser()
init_info = dc.arg_check(parser)
device_name = init_info['device_name']
iot = IoT.IoT_common(device_name,
init_info['endpoint'],
init_info['certs'])
iot.create_shadow()
status_str = dc.set_status(iot.WAIT_TIME)
iot.updateShadow(status_str)
iot.create_mqtt()
topic = BASIC_INGEST + device_name
while True:
payload = dc.create_data(device_name)
logger.debug("payload:{}".format(payload))
iot.PublishMsg(topic, payload)
time.sleep(iot.WAIT_TIME)
except Exception as e:
logger.error("Error main()")
logger.error("error msg:\n{}".format(e.message))
if __name__ == '__main__':
device_main()
でこちらのクライアントを起動します。
テスト確認
AWS IoTのテスト画面から # をsubscribeしてみます。
Basic ingestはルールエンジンに直接つながっているので、subscribeには表示されず、Lambdaが打ち返したメッセージのみが表示されています。
ということで、メッセージブローカーを通らないことでテストクライアントのメッセージはsubscriberで受信できずに、しかし、ルールエンジンは発火していることがわかりました。
#まとめ
ということで、deviceが使うtopicの変更のみでこの機能が使えることがわかりました。注意点としてはpolicyの運用を厳格にやっている方は、Resource管理でこのtopicを許すことをお忘れなく、といったあたりになります。
また、メッセージpublishをsubscriberにメッセージを届けたい場合は、本機能ではなく従来のtopicを使ったpub/subのメッセージブローカー経由で使う必要があります。
#免責
本投稿は、個人の意見で、所属する企業や団体は関係ありません。
また掲載しているsampleプログラムの動作に関しても保障いたしませんので、参考程度にしてください。