AWS
awsIoT

AWS IoTのBasic ingestのテストと仕様を確認

はじめに

AWS IoT CoreにBasic Ingestの機能がリリースされていました。
データ送信だけをしたいユースケースで使うと、Message brokerに接続しないでルールエンジンを発火できるようです。
メリットは飛ばした、メッセージブローカーのコストが掛からないということで、データ送信のみを考えるユースケースで便利かと思います。

使い方

topicの先頭に "$aws/rules/{実行したいrule名}を追加するだけです。
ルールエンジンの記述は、 $aws/rules/{rule名} を省略できます。
つまり、 data/{device_name}でデータを送信していた場合、

  • 送信側のtopic
$aws/rules/{rule_name}/data/{device_name}
  • ルールエンジンのSQL句の書き方
data/#

でよく、Device側でおくるtopic名のupdateができれば、AWS IoT側の設定変更不要です。
Basic Ingestトピックの最初の3つのレベル($ aws / rules / rule-name)は、トピックの8セグメント長制限または256文字合計制限の方にカウントされません。
また存在しないルール名をtopicに指定すると、RuleNotFoundのmetricが増える。

publicな開発ドキュメントはこちら

テスト

Lambdaの設置

Basic Ingestが来たら、受けたメッセージを打ち返すLambdaを用意します。打ち返すtopicは"basic_ingest/got"とします。
roleはLambdabasicとAWS IoTへpublishできるroleが必要です。

just_publish.py
from __future__ import print_function
import json
import boto3

iot = boto3.client('iot-data')

def lambda_handler(event, context):
  try:
    iot.publish(
        topic = "basic_ingest/got",
        qos = 0,
        payload = json.dumps(event)
    )
  except Exception as e:
    print("exception")
    print("e.message")

  return

AWS IoT Ruleエンジン

以下のように設定しました。
スクリーンショット 2018-11-09 20.54.28.png

テストクライアント

classで隠蔽したPython Device SDKは省略しています。詳細はこちらを参照してください。
やっていることは単純に $aws/rules/basic_ingest(上で設定したルール名)/data/引数で指定したデバイス名
に無限ループでデータをpublishしています。
また、Shadowと同期していますが今回はあまり関係ないので無視してください。
github上の basicPubSub.pyであれば引数のtopicに使いたいtopicを指定することでほぼ同じことができます。

test_client.py
import sys
import time
import logging
import argparse
import IoT_common as IoT
import dependency as dc


BASIC_INGEST = "$aws/rules/basic_ingest/data/"
#-------------------------------------------------------------------------------
def init_log():
    logger = logging.getLogger()
    handler = logging.StreamHandler(sys.stdout)
    logger.addHandler(handler)
    logger.setLevel(logging.WARN)
    logging.basicConfig()

    return logger
#-------------------------------------------------------------------------------
def device_main():
    try:
        logger = init_log()
        logger.info("start")
        parser = argparse.ArgumentParser()
        init_info = dc.arg_check(parser)
        device_name = init_info['device_name']
        iot = IoT.IoT_common(device_name,
                             init_info['endpoint'],
                             init_info['certs'])
        iot.create_shadow()
        status_str = dc.set_status(iot.WAIT_TIME)
        iot.updateShadow(status_str)
        iot.create_mqtt()
        topic = BASIC_INGEST + device_name
        while True:
            payload = dc.create_data(device_name)
            logger.debug("payload:{}".format(payload))
            iot.PublishMsg(topic, payload)
            time.sleep(iot.WAIT_TIME)
    except Exception as e:
        logger.error("Error main()")
        logger.error("error msg:\n{}".format(e.message))

if __name__ == '__main__':
    device_main()

でこちらのクライアントを起動します。

テスト確認

AWS IoTのテスト画面から # をsubscribeしてみます。
Basic ingestはルールエンジンに直接つながっているので、subscribeには表示されず、Lambdaが打ち返したメッセージのみが表示されています。
スクリーンショット 2018-11-09 21.04.40.png

ということで、メッセージブローカーを通らないことでテストクライアントのメッセージはsubscriberで受信できずに、しかし、ルールエンジンは発火していることがわかりました。

まとめ

ということで、deviceが使うtopicの変更のみでこの機能が使えることがわかりました。注意点としてはpolicyの運用を厳格にやっている方は、Resource管理でこのtopicを許すことをお忘れなく、といったあたりになります。
また、メッセージpublishをsubscriberにメッセージを届けたい場合は、本機能ではなく従来のtopicを使ったpub/subのメッセージブローカー経由で使う必要があります。

免責

本投稿は、個人の意見で、所属する企業や団体は関係ありません。
また掲載しているsampleプログラムの動作に関しても保障いたしませんので、参考程度にしてください。