温度センサから値を取得して値を確認する画面をサーバーレスで構築するでlambdaでAthenaを叩いてデータを取得するということをやって実際にやってみたので、それのメモ。
##環境
lambda Node.js 6.10
##INPUT(S3)
上記にtemprerature2018/MM/DD/HH/センサーデータが保存されてます。
センサーデータの例はこんな感じで、JSON形式で保存しています。
humidが湿度、tempが温度、batteryがセンサーのバッテリーです。
{"timestamp":"2018-02-07T15:19:28.000Z","deviceId":"EBD012965BEB","humid":44,"temp":25.83,"battery":3.18}
{"timestamp":"2018-02-07T15:19:33.000Z","deviceId":"EBD012965BEB","humid":44,"temp":25.81,"battery":3.18}
Athenaの設定
Athenaを開いて左側のCrate tableからManuallyを選択
Databaseを作成します。入力値を適当に入れてください。
Location of Input Data SetはS3のパスを指定します。
今回temperature2018以下のファイルをIPNUTとするので、そこまでで止めておきます。
(temperature2018/02/08/とかにはしない)
注意点として最後にスラッシュ「/」がないと怒られます。
S3の入力フォーマットを指定します。今回は上にも書いたようにJSONですね。
テーブルの列を指定します。
ちょっと検証していないので正しいかは不明ですが、おそらくS3のインプット列すべて指定しないと正しく動いてくれません。batteryを指定するのを忘れていたら動かなかったので。
Nextをクリックし、Create tableしたらOKです。
Athenaは実行結果を自動でS3にバケットを作成しますので、それのバケット名をメモしておきます(Lambdaの設定で使います)。
Lambdaの設定
var AWS = require('aws-sdk');
var athena = new AWS.Athena({apiVersion: '2017-05-18'});
function startQuery(){
return new Promise(function(resolve, reject){
var params = {
QueryString: 'SELECT * FROM temperature.temp limit 10;', /* required */
ResultConfiguration: { /* required */
OutputLocation: 's3://aws-athena-query-results-XXXXXXX-us-east-1/' // ここにAthenaのアウトプットのパスを指定する
}
};
athena.startQueryExecution(params, function(err, data) {
console.log('data : ' , data);
resolve(data.QueryExecutionId);
});
});
}
function waitQueryEnd(queryExecutionId){
return new Promise(function(resolve, reject){
var time = 0;
var params = {QueryExecutionId: queryExecutionId /* required */};
(function loop() {
if (time != 20) {
athena.getQueryExecution(params, function(err, data) {
console.log('Status : ' , data.QueryExecution.Status.State);
if (data.QueryExecution.Status.State == 'SUCCEEDED'){
resolve(queryExecutionId);
} else {
sleep(2000);
time+=2;
loop();
}
});
} else {
reject('timeout error!!!');
return false;
}
}());
});
}
function sleep(waitMsec) {
var startMsec = new Date();
// 指定ミリ秒間、空ループ。CPUは常にビジー。
while (new Date() - startMsec < waitMsec);
}
function getResult(queryExecutionId){
return new Promise(function(resolve, reject){
var params = {
QueryExecutionId: queryExecutionId, /* required */
};
athena.getQueryResults(params, function(err, result) {
var response = [];
for(let i = 1; i < result.ResultSet.Rows.length; i++) {
var row = result.ResultSet.Rows[i].Data;
var data =
{ timestamp : row[0].VarCharValue,
deviceid : row[1].VarCharValue,
humid : row[2].VarCharValue,
temp : row[3].VarCharValue,
battery : row[4].VarCharValue
};
response.push(data);
}
resolve(response);
});
});
}
exports.handler = (event, context, callback) => {
console.log('event : ' , event);
var promise = Promise.resolve();
promise
.then(startQuery)
.then(waitQueryEnd)
.then(getResult)
.then(function(value){
callback(null,{statusCode: 200,
headers: {
"Access-Control-Allow-Origin" : "*" // Required for CORS support to work
},
body : value});
})
.catch(function(error){
callback(null,error);
});
};
SDKを使って取得しています。またPromiseのメソッドチェーンを使っています。
Promiseの詳細はPromiseのメソッドチェーンの使い方まとめ参照。
###startQuery
Athenaを呼び出して、SQLを実行しています。
startQueryExecutionは実行するだけなので、実行結果が返ってくるわけではありませんので、注意。
アウトプットはAthenaの設定でメモしておいたところを設定します。
クエリのIdを次の関数へ渡しておきます。
###waitQueryEnd
クエリが終了するのを待ちます。
20秒以上クエリに時間がかかったらエラーとしています。
###getResult
クエリの結果を取得しています。
クエリの結果が以下のような形になるため、データを整形して返しています。
[{"Data":[{"VarCharValue":"timestamp"},{"VarCharValue":"deviceid"},{"VarCharValue":"humid"},{"VarCharValue":"temp"},{"VarCharValue":"battery"}]}
,{"Data":[{"VarCharValue":"2018-02-07T03:46:32.000Z"},{"VarCharValue":"EBD012965BEB"},{"VarCharValue":"41.0"},{"VarCharValue":"25.88"},{"VarCharValue":"3.18"}]}
,{"Data":[{"VarCharValue":"2018-02-07T03:46:42.000Z"},{"VarCharValue":"EBD012965BEB"},{"VarCharValue":"41.0"},{"VarCharValue":"25.93"},{"VarCharValue":"3.18"}]}
]
###その他
API GatewayからLambdaを呼び出すときにクロスドメインでエラーになってしまうので、
callback関数にレスポンスヘッダーを追記しています。
またデフォルトではLambdaのタイムアウト設定が3秒になっているため、20秒にしておきます。
##実行結果
{"statusCode":200
,"headers":{"Access-Control-Allow-Origin":"*"}
,"body":[{"timestamp":"2018-02-06T18:56:37.000Z","deviceid":"EBD012965BEB","humid":"41.0","temp":"25.79","battery":"3.18"}
,{"timestamp":"2018-02-06T18:56:42.000Z","deviceid":"EBD012965BEB","humid":"41.0","temp":"25.79","battery":"3.18"}
,{"timestamp":"2018-02-06T18:56:57.000Z","deviceid":"EBD012965BEB","humid":"41.0","temp":"25.78","battery":"3.18"}
]}
##まとめ
関数の詳細な説明については以下を参考にしてください。
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Athena.html