Edited at

AWS IoTのBasic ingestのテストと仕様を確認


はじめに

AWS IoT CoreにBasic Ingestの機能がリリースされていました。

データ送信だけをしたいユースケースで使うと、Message brokerに接続しないでルールエンジンを発火できるようです。

メリットは飛ばした、メッセージブローカーのコストが掛からないということで、データ送信のみを考えるユースケースで便利かと思います。


使い方

topicの先頭に "$aws/rules/{実行したいrule名}を追加するだけです。

ルールエンジンの記述は、 $aws/rules/{rule名} を省略できます。

つまり、 data/{device_name}でデータを送信していた場合、


  • 送信側のtopic

$aws/rules/{rule_name}/data/{device_name}


  • ルールエンジンのSQL句の書き方

data/#

でよく、Device側でおくるtopic名のupdateができれば、AWS IoT側の設定変更不要です。

Basic Ingestトピックの最初の3つのレベル($ aws / rules / rule-name)は、トピックの8セグメント長制限または256文字合計制限の方にカウントされません。

また存在しないルール名をtopicに指定すると、RuleNotFoundのmetricが増える。

publicな開発ドキュメントはこちら


テスト


Lambdaの設置

Basic Ingestが来たら、受けたメッセージを打ち返すLambdaを用意します。打ち返すtopicは"basic_ingest/got"とします。

roleはLambdabasicとAWS IoTへpublishできるroleが必要です。


just_publish.py

from __future__ import print_function

import json
import boto3

iot = boto3.client('iot-data')

def lambda_handler(event, context):
try:
iot.publish(
topic = "basic_ingest/got",
qos = 0,
payload = json.dumps(event)
)
except Exception as e:
print("exception")
print(e.message)

return



AWS IoT Ruleエンジン

以下のように設定しました。

スクリーンショット 2018-11-09 20.54.28.png


テストクライアント

classで隠蔽したPython Device SDKは省略しています。詳細はこちらを参照してください。

やっていることは単純に $aws/rules/basic_ingest(上で設定したルール名)/data/引数で指定したデバイス名

に無限ループでデータをpublishしています。

また、Shadowと同期していますが今回はあまり関係ないので無視してください。

github上の basicPubSub.pyであれば引数のtopicに使いたいtopicを指定することでほぼ同じことができます。


test_client.py

import sys

import time
import logging
import argparse
import IoT_common as IoT
import dependency as dc

BASIC_INGEST = "$aws/rules/basic_ingest/data/"
#-------------------------------------------------------------------------------
def init_log():
logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
logger.setLevel(logging.WARN)
logging.basicConfig()

return logger
#-------------------------------------------------------------------------------
def device_main():
try:
logger = init_log()
logger.info("start")
parser = argparse.ArgumentParser()
init_info = dc.arg_check(parser)
device_name = init_info['device_name']
iot = IoT.IoT_common(device_name,
init_info['endpoint'],
init_info['certs'])
iot.create_shadow()
status_str = dc.set_status(iot.WAIT_TIME)
iot.updateShadow(status_str)
iot.create_mqtt()
topic = BASIC_INGEST + device_name
while True:
payload = dc.create_data(device_name)
logger.debug("payload:{}".format(payload))
iot.PublishMsg(topic, payload)
time.sleep(iot.WAIT_TIME)
except Exception as e:
logger.error("Error main()")
logger.error("error msg:\n{}".format(e.message))

if __name__ == '__main__':
device_main()


でこちらのクライアントを起動します。


テスト確認

AWS IoTのテスト画面から # をsubscribeしてみます。

Basic ingestはルールエンジンに直接つながっているので、subscribeには表示されず、Lambdaが打ち返したメッセージのみが表示されています。

スクリーンショット 2018-11-09 21.04.40.png

ということで、メッセージブローカーを通らないことでテストクライアントのメッセージはsubscriberで受信できずに、しかし、ルールエンジンは発火していることがわかりました。


まとめ

ということで、deviceが使うtopicの変更のみでこの機能が使えることがわかりました。注意点としてはpolicyの運用を厳格にやっている方は、Resource管理でこのtopicを許すことをお忘れなく、といったあたりになります。

また、メッセージpublishをsubscriberにメッセージを届けたい場合は、本機能ではなく従来のtopicを使ったpub/subのメッセージブローカー経由で使う必要があります。


免責

本投稿は、個人の意見で、所属する企業や団体は関係ありません。

また掲載しているsampleプログラムの動作に関しても保障いたしませんので、参考程度にしてください。