IoT 始めたばかり人向けの記事です。
AWS IoT Coreのルール機能で、デバイスから送信したデータをいくつかのストア先に溜めてみました。
ルール機能の概要やストア方法をまとめてみました。
#前提
- AWS IOT CORE 初級ハンズオンを参考にしています。
- 送信元デバイスは NVIDIA Jetson Nano 開発者キット B01 を利用
- AWS IoT Core のコンパネで「モノ」を作成して、「モノ」用の証明書も作成してデバイスに保存
- デバイスにはAWS IoT Device SDK をインストール
- クライアントプログラムはAWS IOT CORE 初級ハンズオンのダミーデバイス用プログラム(
device_main.py)を利用 - デバイスからIoT Coreに向けてMQTTで10秒おきにデータ送信
- データはこんな感じ
{
"DEVICE_NAME": "jetson-AN0001",
"TIMESTAMP": "2021-03-17T22:03:48",
"TEMPERATURE": 18,
"HUMIDITY": 57
}
#1. AWS IoT Core ルールでできる事
AWS IoT Coreのルール機能を使うと、MQTTで受け取ったデータを他のAWS マネージドサービスへ比較的簡単に連携できます。
##1-1.アクション
連携先は「アクション」としてあらかじめ用意されたものから選択できます。
####アクションで選択できるもののイメージ(2021年3月現在)
##1-2.ルールクエリステートメント
AWS IoT Coreのルール機能では、AWS IoTのメッセージブローカーからデータをサブスクライブする際のフィルターを簡単に書ける機能もあります。「ルールクエリステートメント」がそれにあたります。SQLライクなクエリを書くことでフィルターすることができます。
ルールクエリステートメント のイメージ
select * from [topic名] where temperature > 50
のように記載することで
[topic名]宛に送信されたもののうち、temperature が 50より大きいものだけを「アクション」で定義した先に送る事が出来ます。
##1-3.エラーアクション
ルール処理中にエラーが発生した場合、エラーアクションの先にデータを送ることも可能です。
通常のアクションと同じ候補から選べますので、何かあったらとりあえずCloud Watchに流しとく
みたいな使い方も可能です。
エラーアクションのイメージ
#2. AWS IoT Core ルールでやってみた事
AWS IOT CORE 初級ハンズオンで試せるものもやってみつつ、他にアクションで試してみたいものをチョイスしてデータを入れてみました。
##2-1.データのストア先と使ったルール(アクション)
# | ストア先 | 使ったルール(アクション) | 途中経由したサービス |
---|---|---|---|
1 | S3 | Amazon Kinesis Firehose ストリームにメッセージを送信する | Amazon Kinesis Firehose |
2 | DynamoDB | Amazon Kinesis ストリームにメッセージを送信する | Amazon Kinesis Data Streams,Lambda |
3 | timestream | Timestreamテーブルにメッセージを書き込む | なし |
4 | Elasticsearch | Amazon Elasticsearch Serviceにメッセージを送信する | なし |
5 | AWS IoT Analytics | IoT Analyticsにメッセージを送る | IoT Analytics内でchannel,pipeline,storageを経由 |
では、実際どういったアクション設定をしたかを見ていきます。
##2-2.ルール アクションの設定
###2-2-1. アクション:Amazon Kinesis Firehose ストリームにメッセージを送信する でS3に保存する
まずS3に保存するパターンです。アクション「S3バケットにメッセージを格納する」で直接書く事もできるにはできるのですが、メッセージをまとめたり圧縮したりしてS3に入れてくれるAmazon Kinesis FirehoseのアクションでS3に入れてみました。
####① S3バケットの作成
まず今回のデータを入れるS3バケットを作ります。
バケット名を入力してリージョン選んだら今回はそのままの設定で作ります。
####② Amazon Kinesis Firehose の設定
次にAmazon Kinesis Firehose を設定します。Kinesis の画面で「配信ストリームの作成」をクリックして
配信ストリームに名前を付けます。ここではiot-storage-test とつけました。
Data Transform (あらかじめ作ったlambdaを指定する)やRecord Format conversion(Apache Parquet などの機能へ連携)などETLのTする機能を設定する事もできるようですが今回はスキップします。
次に① S3バケットの作成で作ったS3バケットを選んでNEXT画面に行きます。
次にバッファサイズとバッファインターバルを設定します。このバッファサイズを超える量まで溜め込むか、インターバルの時間を超えると、溜めてたものを1ファイルにまとめてS3に吐きだしてくれます。ここでは5MBと60secondsに設定してみました。
他、圧縮方式、暗号化の有無、エラーログをCloud Watchに送るか などを設定します。
最後にIAMロールを作るか選んであげて完了です。※IAMロールの説明は今回は除きます。
####③ IoT Core ルール アクションの設定
ここまでできたらIoT Core の ルール アクションを設定していきます。
左メニュー「ACT」の下にあるルールをクリックし、画面右上の「作成」をクリックします。
小ネタなんですが、このルール名、-(ハイフン)を受け付けず、_(アンダースコア)はOKなんです。ここまで聞くと普通に聞こえるかもしれませんが、AWSの他のサービスで名前つけようとすると全く逆だったりするんです。なぜここだけハイフンNGでアンスコOKにしたのか問いただしたい。知ってる人がいたら教えてほしいです。
黒いコンソールみたいなところにこんなSQLを書いてあげるだけです。フィルターしたい場合はwhere TEMPERATURE > 50
のようなWhere句を後ろにつけます。
select * from 'data/jetson-AN0001'
今回使ったプログラムはメッセージのTopicを 「data/[デバイス名]」として送っているのでこのようなクエリになっています。
「Amazon Kinesis Firehose ストリームにメッセージを送信する」を選択して一番下にある「アクションの設定」をクリックします。
「Amazon Kinesis Firehose ストリームにメッセージを送信する」の設定画面がでてくるので、② Amazon Kinesis Firehose の設定で設定した名前をストリーム名で選択します。Kinesis Firehoseはデータをバッファしてまとめてくれますが、データを何で区切るかをセパレータで指定します。今回は「\n (改行)」を選びました。
「バッチモード」はリクエスト内に複数のデータを埋め込んで送るPutRecordBatch用のオプションなので今回はチェックしません。最後に適切なIAMロールを選ぶか、ここで作成して設定完了です。
####④ データを送って確認してみる。
AWS IOT CORE 初級ハンズオンのダミーデバイス用プログラム(device_main.py)でAWS IoT Coreに向けてデータを送ってみます。
実際にメッセージがIoT Coreに届いているかはIoT Coreのメニュー「テスト」で簡単に確認できます。
この画面で「トピックのフィルター」に送信したトピック名を入力(上画像の例ではdata/jetson-AN0001)して、サブスクライブボタンを押すだけです。
メッセージがそのトピック宛に届いていれば、画面下部にリアルタイムに表示されます。
※編集の都合上トピック名を修正しています。
ここでもう一つ小ネタです。一度作った「モノの名前」は、名前の編集といった形では修正できません。もし変えたい場合、新しい名前でモノを作って置き換える必要があります。
作成後にモノの名前は変更できません。モノの名前を変更するには、新しいモノを作成して、新しい名前を付け、古いモノを削除する必要があります。
実際に運用する場合、モノの命名ルールは最初にちゃんと決めたほうが良さそうですね。
では、最終目的S3にデータが入ったか見てみます。自動で年/月/日/時 でフォルダが切られています。
ファイルをダウンロードして中を見ると、送信したデータがまとめられて保存されているのが分かりました。
{"DEVICE_NAME": "jetson-sumida-AN0001", "TIMESTAMP": "2021-03-18T23:22:55", "TEMPERATURE": 24, "HUMIDITY": 52}
{"DEVICE_NAME": "jetson-sumida-AN0001", "TIMESTAMP": "2021-03-18T23:23:25", "TEMPERATURE": 18, "HUMIDITY": 57}
{"DEVICE_NAME": "jetson-sumida-AN0001", "TIMESTAMP": "2021-03-18T23:23:55", "TEMPERATURE": 15, "HUMIDITY": 58}
S3への送信確認はこれで終わりです。
####⑤ 今回のまとめ(IoT Core → ルール → kinesisFirehose → S3)
ルールのアクションとサービスを設定するだけで簡単にS3に入れる事ができました。
クライアントプログラムを除けば、コードはとても簡単なクエリのみです。
ルールのアクション機能を活用すると簡単にサービス連携ができそうです。
記事が長くなったので
アクション「Amazon Kinesis ストリームにメッセージを送信する」でDynamoDBに保存する
を次回書きます。