温度センサから値を取得して値を確認する画面をサーバーレスで構築するでlambdaでAthenaを叩いてデータを取得するということをやって実際にやってみたので、それのメモ。

環境

lambda Node.js 6.10

アーキテクチャ

image.png

INPUT(S3)

image.png
上記にtemprerature2018/MM/DD/HH/センサーデータが保存されてます。
センサーデータの例はこんな感じで、JSON形式で保存しています。
humidが湿度、tempが温度、batteryがセンサーのバッテリーです。

センサーデータ
{"timestamp":"2018-02-07T15:19:28.000Z","deviceId":"EBD012965BEB","humid":44,"temp":25.83,"battery":3.18}
{"timestamp":"2018-02-07T15:19:33.000Z","deviceId":"EBD012965BEB","humid":44,"temp":25.81,"battery":3.18}

Athenaの設定

Athenaを開いて左側のCrate tableからManuallyを選択
image.png
Databaseを作成します。入力値を適当に入れてください。
Location of Input Data SetはS3のパスを指定します。
今回temperature2018以下のファイルをIPNUTとするので、そこまでで止めておきます。
(temperature2018/02/08/とかにはしない)
注意点として最後にスラッシュ「/」がないと怒られます。
image.png
S3の入力フォーマットを指定します。今回は上にも書いたようにJSONですね。
image.png
テーブルの列を指定します。
ちょっと検証していないので正しいかは不明ですが、おそらくS3のインプット列すべて指定しないと正しく動いてくれません。batteryを指定するのを忘れていたら動かなかったので。
image.png
Nextをクリックし、Create tableしたらOKです。

正しくSELECT出来ました!
image.png

Athenaは実行結果を自動でS3にバケットを作成しますので、それのバケット名をメモしておきます(Lambdaの設定で使います)。

image.png

Lambdaの設定

var AWS = require('aws-sdk');
var athena = new AWS.Athena({apiVersion: '2017-05-18'});

function startQuery(){
    return new Promise(function(resolve, reject){
        var params = {
            QueryString: 'SELECT * FROM temperature.temp limit 10;', /* required */
            ResultConfiguration: { /* required */
                OutputLocation: 's3://aws-athena-query-results-XXXXXXX-us-east-1/' // ここにAthenaのアウトプットのパスを指定する
            }
        };
        athena.startQueryExecution(params, function(err, data) {
            console.log('data : ' , data);
            resolve(data.QueryExecutionId);
        });

    });
}

function waitQueryEnd(queryExecutionId){
    return new Promise(function(resolve, reject){
        var time = 0;
        var params = {QueryExecutionId: queryExecutionId /* required */};
        (function loop() {
            if (time != 20) {
                athena.getQueryExecution(params, function(err, data) {
                    console.log('Status : ' , data.QueryExecution.Status.State);
                    if (data.QueryExecution.Status.State == 'SUCCEEDED'){
                        resolve(queryExecutionId);
                    } else {
                        sleep(2000);
                        time+=2;
                        loop();
                    }
                });
            } else {
                reject('timeout error!!!');
                return false;
            }
        }());


    });
}

function sleep(waitMsec) {

    var startMsec = new Date();

    // 指定ミリ秒間、空ループ。CPUは常にビジー。
    while (new Date() - startMsec < waitMsec);

}

function getResult(queryExecutionId){
    return new Promise(function(resolve, reject){
        var params = {
          QueryExecutionId: queryExecutionId, /* required */
        };
        athena.getQueryResults(params, function(err, result) {
            var response = [];
            for(let i = 1; i < result.ResultSet.Rows.length; i++) {
              var row = result.ResultSet.Rows[i].Data;
              var data = 
                  { timestamp : row[0].VarCharValue,
                    deviceid : row[1].VarCharValue,
                    humid : row[2].VarCharValue,
                    temp : row[3].VarCharValue,
                    battery : row[4].VarCharValue
                  };
                response.push(data);

            }
            resolve(response);
        });

    });
}

exports.handler = (event, context, callback) => {

    console.log('event : ' , event);
    var promise = Promise.resolve();
    promise
        .then(startQuery)
        .then(waitQueryEnd)
        .then(getResult)
        .then(function(value){
            callback(null,{statusCode: 200,
                      headers: {
                        "Access-Control-Allow-Origin" : "*" // Required for CORS support to work
                      },
                      body : value});
        })
        .catch(function(error){
            callback(null,error);
        });

};

SDKを使って取得しています。またPromiseのメソッドチェーンを使っています。
Promiseの詳細はPromiseのメソッドチェーンの使い方まとめ参照。

startQuery

Athenaを呼び出して、SQLを実行しています。
startQueryExecutionは実行するだけなので、実行結果が返ってくるわけではありませんので、注意。
アウトプットはAthenaの設定でメモしておいたところを設定します。
クエリのIdを次の関数へ渡しておきます。

waitQueryEnd

クエリが終了するのを待ちます。
20秒以上クエリに時間がかかったらエラーとしています。

getResult

クエリの結果を取得しています。
クエリの結果が以下のような形になるため、データを整形して返しています。

[{"Data":[{"VarCharValue":"timestamp"},{"VarCharValue":"deviceid"},{"VarCharValue":"humid"},{"VarCharValue":"temp"},{"VarCharValue":"battery"}]}
,{"Data":[{"VarCharValue":"2018-02-07T03:46:32.000Z"},{"VarCharValue":"EBD012965BEB"},{"VarCharValue":"41.0"},{"VarCharValue":"25.88"},{"VarCharValue":"3.18"}]}
,{"Data":[{"VarCharValue":"2018-02-07T03:46:42.000Z"},{"VarCharValue":"EBD012965BEB"},{"VarCharValue":"41.0"},{"VarCharValue":"25.93"},{"VarCharValue":"3.18"}]}
]

その他

API GatewayからLambdaを呼び出すときにクロスドメインでエラーになってしまうので、
callback関数にレスポンスヘッダーを追記しています。

またデフォルトではLambdaのタイムアウト設定が3秒になっているため、20秒にしておきます。
image.png

実行結果

{"statusCode":200
,"headers":{"Access-Control-Allow-Origin":"*"}
,"body":[{"timestamp":"2018-02-06T18:56:37.000Z","deviceid":"EBD012965BEB","humid":"41.0","temp":"25.79","battery":"3.18"}
        ,{"timestamp":"2018-02-06T18:56:42.000Z","deviceid":"EBD012965BEB","humid":"41.0","temp":"25.79","battery":"3.18"}
        ,{"timestamp":"2018-02-06T18:56:57.000Z","deviceid":"EBD012965BEB","humid":"41.0","temp":"25.78","battery":"3.18"}
        ]}

まとめ

関数の詳細な説明については以下を参考にしてください。
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Athena.html

Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.