4
3

More than 3 years have passed since last update.

【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その1)

Last updated at Posted at 2021-03-18

IoT 始めたばかり人向けの記事です。
AWS IoT Coreのルール機能で、デバイスから送信したデータをいくつかのストア先に溜めてみました。
ルール機能の概要やストア方法をまとめてみました。

前提

  • AWS IOT CORE 初級ハンズオンを参考にしています。
  • 送信元デバイスは NVIDIA Jetson Nano 開発者キット B01 を利用
  • AWS IoT Core のコンパネで「モノ」を作成して、「モノ」用の証明書も作成してデバイスに保存
  • デバイスにはAWS IoT Device SDK をインストール
  • クライアントプログラムはAWS IOT CORE 初級ハンズオンのダミーデバイス用プログラム( device_main.py)を利用
  • デバイスからIoT Coreに向けてMQTTで10秒おきにデータ送信
  • データはこんな感じ
{
  "DEVICE_NAME": "jetson-AN0001",
  "TIMESTAMP": "2021-03-17T22:03:48",
  "TEMPERATURE": 18,
  "HUMIDITY": 57
}

1. AWS IoT Core ルールでできる事

AWS IoT Coreのルール機能を使うと、MQTTで受け取ったデータを他のAWS マネージドサービスへ比較的簡単に連携できます。

1-1.アクション

連携先は「アクション」としてあらかじめ用意されたものから選択できます。

アクションで選択できるもののイメージ(2021年3月現在)

image.png
image.png

1-2.ルールクエリステートメント

AWS IoT Coreのルール機能では、AWS IoTのメッセージブローカーからデータをサブスクライブする際のフィルターを簡単に書ける機能もあります。「ルールクエリステートメント」がそれにあたります。SQLライクなクエリを書くことでフィルターすることができます。

ルールクエリステートメント のイメージ

image.png
例えば

 select * from [topic] where temperature > 50

のように記載することで
[topic名]宛に送信されたもののうち、temperature が 50より大きいものだけを「アクション」で定義した先に送る事が出来ます。

1-3.エラーアクション

ルール処理中にエラーが発生した場合、エラーアクションの先にデータを送ることも可能です。
通常のアクションと同じ候補から選べますので、何かあったらとりあえずCloud Watchに流しとく
みたいな使い方も可能です。

エラーアクションのイメージ

image.png

2. AWS IoT Core ルールでやってみた事

AWS IOT CORE 初級ハンズオンで試せるものもやってみつつ、他にアクションで試してみたいものをチョイスしてデータを入れてみました。

2-1.データのストア先と使ったルール(アクション)

# ストア先 使ったルール(アクション) 途中経由したサービス
1 S3 Amazon Kinesis Firehose ストリームにメッセージを送信する Amazon Kinesis Firehose
2 DynamoDB Amazon Kinesis ストリームにメッセージを送信する Amazon Kinesis Data Streams,Lambda
3 timestream Timestreamテーブルにメッセージを書き込む なし
4 Elasticsearch Amazon Elasticsearch Serviceにメッセージを送信する なし
5 AWS IoT Analytics IoT Analyticsにメッセージを送る IoT Analytics内でchannel,pipeline,storageを経由

では、実際どういったアクション設定をしたかを見ていきます。

2-2.ルール アクションの設定

2-2-1. アクション:Amazon Kinesis Firehose ストリームにメッセージを送信する でS3に保存する

まずS3に保存するパターンです。アクション「S3バケットにメッセージを格納する」で直接書く事もできるにはできるのですが、メッセージをまとめたり圧縮したりしてS3に入れてくれるAmazon Kinesis FirehoseのアクションでS3に入れてみました。

① S3バケットの作成

まず今回のデータを入れるS3バケットを作ります。
バケット名を入力してリージョン選んだら今回はそのままの設定で作ります。
image.png

② Amazon Kinesis Firehose の設定

次にAmazon Kinesis Firehose を設定します。Kinesis の画面で「配信ストリームの作成」をクリックして
image.png

配信ストリームに名前を付けます。ここではiot-storage-test とつけました。
image.png

Data Transform (あらかじめ作ったlambdaを指定する)やRecord Format conversion(Apache Parquet などの機能へ連携)などETLのTする機能を設定する事もできるようですが今回はスキップします。

Step3で保存先を選択できますのでここでS3を選びます。
image.png

次に① S3バケットの作成で作ったS3バケットを選んでNEXT画面に行きます。
image.png

次にバッファサイズとバッファインターバルを設定します。このバッファサイズを超える量まで溜め込むか、インターバルの時間を超えると、溜めてたものを1ファイルにまとめてS3に吐きだしてくれます。ここでは5MBと60secondsに設定してみました。
image.png

