AWS IoTのデータをKibanaに表示する

  • 62
    いいね
  • 4
    コメント

[注]こちらの記事はAWS IoTが出始めた頃(2015年末)にトライしたものですので、2017年3月時点ではだいぶ見た目も機能も変わってしまっているようです。
ナウでヤングな方は、@moritalous さんによる「AWS IoTのデータをKibanaに表示する」をやってみた。2017/3版 をチェック!


AWS IoTでデータのPubSubや保存が極端に簡単になりましたが、データのビジュアライゼーション機能はありません。ということで、今回はElasticsearchにデータを投入してKibanaに表示させてみましょう。

全体像は以下。

名称未設定.jpg

植物のプランターにセンサを取り付けるのを想定して、温度や光度情報をKibanaに表示します。

Elasticsearchインスタンスを立ち上げる

まずは、立ち上がりが遅いElasticsearchを先に立ち上げておきましょう。
Domain nameを plant-sensor、Instance typeはt2.micro.elasticsearch、Instance countは 1、Strage typeは EBS に設定します。

スクリーンショット_2015-12-31_10_41_07.jpg

Kibana のURLをクリックするとKibana4の画面が表示されます。EndpointのURLはLambdaで使います。

Kinesis Streamを作成

AWS IoTの情報を流す先のKinesis zstreamを先に作っておきます。

Stream Nameを plant-sensors、Number of Shardsを 1 で作成します。

AWS IoTでThingやRuleを作成

まずはplant-sensor という名前のThingを作成。そして plant-sensor のDetail画面にて Connect a device ボタンを押して、CertificateとPolicyを作成します。この時、SDK選択画面が出てくるので NodeJS を選択しましょう。証明書は後で使うのでダウンロードしておきましょう。

最後にRuleを作成します。今回は plant/sensors というトピック名でデータを飛ばそうと思うので、Topic filterに plant/sensors を設定します。アクションは Send message to a real-time data stream (Kinesis) を選択し、先ほど作った plant-sensors というKinesis Streamを選択します。今回はShards数は1つなのでPartition Keyは適当なものを設定しても大丈夫です。そして Add action ボタンを押して Create を押せばRule作成完了です。

スクリーンショット 2015-12-31 11.36.57.png

これでIoT側の準備は万端。Thing, Certificate, Policy, Ruleの4つの項目が作られているはずです。

スクリーンショット_2015-12-31_11_21_46.jpg

仮想的なIoTデバイスを作成

手元にセンサがないので、とりあえず温度や湿度っぽいデータを生成するプログラムを作ってみましょう。

先ほどダウンロードした証明書は certs ディレクトリに入れておきます。 root-CA.crtこちらからダウンロードします。

plant-sensor.js
'use strict'

var awsIot = require( 'aws-iot-device-sdk');

var device = awsIot.device({
  keyPath: './certs/1c03d79728-private.pem.key',
  certPath: './certs/1c03d79728-certificate.pem.crt',
  caPath: './certs/root-CA.crt',
  clientId: 'planta-sensor',
  region: 'us-east-1'
});

var getRandomData = function(value){
  return Math.floor(value + Math.random()*value/10 - value/10/2);
}

var sendSensorData = function(){
  var data = JSON.stringify({
    timestamp: new Date(),
    humidity: getRandomData(45),
    temperature: getRandomData(20),
    lux: getRandomData(32000),
    moisture: getRandomData(301)
  }); 
  console.log(data);
  device.publish('plant/sensors', data);
}

device
  .on('connect', function(){
    console.log('connect');

    setInterval(function(){
      sendSensorData();
    }, 1000);
  });

早速実行してみましょう。

$ npm init
$ npm install --save aws-iot-device-sdk
$ node plant-sensor.js
connect
{"timestamp":"2015-12-31T03:08:21.059Z","humidity":42,"temperature":20,"lux":33037,"moisture":295}
{"timestamp":"2015-12-31T03:08:22.070Z","humidity":46,"temperature":19,"lux":33443,"moisture":290}
{"timestamp":"2015-12-31T03:08:23.071Z","humidity":43,"temperature":20,"lux":30566,"moisture":297}
{"timestamp":"2015-12-31T03:08:24.072Z","humidity":45,"temperature":19,"lux":33140,"moisture":308}
・・・・

CloudWatchのIoT MetricとKinesisのMonitoringを見てみると、ちゃんとデータが来ているのを確認できます。

LambdaでKinesis StreamのイベントをElasticsearchに流す

awslabs/amazon-elasticsearch-lambda-samplesのコードを参考にしながら、Kinesis StreamのイベントのレコードをElasticsearchに投入するLambdaコードを作成します。

index.handler.js
var path = require('path');
var AWS = require('aws-sdk');

var esDomain = {
    region: 'us-east-1',
    endpoint: 'search-plant-sensor-****************.us-east-1.es.amazonaws.com',
    index: 'plant-sensor',
    doctype: 'sensor'
};
var endpoint = new AWS.Endpoint(esDomain.endpoint);
var creds = new AWS.EnvironmentCredentials('AWS');

exports.handler = function(event, context) {
    //console.log('Received event:', JSON.stringify(event, null, 2));
    event.Records.forEach(function(record) {
        // Kinesis data is base64 encoded so decode here
        var payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
        postToES(payload, context);
        console.log('Decoded payload:', payload);
    });
    context.succeed('Successfully processed ' + event.Records.length + ' records.');
};

function postToES(doc, context) {
    var req = new AWS.HttpRequest(endpoint);

    req.method = 'POST';
    req.path = path.join('/', esDomain.index, esDomain.doctype);
    req.region = esDomain.region;
    req.headers['presigned-expires'] = false;
    req.headers['Host'] = endpoint.host;
    req.body = doc;

    var signer = new AWS.Signers.V4(req , 'es');  // es: service code
    signer.addAuthorization(creds, new Date());

    var send = new AWS.NodeHttpClient();
    send.handleRequest(req, null, function(httpResp) {
        var respBody = '';
        httpResp.on('data', function (chunk) {
            respBody += chunk;
        });
        httpResp.on('end', function (chunk) {
            console.log('Response: ' + respBody);
            context.succeed('Lambda added document ' + doc);
        });
    }, function(err) {
        console.log('Error: ' + err);
        context.fail('Lambda failed with error ' + err);
    });
}

注意点としては、LambdaのRoleはKinesisとElasticsearchにアクセスできるようにしましょう。

CloudWatchで実行ログを確認してみて、ちゃんと動作してそうだったらOKです。

Kibanaでダッシュボードを作成する

Discoverタブを見てみると、ちゃんとデータが来ているのを確認できます。

スクリーンショット 2015-12-31 12.20.13.png

では、チャートを作ってみましょう。Visualizeタブで、以下の様な感じでグラフを作成します。

スクリーンショット 2015-12-30 12.06.05.png

最後にダッシュボードを作成します。Dashboardタブを選択して、Add visualization で先ほど作成したチャートをポンポンと選択していくだけです!

スクリーンショット 2015-12-30 12.06.22.png

構成要素は多いですが、意外と簡単に作れるものですね!

この投稿は AWS Advent Calendar 201525日目の記事です。