前回 【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その3)
の続きです。
過去分
【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その2)
【AWS IoT】AWS IoT Core ルールを使ったデータストアを試してみた(その1)
#前提
- AWS IOT CORE 初級ハンズオンを参考にしています。
- 送信元デバイスは NVIDIA Jetson Nano 開発者キット B01 を利用
- AWS IoT Core のコンパネで「モノ」を作成して、「モノ」用の証明書も作成してデバイスに保存
- デバイスにはAWS IoT Device SDK をインストール
- クライアントプログラムはAWS IOT CORE 初級ハンズオンのダミーデバイス用プログラム(
device_main.py)を利用 - デバイスからIoT Coreに向けてMQTTで30秒おきにデータ送信
- データはこんな感じ
{
"DEVICE_NAME": "jetson-nano-tsumida",
"TIMESTAMP": "2021-03-17T22:03:48",
"TEMPERATURE": 18,
"HUMIDITY": 57
}
##2-1.データのストア先と使ったルール(アクション)
※試してみたいものをチョイスしたリスト
# | ストア先 | 使ったルール(アクション) | 途中経由したサービス |
---|---|---|---|
1 | S3 | Amazon Kinesis Firehose ストリームにメッセージを送信する | Amazon Kinesis Firehose |
2 | DynamoDB | Amazon Kinesis ストリームにメッセージを送信する | Amazon Kinesis Data Streams,Lambda |
3 | timestream | Timestreamテーブルにメッセージを書き込む | なし |
4 | Elasticsearch | Amazon Elasticsearch Serviceにメッセージを送信する | なし |
5 | AWS IoT Analytics | IoT Analyticsにメッセージを送る | IoT Analytics内でchannel,pipeline,storageを経由 |
今回はこの4 「Amazon Elasticsearch Serviceにメッセージを送信する」をやっていきます。
###2-2-4. 「Amazon Elasticsearch Serviceにメッセージを送信する」
Amazon Elasticsearch ServiceはElasticsearchのマネージドサービスです。
Elasticsearchはテキストデータを検索、分析できるようにインデックス化して格納してくれるNoSQLデータストアです。
Apache lucene ベースで、Apache solrとはよく比較されますが、RESTFUL APIで作られているとう点やノンスキーマで入れられる ようなことが特徴でしょうか。
可視化ツールkibanaとの親和性が高くAmazon Elasticsearch Serviceを使えば、kibanaも使えます。
####① Amazon Elasticsearch Service インスタンスの作成
※【補足】Elastiscserch Service インスタンス は停止して課金を止める ようなことができず、課金を止めるなら消すしかないので注意です。
Amazon Elasticsearch Serviceのコンソールにアクセスし、新しいドメインの作成を押します。
- デプロイタイプは「開発及びテスト」
- バージョンは最新の「7.10(latest)」
- Elasticsearch ドメイン名にドメイン名を付けます。(今回は
jetson-nano-tsumida-yyyymmdd
)と入れました。 - カスタムエンドポイントと自動調整はそのままにします。
- データノードは
t3.small.elasticsearch
かt2.small.elasticsearch
を選びます。※選ぶと下に無料枠の説明が表示されます。 - データノードストレージはデフォルトのままとします。EBSの汎用SSDで10GB
- 専用マスターノードは作らず、そのまま次に行きます。
- ネットワークセキュリティは「パブリックアクセス」を選択します。
※後述する設定で接続できるIP制限はします。
- 細かいアクセスコントロールを有効化 のチェックを外します。
※お試し用途であるため。
- Kibana の SAML 認証とAmazon Cognito 認証はそのままにします。(使わない)
- ドメインアクセスポリシーは「カスタムアクセスポリシー」を選びます
- http://checkip.amazonaws.com/ にアクセスし、自身の接続元IPをメモします。
- IPv4アドレス を選び、上記のIPアドレスを入力、許可を選択します。
これでインスタンスの作成手順は完了です。インスタンスができるまで結構時間がかかります。
※10分と出ますが20分くらいはかかります。
ドメインのステータスが「アクティブ」になれば完了です。作成完了画面で、KiabanaのURLはメモしておいてください。
####② IoT Core ルール アクションの設定
これまでと同様にルールとアクションを設定していきます。IoT Core IoT Core 左メニュー「ACT」の下にあるルールをクリックし、画面右上の「作成」をクリックします。
ルールの作成画面が出てくるのでルールに名前を付け、クエリを書きます。
- ルール名は
jetson_nano_tsumida_es
としました。クエリは以下です。
select parse_time("yyyy-MM-dd'T'HH:mm:ss.SSSZ", timestamp()) as timestamp, * from 'data/jetson-nano-tsumida'
※IoT Coreで受け取った時間をペイロードの先頭につけています。parse_timeはタイムスタンプを人間が判読可能な日付/時刻形式にフォーマットします。(このフォーマットはタイムゾーンを表すUTC+0000も付与されます。)
elastichserchは時間データをタイムゾーン情報なしに送ると、UTCとして受け取ります。
例えば今回のデバイスからの送信データ"TIMESTAMP": "2021-06-30T15:00:00"
は
"2021-06-30T15:00:00+0000" として入ります。
これをasia/tokyo TIMEZONEのクライアントで見ると9時間進んで見える(7/1 の0時になる)ので注意です。
今回はあえて小文字のtimestamp
として(送信データそのままの)TIMESTAMP
と比較できるようにしています。
- 次に「アクションの追加」をクリックして、「Amazon Elasticsearch Serviceにメッセージを送信する」を選択して「アクションの設定」をクリックします。
- ドメイン名は① Amazon Elasticsearch Service インスタンスの作成 で入力したElasticsearch ドメイン名 を選択してください。
- IDには
${newuuid()}
を入力します。 - 索引には
timestamp
を入力します。 - タイプにも
timestamp
を入力します。
- ロールの作成 を押しロール名を付けます。(今回は
jetson_nano_tsumida_es
と入れました。)
- アタッチされたポリシー と表示されたことを確認してアクションの追加を押します。
これでルールアクションの設定は完了です。
####③ kibanaでデータを確認する
Elasticsarchに保存されたデータをKibanaで確認していきます。
[① Amazon Elasticsearch Service インスタンスの作成](#① Amazon Elasticsearch Service インスタンスの作成)で作成後に表示されたKibanaのURLにアクセスします。
- index pattern name に
timestamp*
と入力して「Next step」を押します。(*は自動で入ります)
※現在Elasticsearchに入っているデータ(index)一覧が表示されるので、Indexを選んでkibanaに登録する といった流れです。indexは timestamp-yyyymmdd のように日別で分割して作ったりしますので、timestamp-* というインデックス名のパターンで登録できます。
- time field は
timestamp
(タイムゾーン付で送ったデータ)を選択し、「Create index pattern」を押します。
- index patternが登録されると以下のように表示されます。
- もう一度、三 をクリックして「Discover」を押します。
このようにデータが届いていることが確認できました。
グラフの下に表示されているのは、実際に届いたデータ1件1件が表示されています。>を押して開いてみます。
ルールクエリのところで書きましたが、大文字TIMESTAMPはタイムゾーンなしで送信したもの、小文字timestampはtimezone付きで送信したものです。
TIMESTAMP:Jul 1, 2021 @ 02:19:47.000
timestamp:Jun 30, 2021 @ 17:19:47.320
実際に送信した時間は17:19:47です。大文字TIMESTAMPは9時間進んでいる(誤った時間になっている)ことがわかります。エッジからの送信時にはタイムゾーンを付けて送るのが良いと思います。(UTCに変換して、2021-06-30T08:19:47Z として送るなど)
####④ kibanaでデータを可視化する
次はKibanaでグラフを作ってみます。
-
三 をクリックして「Visualize」を押します。
-
「Create new visualization」を押します。
- 「Line」を選びます。
- 「timestamp*」を選びます。
- グラフ画面が表示されすので、右の Y-axisを設定していきます。
Aggregationに Avarage
Fieldに TEMPERATURE
を入れます。送信された温度データの時間単位の平均値をだすという意味です。
- 「+Add」押してY-axisを選び以下を追加します。
Aggregationに Avarage
Fieldに HUMIDITY
を入れます。送信された湿度データの時間単位の平均値をだすという意味です。
- Bucket下の「+Add」押してX-axisを選び以下を追加します。
Aggregationに Date Histogram
Fieldに timestamp
minimum intervalに Minute
を入れます。横軸を時間にして10分単位で集計する という意味です。
- 「Update」を押すとグラフが表示されます。
Kibanaによる可視化はここまでです。
#####小ネタ
elasticsearch に入ったIndexを確認したり、削除したりするにはElasticsearchのapiを呼びます。
PCのターミナルからcurlを実行します。※curlは必要に応じて入れてください。
- Index一覧を取得する
curl -XGET 'https://[elasticsearchのエンドポイント]/_cat/indices?v'
※elasticsearchのエンドポイントはelasticsearchのコンソールで確認できます。
- 結果
$ curl -XGET 'https://[elasticsearchのエンドポイント]/_cat/indices?v'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open .kibana_1 CAFIybgnSf6EC9QG0hW8mQ 1 0 26 5 27.7kb 27.7kb
yellow open timestamp 0wFtu4F6RNGYiRcPS61OVg 5 1 118 0 194.6kb 194.6kb
sampleで入れたデータなどを削除する場合は削除リクエストを投げることで消せます。
- Indexを削除する
curl -XDELETE 'https://[elasticsearchのエンドポイント]/timestamp?pretty'
- 結果
$ curl -XDELETE 'https://[elasticsearchのエンドポイント]/timestamp?pretty'
{
"acknowledged" : true
}
####⑤ 今回のまとめ(IoT Core → ルール → Elasticsearch )
IoT Core に届いたデータを直接Elasticsearchに入れてみました。これも設定自体はとても簡単でした。すぐにできます。
Elasticsearchは文字列データの検索が強いので、そういった要件があれば検討してみてもよいと思います。kibanaで可視化も簡単にできるので便利です。
ただ他のデータストアよりコストはかかるので注意です。(データ量だけでなく、EC2インスタンスを常時起動している分の料金がかかる)
今回はここまでにします。次回はラスト 5「IoT Analyticsにメッセージを送る」を投稿します。