LoginSignup
8
8

More than 3 years have passed since last update.

AWS IoTのBasic ingestのテストと仕様を確認

Last updated at Posted at 2018-11-09

はじめに

AWS IoT CoreにBasic Ingestの機能がリリースされていました。
データ送信だけをしたいユースケースで使うと、Message brokerに接続しないでルールエンジンを発火できるようです。
メリットは飛ばした、メッセージブローカーのコストが掛からないということで、データ送信のみを考えるユースケースで便利かと思います。

使い方

topicの先頭に "$aws/rules/{実行したいrule名}を追加するだけです。
ルールエンジンの記述は、 $aws/rules/{rule名} を省略できます。
つまり、 data/{device_name}でデータを送信していた場合、

  • 送信側のtopic
$aws/rules/{rule_name}/data/{device_name}
  • ルールエンジンのSQL句の書き方
data/#

でよく、Device側でおくるtopic名のupdateができれば、AWS IoT側の設定変更不要です。
Basic Ingestトピックの最初の3つのレベル($ aws / rules / rule-name)は、トピックの8セグメント長制限または256文字合計制限の方にカウントされません。
また存在しないルール名をtopicに指定すると、RuleNotFoundのmetricが増える。

publicな開発ドキュメントはこちら

テスト

Lambdaの設置

Basic Ingestが来たら、受けたメッセージを打ち返すLambdaを用意します。打ち返すtopicは"basic_ingest/got"とします。
roleはLambdabasicとAWS IoTへpublishできるroleが必要です。

just_publish.py
from __future__ import print_function
import json
import boto3

iot = boto3.client('iot-data')

def lambda_handler(event, context):
  try:
    iot.publish(
        topic = "basic_ingest/got",
        qos = 0,
        payload = json.dumps(event)
    )
  except Exception as e:
    print("exception")
    print(e.message)

  return

AWS IoT Ruleエンジン

以下のように設定しました。
スクリーンショット 2018-11-09 20.54.28.png

テストクライアント

classで隠蔽したPython Device SDKは省略しています。詳細はこちらを参照してください。
やっていることは単純に $aws/rules/basic_ingest(上で設定したルール名)/data/引数で指定したデバイス名
に無限ループでデータをpublishしています。
また、Shadowと同期していますが今回はあまり関係ないので無視してください。
github上の basicPubSub.pyであれば引数のtopicに使いたいtopicを指定することでほぼ同じことができます。

test_client.py
import sys
import time
import logging
import argparse
import IoT_common as IoT
import dependency as dc


BASIC_INGEST = "$aws/rules/basic_ingest/data/"
#-------------------------------------------------------------------------------
def init_log():
    logger = logging.getLogger()
    handler = logging.StreamHandler(sys.stdout)
    logger.addHandler(handler)
    logger.setLevel(logging.WARN)
    logging.basicConfig()

    return logger
#-------------------------------------------------------------------------------
def device_main():
    try:
        logger = init_log()
        logger.info("start")
        parser = argparse.ArgumentParser()
        init_info = dc.arg_check(parser)
        device_name = init_info['device_name']
        iot = IoT.IoT_common(device_name,
                             init_info['endpoint'],
                             init_info['certs'])
        iot.create_shadow()
        status_str = dc.set_status(iot.WAIT_TIME)
        iot.updateShadow(status_str)
        iot.create_mqtt()
        topic = BASIC_INGEST + device_name
        while True:
            payload = dc.create_data(device_name)
            logger.debug("payload:{}".format(payload))
            iot.PublishMsg(topic, payload)
            time.sleep(iot.WAIT_TIME)
    except Exception as e:
        logger.error("Error main()")
        logger.error("error msg:\n{}".format(e.message))

if __name__ == '__main__':
    device_main()

でこちらのクライアントを起動します。

テスト確認

AWS IoTのテスト画面から # をsubscribeしてみます。
Basic ingestはルールエンジンに直接つながっているので、subscribeには表示されず、Lambdaが打ち返したメッセージのみが表示されています。
スクリーンショット 2018-11-09 21.04.40.png

ということで、メッセージブローカーを通らないことでテストクライアントのメッセージはsubscriberで受信できずに、しかし、ルールエンジンは発火していることがわかりました。

まとめ

ということで、deviceが使うtopicの変更のみでこの機能が使えることがわかりました。注意点としてはpolicyの運用を厳格にやっている方は、Resource管理でこのtopicを許すことをお忘れなく、といったあたりになります。
また、メッセージpublishをsubscriberにメッセージを届けたい場合は、本機能ではなく従来のtopicを使ったpub/subのメッセージブローカー経由で使う必要があります。

免責

本投稿は、個人の意見で、所属する企業や団体は関係ありません。
また掲載しているsampleプログラムの動作に関しても保障いたしませんので、参考程度にしてください。

8
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
8