S3
kibana
awsIoT
AmazonElasticsearchService
KinesisFirehose

AWSとBLE人感センサーで会議室の利用状況を可視化してみる[第3話]

概要

エヌシーアイ総合システムのtaguchyです。
普段は主にBtoCのWEBシステムや、BtoB向けWEB APIの開発などをしています。
最近はIoT関連でもお仕事させていただいております。

さて、今回は第三話ということで、今日は以下を行います。
(前回はこちら)

  1. センサーデータをJSONに変換するLambdaを作成
  2. Kinesis Firehoseを利用してS3, ElasticSearchに格納
  3. Kibanaにてセンサーデータを参照

利用デバイス

センサーはレンジャーシステムズ株式会社様の「monoコネクト」を利用して、以下のデバイスを使用します。

  • BLE/WiFiゲートウェイ:1台
  • 人感センサー:3台

AWS構成

Amazon Kinesis Data Firehoseを利用して、AWS IoTからS3とElasticsearchにデータを流します。

AWS構成.png

センサーデータをJSONに変換するLambdaを作成

AWS Cloud9を使ってLambdaを作成します。
nodeを書くのは初めてですが頑張ります。

Cloud9_1.png

csvtojsonやasyncなどのモジュールを利用したいので、画面中央下のコンソールにてnpmコマンドを実行しました。

yuki-taguchi:~/environment $ cd convertSensorDataToJSON/
yuki-taguchi:~/environment/convertSensorDataToJSON $ npm init
・
・
yuki-taguchi:~/environment/convertSensorDataToJSON $ npm install csvtojson
・
・
yuki-taguchi:~/environment/convertSensorDataToJSON $ npm install async
・
・
yuki-taguchi:~/environment/convertSensorDataToJSON $ npm install date-utils
・
・
yuki-taguchi:~/environment/convertSensorDataToJSON $ ls
convertSensorDataToJSON  node_modules  package.json  template.yaml
yuki-taguchi:~/environment/convertSensorDataToJSON $ ls node_modules/
async  csvtojson  date-utils  first-chunk-stream  is-utf8  lodash  strip-bom

変換用に以下のLambda関数を作成しました。
Kibanaで時系列データとして扱うために"@timestamp"フィールドを作成しています。

'use strict';
console.log('Loading function');
const csv = require('csvtojson');
const asyn = require('async');
require('date-utils');

exports.handler = (event, context, callback) => {
        // all record convert.
        asyn.map (event.records, function (record, callback) {
            // need to base64 decode.
            let sensor_data = Buffer.from(record.data, 'base64').toString('utf8');
            // "$GPRP" is not need.
            var sensor_data_non_gprp = sensor_data.replace(/^(\$GPRP,)/g, "");
            // split payload.
            let payload = sensor_data_non_gprp.match(/^(.{12},.{12},.*,)(.*)/);
            let split = payload[2].match(/^(02010612FF0D0082BC)(.{4})(.{2})(.{8})(.{4})(.{2})(.{6})/); 
            let fix = split[1];
            let battery = split[2];
            let status = split[3];
            let reserve = split[4];
            let user = split[5];
            let fix2 = split[6];
            let reserve2 = split[7];

            // make @timestamp
            let now = new Date();

            let sensor_data_split_payload = payload[1]
                                + fix + "," + battery + ","
                                + status + "," + reserve + ","
                                + user + "," + fix2+ ","
                                + reserve2 + "," + now.toFormat('YYYY-MM-DDTHH24:MI:SS');
            csv({
                noheader: true,
                headers: ['sensor_mac','gw_mac', 'rssi', 'fix', 'battery', 'status', 'reserve', 'user', 'fix2', 'reserve2', '@timestamp']
            }).fromString(sensor_data_split_payload)
                .on('json',(jsonObj)=>{
                // Called for the number of rows.
                console.log(jsonObj);
                let jsonData = Buffer.from(JSON.stringify(jsonObj), 'utf8').toString('base64');
                callback (null, {
                                recordId : record.recordId,
                                result : 'Ok',
                                data : jsonData
                                }
                );
            });
        }, function (err, results) {
            if (err) {
                throw err;
            }
            console.log(results);
            callback (null, {records: results});
        });
};

ソースの修正が完了したら、Cloud9右側の"AWS Resources"にて"↑"をクリックし、デプロイします。

最後にLambdaのロール設定を行います。
LambdaのAWSコンソールからLambdaの実行ロールを調べます。
※Lambdaのタイムアウト設定を3分に伸ばしました。
 (Kinesisの設定画面で注意文言が出る為ですが、実際は必要ないかもしれません。)

KinesisFirehoseから呼び出すため、KinesisFirehoseへのフルアクセスポリシーを設定します。

以上でセンサーデータをJSONに変換するLambdaが作成できました。

Kinesis Firehoseを利用してS3, ElasticSearchに格納する

S3バケットの作成

今回データの可視化はElasticSearchで行うため、S3はバックアップ用途として使用します。
機会があればS3に入ったデータをAthenaなどで見ることもしてみたいです。

バケットを作成します。
AWS_S3.png

ElasticSearchの設定

今回はデバイスデータを以下のルールで保存します。
1. 24時間毎に保存
2. 保存したデータは1か月保持

1日のデータ量がどの程度になるのかが把握しきれなかっ
ため、まずはデフォルト設定でElasticSearchを立ち上げます。
ElasticSearchのバージョン6.0は、現時点(2018/1/5)ではKinesisFirehoseが対応していないようだったので、5.5に変更しました。

スナップショットの時間はAM1:00としています。

ElasticSearchはVPC内に作成することもできますが、ALBの設定などに手間取ることを避け、今回は社内IPのみに限定したネットワーク公開としました。

Kinesis Firehoseの設定

FirehoseにてLambdaを利用してPayloadデータをJSON変換しつつ、S3およびElasticSearchに転送する設定を行います。

データ変換設定にて、事前に作成したLambdaを指定します。


Firehoseにてデータのバッファを設定します。
今回は60秒置きにバッファしたデータをLambdaにて変換し、S3およびElasticSearchに流し込む設定としました。
※もっと小さい数字にしようかと思いましたが60秒が一番小さい数字でした。

Firehoseのロールはボタンポチポチで勝手に作成されました。とても楽です。
ElasticSearchへの設定もS3とほぼ同様に行います。

作成完了です。

AWSIoTのルール設定

KinesisFirehoseを利用してデータを転送するルール設定を行います。
AWSIoTのACTを選択してルール設定を行いました。

トピックのみフィルターをかけます。

事前に作成したKinesisFirehoseをアクションに追加します。
ロールは既存のロールもしくは新規で作成します。

AWSIoTのルール設定が完了しました。

設定の確認

ElasticSearchのコンソールにて、indeの作成とデータが格納されているか確認できます。

以上でAWSIoTからKinesisFirehoseを利用してS3, ElasticSearchに格納できました。

Kibanaにてセンサーデータを参照

ElasticSearchのコンソールに表示されているKibanaへアクセスします。

「index name or pattern」に設定したindex名を設定します。

以上でKibanaのindex設定が完了しました。

次回更新予定

記事が長くなってしまったので、Kibanaでセンサーデータを参照する部分は予定外ですが次回とします。