概要
エヌシーアイ総合システムのtaguchyです。
普段は主にBtoCのWEBシステムや、BtoB向けWEB APIの開発などをしています。
最近はIoT関連でもお仕事させていただいております。
さて、今回は第三話ということで、今日は以下を行います。
(前回はこちら)
- センサーデータをJSONに変換するLambdaを作成
- Kinesis Firehoseを利用してS3, ElasticSearchに格納
- Kibanaにてセンサーデータを参照
利用デバイス
センサーはレンジャーシステムズ株式会社様の「monoコネクト」を利用して、以下のデバイスを使用します。
- BLE/WiFiゲートウェイ:1台
- 人感センサー:3台
AWS構成
Amazon Kinesis Data Firehoseを利用して、AWS IoTからS3とElasticsearchにデータを流します。
センサーデータをJSONに変換するLambdaを作成
AWS Cloud9を使ってLambdaを作成します。
nodeを書くのは初めてですが頑張ります。
csvtojsonやasyncなどのモジュールを利用したいので、画面中央下のコンソールにてnpmコマンドを実行しました。
yuki-taguchi:~/environment $ cd convertSensorDataToJSON/
yuki-taguchi:~/environment/convertSensorDataToJSON $ npm init
・
・
yuki-taguchi:~/environment/convertSensorDataToJSON $ npm install csvtojson
・
・
yuki-taguchi:~/environment/convertSensorDataToJSON $ npm install async
・
・
yuki-taguchi:~/environment/convertSensorDataToJSON $ npm install date-utils
・
・
yuki-taguchi:~/environment/convertSensorDataToJSON $ ls
convertSensorDataToJSON node_modules package.json template.yaml
yuki-taguchi:~/environment/convertSensorDataToJSON $ ls node_modules/
async csvtojson date-utils first-chunk-stream is-utf8 lodash strip-bom
変換用に以下のLambda関数を作成しました。
Kibanaで時系列データとして扱うために"@timestamp"フィールドを作成しています。
'use strict';
console.log('Loading function');
const csv = require('csvtojson');
const asyn = require('async');
require('date-utils');
exports.handler = (event, context, callback) => {
// all record convert.
asyn.map (event.records, function (record, callback) {
// need to base64 decode.
let sensor_data = Buffer.from(record.data, 'base64').toString('utf8');
// "$GPRP" is not need.
var sensor_data_non_gprp = sensor_data.replace(/^(\$GPRP,)/g, "");
// split payload.
let payload = sensor_data_non_gprp.match(/^(.{12},.{12},.*,)(.*)/);
let split = payload[2].match(/^(02010612FF0D0082BC)(.{4})(.{2})(.{8})(.{4})(.{2})(.{6})/);
let fix = split[1];
let battery = split[2];
let status = split[3];
let reserve = split[4];
let user = split[5];
let fix2 = split[6];
let reserve2 = split[7];
// make @timestamp
let now = new Date();
let sensor_data_split_payload = payload[1]
+ fix + "," + battery + ","
+ status + "," + reserve + ","
+ user + "," + fix2+ ","
+ reserve2 + "," + now.toFormat('YYYY-MM-DDTHH24:MI:SS');
csv({
noheader: true,
headers: ['sensor_mac','gw_mac', 'rssi', 'fix', 'battery', 'status', 'reserve', 'user', 'fix2', 'reserve2', '@timestamp']
}).fromString(sensor_data_split_payload)
.on('json',(jsonObj)=>{
// Called for the number of rows.
console.log(jsonObj);
let jsonData = Buffer.from(JSON.stringify(jsonObj), 'utf8').toString('base64');
callback (null, {
recordId : record.recordId,
result : 'Ok',
data : jsonData
}
);
});
}, function (err, results) {
if (err) {
throw err;
}
console.log(results);
callback (null, {records: results});
});
};
ソースの修正が完了したら、Cloud9右側の"AWS Resources"にて"↑"をクリックし、デプロイします。
最後にLambdaのロール設定を行います。
LambdaのAWSコンソールからLambdaの実行ロールを調べます。
※Lambdaのタイムアウト設定を3分に伸ばしました。
(Kinesisの設定画面で注意文言が出る為ですが、実際は必要ないかもしれません。)
KinesisFirehoseから呼び出すため、KinesisFirehoseへのフルアクセスポリシーを設定します。
以上でセンサーデータをJSONに変換するLambdaが作成できました。
Kinesis Firehoseを利用してS3, ElasticSearchに格納する
S3バケットの作成
今回データの可視化はElasticSearchで行うため、S3はバックアップ用途として使用します。
機会があればS3に入ったデータをAthenaなどで見ることもしてみたいです。
ElasticSearchの設定
今回はデバイスデータを以下のルールで保存します。
- 24時間毎に保存
- 保存したデータは1か月保持
1日のデータ量がどの程度になるのかが把握しきれなかっ
ため、まずはデフォルト設定でElasticSearchを立ち上げます。
ElasticSearchのバージョン6.0は、現時点(2018/1/5)ではKinesisFirehoseが対応していないようだったので、5.5に変更しました。
ElasticSearchはVPC内に作成することもできますが、ALBの設定などに手間取ることを避け、今回は社内IPのみに限定したネットワーク公開としました。
Kinesis Firehoseの設定
FirehoseにてLambdaを利用してPayloadデータをJSON変換しつつ、S3およびElasticSearchに転送する設定を行います。
データ変換設定にて、事前に作成したLambdaを指定します。
Firehoseにてデータのバッファを設定します。
今回は60秒置きにバッファしたデータをLambdaにて変換し、S3およびElasticSearchに流し込む設定としました。
※もっと小さい数字にしようかと思いましたが60秒が一番小さい数字でした。
Firehoseのロールはボタンポチポチで勝手に作成されました。とても楽です。
ElasticSearchへの設定もS3とほぼ同様に行います。
作成完了です。
AWSIoTのルール設定
KinesisFirehoseを利用してデータを転送するルール設定を行います。
AWSIoTのACTを選択してルール設定を行いました。
トピックのみフィルターをかけます。
事前に作成したKinesisFirehoseをアクションに追加します。
ロールは既存のロールもしくは新規で作成します。
AWSIoTのルール設定が完了しました。
設定の確認
ElasticSearchのコンソールにて、indeの作成とデータが格納されているか確認できます。
以上でAWSIoTからKinesisFirehoseを利用してS3, ElasticSearchに格納できました。
Kibanaにてセンサーデータを参照
ElasticSearchのコンソールに表示されているKibanaへアクセスします。
「index name or pattern」に設定したindex名を設定します。
以上でKibanaのindex設定が完了しました。
次回更新予定
記事が長くなってしまったので、Kibanaでセンサーデータを参照する部分は予定外ですが次回とします。