前回 【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その2)
の続きです。
第1回目はこちら 【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その1)
#前提
- AWS IOT CORE 初級ハンズオンを参考にしています。
- 送信元デバイスは NVIDIA Jetson Nano 開発者キット B01 を利用
- AWS IoT Core のコンパネで「モノ」を作成して、「モノ」用の証明書も作成してデバイスに保存
- デバイスにはAWS IoT Device SDK をインストール
- クライアントプログラムはAWS IOT CORE 初級ハンズオンのダミーデバイス用プログラム(
device_main.py)を利用 - デバイスからIoT Coreに向けてMQTTで30秒おきにデータ送信
- データはこんな感じ
{
"DEVICE_NAME": "jetson-nano-tsumida",
"TIMESTAMP": "2021-03-17T22:03:48",
"TEMPERATURE": 18,
"HUMIDITY": 57
}
##2-1.データのストア先と使ったルール(アクション)
※試してみたいものをチョイスしたリスト
# | ストア先 | 使ったルール(アクション) | 途中経由したサービス |
---|---|---|---|
1 | S3 | Amazon Kinesis Firehose ストリームにメッセージを送信する | Amazon Kinesis Firehose |
2 | DynamoDB | Amazon Kinesis ストリームにメッセージを送信する | Amazon Kinesis Data Streams,Lambda |
3 | timestream | Timestreamテーブルにメッセージを書き込む | なし |
4 | Elasticsearch | Amazon Elasticsearch Serviceにメッセージを送信する | なし |
5 | AWS IoT Analytics | IoT Analyticsにメッセージを送る | IoT Analytics内でchannel,pipeline,storageを経由 |
今回はこの3 「Timestreamテーブルにメッセージを書き込む」をやっていきます。
###2-2-3. 「Timestreamテーブルにメッセージを書き込む」
Amazon Timestreamは時系列データを保存する事に特化したマネージドサービスです。
公式から引用
Amazon Timestream は、IoT および運用アプリケーションに適した、高速かつスケーラブルなサーバーレス時系列データベースサービスです。リレーショナルデータベースの最大 1,000 倍の速度と 10 分の 1 のコストで、1 日あたり数兆ものイベントを、簡単に保存し、分析できます。
2020年9月30日GAになっています。
米国東部 (バージニア北部)、米国東部 (オハイオ)、米国西部 (オレゴン)、および欧州 (アイルランド) で利用可能
東京リージョンでアクセスしてみましたが、2021/06/29時点でまだでした。
TimestreamはAWS IOT CORE 初級ハンズオン では対象になっていませんが、ちょっと試してみたいのでやってみました。リージョンは米国東部 (オハイオ) で試しています。
####① Amazon Timestream データベースの作成
Amazon Timestream のコンソール画面で「Create database」を押す
- Standard Database を選択、データベース名を入力(今回は
jetson-nano-tsumida
と入れました。) - Encryptionの個所では暗号化キーを選びます。
KMSからキーを選ぶことになりますが、ここで何も入れないとKMSのエイリアス aws/timestream を使用してキーが作成されます。
今回はそのままで行きます。(一度登録したことがあるので選択された状態になっていますが、初回は空白)
※Amazon timestream は標準ですべてのデータが暗号化されます。
入力後、画面下部のCreate Databaseを押します。
DBはすぐできます。
次にテーブルを作ります。
####② Amazon Timestream テーブルの作成
作成したデータベースのリンクをクリックするとこの画面に遷移します。
画面中央のTable タブをクリックしてCreate Table を押します。
- テーブル作成画面でまずテーブル名を入れます。(今回は
jetson-nano-tsumida-ts-table
と入れました。) - Data retention を設定します。
Timestreamにデータを送ると最初はメモリストアに乗ります、Memory store retention
はメモリストアにどれくらい残しておくか を設定します。Magnetic store retention
はメモリから永続ストアに移した後の保存期間 を設定します。
Memory store retention
についての補足として、設定した期間より古いデータは取込めなくなるので要注意です。
例:Memory store retention を1h にした場合、15:30 に 14:29 のタイムスタンプデータを投入しようとしても投入できない。
今回は
Memory store retention
: 12Hour(s)
Magnetic store retention
: 1Month(s)
で設定してみました。
入力後「Create Table」を押すとテーブルが作成されます
####③ IoT Core ルール アクションの設定
前回と同様にルールとアクションを設定していきます。IoT Core IoT Core 左メニュー「ACT」の下にあるルールをクリックし、画面右上の「作成」をクリックします。
ルールの作成画面が出てくるのでルールに名前を付け、クエリを書きます。
ルール名はjetson_nano_tsumida_ts
としました。クエリは以下です。
select TEMPERATURE,HUMIDITY from 'data/jetson-nano-tsumida'
次に「アクションの追加」をクリックして、
「Timestreamテーブルにメッセージを書き込む」を選択して「アクションの設定」をクリックします。
アクションの設定画面で
- データベース名とテーブル名
- ディメンジョン名とディメンジョン値
を設定します。
データベース名は [① Amazon Timestream データベースの作成](#① Amazon Timestream データベースの作成)で作成したもの
テーブル名は ② Amazon Timestream テーブルの作成で作成したものを入れます。
ディメンジョン名は、データの固有ラベルのようなもので、ここで定義したディメンジョン名の列がtimestreamのデータに追加され、ディメンション値がすべてのデータに付与されます。
今回はdevice_id
と ${clientId()}
と入れました。clientId()は送信デバイスのIDを自動で設定してくれます。
次に
- タイムスタンプの値と単位
を設定します。
今回は${timestamp()}
と MILLISECONDS
と入れました。
${timestamp()} はデータを受け取った時刻をエポック秒で入れてくれます。データ形式はエポック秒である必要があります。
Timestamp設定でハマったポイント
どうせなら「送信時にデバイスで付与した時間」で格納しようと思い。
(送信データにはこんな感じでTimestampが入っている→ "TIMESTAMP": "2021-03-17T22:03:48"
)
この「送信データのtimestamp」を「TimestreamのTIMESTAMP値」として格納するには、エポックタイムへの変換が必要です。
IoT Coreのルールクエリステートメントで変換できそうなのでやってみました。
select time_to_epoch(TIMESTAMP, "yyyy-MM-dd'T'HH:mm:ss") AS EPOCH_TIME,TEMPERATURE,HUMIDITY from 'data/jetson-nano-tsumida'
これでペイロードのtimestampはエポックタイムに変換されます。
「Timestream のアクション設定」の「タイムスタンプ値」に変換後の値 ${EPOCH_TIME}
を設定して動かしてみました。
でもうまく動きませんでした。(「タイムスタンプ値」が''で飛んでいた)
S3にデータを送って確認すると
{"EPOCH_TIME":1624991287000,"TEMPERATURE":16,"HUMIDITY":40}
となっているので、変換は問題なさそうです。
色々検索した結果、どうやら「Timestreamのアクション」ではルールクエリステートメントで変換したものは認識されないようです。
「変換したもの」とは例えばFunctionなどを使って変換したものです。変換したものはAS でaliasの列名を定義してたりします。
この変換したものが「timestreamのアクション」には渡っておらず、IoT Coreが受け取った送信ペイロードの内容をそのままを使ってしまうようです。
厳密にいうと、「Timestreamのアクション」で簡単な演算(例えば四則演算など)はできそうです。
参考 ディスカッションフォーラムのtimestreamスレッド
TimestreamアクションでIoT Coreから直接連携する場合、payloadのtimestampや各valueなどは最終的に入れたい形で送る必要がありそうです。(lambdaなどを経由すればなんでもできるとは思います。)
今回は送信データをエポックタイムにする事はせず、timestamp値を ${timestamp()}
(IoT Coreで処理時に取得した時間)でやりました。(そちらも試したかったという事もあり)
設定続き
- 最後にロールを作ります。
ここでロールの作成を押しロールの名前を付けます。
今回はjetson-nano-tsumdia-ts-action
とつけました。
これでアクションの作成、ルールの作成を順に押してルールアクションの設定完了です。
####④ データを確認する
Amazon timestream のコンソールを開き、データベース名をクリック、テーブル名をクリック してこの画面を開きます。
画面右上の「Query table」を押し、表示されたSQLをそのまま実行します。
このようにHUMIDITYとTEMPERATUREが格納されているのが見れました。
####⑤ 今回のまとめ(IoT Core → ルール → Timestream)
IoT Core に届いたデータを直接timestreamに入れてみました。
設定自体は簡単でした。すぐにできます。
ただ、現状のアクション機能ではペイロード値の変換ができないので、入れたい形式のままエッジから送るか
Dynamoと同じようにkinesis → lambdaを経由して入れるのが良いかもしれません。
今回はここまでにします。次回は 4「Amazon Elasticsearch Serviceにメッセージを送信する」です。