4
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AWSとBLE人感センサーで会議室の利用状況を可視化してみる[第3話]

Last updated at Posted at 2018-01-08

概要

エヌシーアイ総合システムのtaguchyです。
普段は主にBtoCのWEBシステムや、BtoB向けWEB APIの開発などをしています。
最近はIoT関連でもお仕事させていただいております。

さて、今回は第三話ということで、今日は以下を行います。
(前回はこちら)

  1. センサーデータをJSONに変換するLambdaを作成
  2. Kinesis Firehoseを利用してS3, ElasticSearchに格納
  3. Kibanaにてセンサーデータを参照

利用デバイス

センサーはレンジャーシステムズ株式会社様の「monoコネクト」を利用して、以下のデバイスを使用します。

  • BLE/WiFiゲートウェイ:1台
  • 人感センサー:3台

AWS構成

Amazon Kinesis Data Firehoseを利用して、AWS IoTからS3とElasticsearchにデータを流します。

AWS構成.png

センサーデータをJSONに変換するLambdaを作成

AWS Cloud9を使ってLambdaを作成します。
nodeを書くのは初めてですが頑張ります。

Cloud9_1.png

csvtojsonやasyncなどのモジュールを利用したいので、画面中央下のコンソールにてnpmコマンドを実行しました。

yuki-taguchi:~/environment $ cd convertSensorDataToJSON/
yuki-taguchi:~/environment/convertSensorDataToJSON $ npm init
・
・
yuki-taguchi:~/environment/convertSensorDataToJSON $ npm install csvtojson
・
・
yuki-taguchi:~/environment/convertSensorDataToJSON $ npm install async
・
・
yuki-taguchi:~/environment/convertSensorDataToJSON $ npm install date-utils
・
・
yuki-taguchi:~/environment/convertSensorDataToJSON $ ls
convertSensorDataToJSON  node_modules  package.json  template.yaml
yuki-taguchi:~/environment/convertSensorDataToJSON $ ls node_modules/
async  csvtojson  date-utils  first-chunk-stream  is-utf8  lodash  strip-bom

変換用に以下のLambda関数を作成しました。
Kibanaで時系列データとして扱うために"@timestamp"フィールドを作成しています。

'use strict';
console.log('Loading function');
const csv = require('csvtojson');
const asyn = require('async');
require('date-utils');

exports.handler = (event, context, callback) => {
        // all record convert.
        asyn.map (event.records, function (record, callback) {
            // need to base64 decode.
            let sensor_data = Buffer.from(record.data, 'base64').toString('utf8');
            // "$GPRP" is not need.
            var sensor_data_non_gprp = sensor_data.replace(/^(\$GPRP,)/g, "");
            // split payload.
            let payload = sensor_data_non_gprp.match(/^(.{12},.{12},.*,)(.*)/);
            let split = payload[2].match(/^(02010612FF0D0082BC)(.{4})(.{2})(.{8})(.{4})(.{2})(.{6})/); 
            let fix = split[1];
            let battery = split[2];
            let status = split[3];
            let reserve = split[4];
            let user = split[5];
            let fix2 = split[6];
            let reserve2 = split[7];
            
            // make @timestamp
            let now = new Date();
            
            let sensor_data_split_payload = payload[1]
                                + fix + "," + battery + ","
                                + status + "," + reserve + ","
                                + user + "," + fix2+ ","
                                + reserve2 + "," + now.toFormat('YYYY-MM-DDTHH24:MI:SS');
            csv({
                noheader: true,
                headers: ['sensor_mac','gw_mac', 'rssi', 'fix', 'battery', 'status', 'reserve', 'user', 'fix2', 'reserve2', '@timestamp']
            }).fromString(sensor_data_split_payload)
                .on('json',(jsonObj)=>{
                // Called for the number of rows.
                console.log(jsonObj);
                let jsonData = Buffer.from(JSON.stringify(jsonObj), 'utf8').toString('base64');
                callback (null, {
                                recordId : record.recordId,
                                result : 'Ok',
                                data : jsonData
                                }
                );
            });
        }, function (err, results) {
            if (err) {
                throw err;
            }
            console.log(results);
            callback (null, {records: results});
        });
};

ソースの修正が完了したら、Cloud9右側の"AWS Resources"にて"↑"をクリックし、デプロイします。

最後にLambdaのロール設定を行います。
LambdaのAWSコンソールからLambdaの実行ロールを調べます。
※Lambdaのタイムアウト設定を3分に伸ばしました。
 (Kinesisの設定画面で注意文言が出る為ですが、実際は必要ないかもしれません。)

KinesisFirehoseから呼び出すため、KinesisFirehoseへのフルアクセスポリシーを設定します。

以上でセンサーデータをJSONに変換するLambdaが作成できました。

Kinesis Firehoseを利用してS3, ElasticSearchに格納する

S3バケットの作成

今回データの可視化はElasticSearchで行うため、S3はバックアップ用途として使用します。
機会があればS3に入ったデータをAthenaなどで見ることもしてみたいです。

バケットを作成します。
AWS_S3.png

ElasticSearchの設定

今回はデバイスデータを以下のルールで保存します。

  1. 24時間毎に保存
  2. 保存したデータは1か月保持

1日のデータ量がどの程度になるのかが把握しきれなかっ
ため、まずはデフォルト設定でElasticSearchを立ち上げます。
ElasticSearchのバージョン6.0は、現時点(2018/1/5)ではKinesisFirehoseが対応していないようだったので、5.5に変更しました。

スナップショットの時間はAM1:00としています。

ElasticSearchはVPC内に作成することもできますが、ALBの設定などに手間取ることを避け、今回は社内IPのみに限定したネットワーク公開としました。

Kinesis Firehoseの設定

FirehoseにてLambdaを利用してPayloadデータをJSON変換しつつ、S3およびElasticSearchに転送する設定を行います。

データ変換設定にて、事前に作成したLambdaを指定します。

Firehoseにてデータのバッファを設定します。
今回は60秒置きにバッファしたデータをLambdaにて変換し、S3およびElasticSearchに流し込む設定としました。
※もっと小さい数字にしようかと思いましたが60秒が一番小さい数字でした。

Firehoseのロールはボタンポチポチで勝手に作成されました。とても楽です。
ElasticSearchへの設定もS3とほぼ同様に行います。

作成完了です。

AWSIoTのルール設定

KinesisFirehoseを利用してデータを転送するルール設定を行います。
AWSIoTのACTを選択してルール設定を行いました。

トピックのみフィルターをかけます。

事前に作成したKinesisFirehoseをアクションに追加します。
ロールは既存のロールもしくは新規で作成します。

AWSIoTのルール設定が完了しました。

設定の確認

ElasticSearchのコンソールにて、indeの作成とデータが格納されているか確認できます。

以上でAWSIoTからKinesisFirehoseを利用してS3, ElasticSearchに格納できました。

Kibanaにてセンサーデータを参照

ElasticSearchのコンソールに表示されているKibanaへアクセスします。

「index name or pattern」に設定したindex名を設定します。

以上でKibanaのindex設定が完了しました。

次回更新予定

記事が長くなってしまったので、Kibanaでセンサーデータを参照する部分は予定外ですが次回とします。

4
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?