他、圧縮方式、暗号化の有無、エラーログをCloud Watchに送るか などを設定します。
image.png

最後にIAMロールを作るか選んであげて完了です。※IAMロールの説明は今回は除きます。

③ IoT Core ルール アクションの設定

ここまでできたらIoT Core の ルール アクションを設定していきます。
左メニュー「ACT」の下にあるルールをクリックし、画面右上の「作成」をクリックします。
image.png

ルールの作成画面が出てくるのでルールに名前を付けます。
image.png

小ネタなんですが、このルール名、-(ハイフン)を受け付けず、_(アンダースコア)はOKなんです。ここまで聞くと普通に聞こえるかもしれませんが、AWSの他のサービスで名前つけようとすると全く逆だったりするんです。なぜここだけハイフンNGでアンスコOKにしたのか問いただしたい。知ってる人がいたら教えてほしいです。

次にルールクエリステートメントを設定します。
image.png

黒いコンソールみたいなところにこんなSQLを書いてあげるだけです。フィルターしたい場合はwhere TEMPERATURE > 50 のようなWhere句を後ろにつけます。

 select * from 'data/jetson-AN0001'

今回使ったプログラムはメッセージのTopicを 「data/[デバイス名]」として送っているのでこのようなクエリになっています。

次に「アクションの追加」をクリックして、
image.png

「Amazon Kinesis Firehose ストリームにメッセージを送信する」を選択して一番下にある「アクションの設定」をクリックします。
image.png

「Amazon Kinesis Firehose ストリームにメッセージを送信する」の設定画面がでてくるので、② Amazon Kinesis Firehose の設定で設定した名前をストリーム名で選択します。Kinesis Firehoseはデータをバッファしてまとめてくれますが、データを何で区切るかをセパレータで指定します。今回は「\n (改行)」を選びました。

image.png

「バッチモード」はリクエスト内に複数のデータを埋め込んで送るPutRecordBatch用のオプションなので今回はチェックしません。最後に適切なIAMロールを選ぶか、ここで作成して設定完了です。

④ データを送って確認してみる。

AWS IOT CORE 初級ハンズオンのダミーデバイス用プログラム(device_main.py)でAWS IoT Coreに向けてデータを送ってみます。

こんな感じで実行されます。
image.png

実際にメッセージがIoT Coreに届いているかはIoT Coreのメニュー「テスト」で簡単に確認できます。
image.png
この画面で「トピックのフィルター」に送信したトピック名を入力(上画像の例ではdata/jetson-AN0001)して、サブスクライブボタンを押すだけです。
メッセージがそのトピック宛に届いていれば、画面下部にリアルタイムに表示されます。
image.png
※編集の都合上トピック名を修正しています。

ここでもう一つ小ネタです。一度作った「モノの名前」は、名前の編集といった形では修正できません。もし変えたい場合、新しい名前でモノを作って置き換える必要があります。

作成後にモノの名前は変更できません。モノの名前を変更するには、新しいモノを作成して、新しい名前を付け、古いモノを削除する必要があります。

実際に運用する場合、モノの命名ルールは最初にちゃんと決めたほうが良さそうですね。

では、最終目的S3にデータが入ったか見てみます。自動で年/月/日/時 でフォルダが切られています。


  • image.png


  • image.png


  • image.png

  • 時間
    image.png

  • ファイル
    image.png

ファイルをダウンロードして中を見ると、送信したデータがまとめられて保存されているのが分かりました。

{"DEVICE_NAME": "jetson-sumida-AN0001", "TIMESTAMP": "2021-03-18T23:22:55", "TEMPERATURE": 24, "HUMIDITY": 52}
{"DEVICE_NAME": "jetson-sumida-AN0001", "TIMESTAMP": "2021-03-18T23:23:25", "TEMPERATURE": 18, "HUMIDITY": 57}
{"DEVICE_NAME": "jetson-sumida-AN0001", "TIMESTAMP": "2021-03-18T23:23:55", "TEMPERATURE": 15, "HUMIDITY": 58}

S3への送信確認はこれで終わりです。

⑤ 今回のまとめ(IoT Core → ルール → kinesisFirehose → S3)

ルールのアクションとサービスを設定するだけで簡単にS3に入れる事ができました。
クライアントプログラムを除けば、コードはとても簡単なクエリのみです。
ルールのアクション機能を活用すると簡単にサービス連携ができそうです。

記事が長くなったので
アクション「Amazon Kinesis ストリームにメッセージを送信する」でDynamoDBに保存する
を次回書きます。

追記:次回分リンク
【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その2)

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3