LoginSignup
25
21

More than 5 years have passed since last update.

AWS LambdaでAthenaを呼び出す

Last updated at Posted at 2018-02-09

温度センサから値を取得して値を確認する画面をサーバーレスで構築するでlambdaでAthenaを叩いてデータを取得するということをやって実際にやってみたので、それのメモ。

環境

lambda Node.js 6.10

アーキテクチャ

image.png

INPUT(S3)

image.png
上記にtemprerature2018/MM/DD/HH/センサーデータが保存されてます。
センサーデータの例はこんな感じで、JSON形式で保存しています。
humidが湿度、tempが温度、batteryがセンサーのバッテリーです。

センサーデータ
{"timestamp":"2018-02-07T15:19:28.000Z","deviceId":"EBD012965BEB","humid":44,"temp":25.83,"battery":3.18}
{"timestamp":"2018-02-07T15:19:33.000Z","deviceId":"EBD012965BEB","humid":44,"temp":25.81,"battery":3.18}

Athenaの設定

Athenaを開いて左側のCrate tableからManuallyを選択
image.png
Databaseを作成します。入力値を適当に入れてください。
Location of Input Data SetはS3のパスを指定します。
今回temperature2018以下のファイルをIPNUTとするので、そこまでで止めておきます。
(temperature2018/02/08/とかにはしない)
注意点として最後にスラッシュ「/」がないと怒られます。
image.png
S3の入力フォーマットを指定します。今回は上にも書いたようにJSONですね。
image.png
テーブルの列を指定します。
ちょっと検証していないので正しいかは不明ですが、おそらくS3のインプット列すべて指定しないと正しく動いてくれません。batteryを指定するのを忘れていたら動かなかったので。
image.png
Nextをクリックし、Create tableしたらOKです。

正しくSELECT出来ました!
image.png

Athenaは実行結果を自動でS3にバケットを作成しますので、それのバケット名をメモしておきます(Lambdaの設定で使います)。

image.png

Lambdaの設定

var AWS = require('aws-sdk');
var athena = new AWS.Athena({apiVersion: '2017-05-18'});

function startQuery(){
    return new Promise(function(resolve, reject){
        var params = {
            QueryString: 'SELECT * FROM temperature.temp limit 10;', /* required */
            ResultConfiguration: { /* required */
                OutputLocation: 's3://aws-athena-query-results-XXXXXXX-us-east-1/' // ここにAthenaのアウトプットのパスを指定する
            }
        };
        athena.startQueryExecution(params, function(err, data) {
            console.log('data : ' , data);
            resolve(data.QueryExecutionId);
        });

    });
}

function waitQueryEnd(queryExecutionId){
    return new Promise(function(resolve, reject){
        var time = 0;
        var params = {QueryExecutionId: queryExecutionId /* required */};
        (function loop() {
            if (time != 20) {
                athena.getQueryExecution(params, function(err, data) {
                    console.log('Status : ' , data.QueryExecution.Status.State);
                    if (data.QueryExecution.Status.State == 'SUCCEEDED'){
                        resolve(queryExecutionId);
                    } else {
                        sleep(2000);
                        time+=2;
                        loop();
                    }
                });
            } else {
                reject('timeout error!!!');
                return false;
            }
        }());


    });
}

function sleep(waitMsec) {

    var startMsec = new Date();

    // 指定ミリ秒間、空ループ。CPUは常にビジー。
    while (new Date() - startMsec < waitMsec);

}

function getResult(queryExecutionId){
    return new Promise(function(resolve, reject){
        var params = {
          QueryExecutionId: queryExecutionId, /* required */
        };
        athena.getQueryResults(params, function(err, result) {
            var response = [];
            for(let i = 1; i < result.ResultSet.Rows.length; i++) {
              var row = result.ResultSet.Rows[i].Data;
              var data = 
                  { timestamp : row[0].VarCharValue,
                    deviceid : row[1].VarCharValue,
                    humid : row[2].VarCharValue,
                    temp : row[3].VarCharValue,
                    battery : row[4].VarCharValue
                  };
                response.push(data);

            }
            resolve(response);
        });

    });
}

exports.handler = (event, context, callback) => {

    console.log('event : ' , event);
    var promise = Promise.resolve();
    promise
        .then(startQuery)
        .then(waitQueryEnd)
        .then(getResult)
        .then(function(value){
            callback(null,{statusCode: 200,
                      headers: {
                        "Access-Control-Allow-Origin" : "*" // Required for CORS support to work
                      },
                      body : value});
        })
        .catch(function(error){
            callback(null,error);
        });

};

SDKを使って取得しています。またPromiseのメソッドチェーンを使っています。
Promiseの詳細はPromiseのメソッドチェーンの使い方まとめ参照。

startQuery

Athenaを呼び出して、SQLを実行しています。
startQueryExecutionは実行するだけなので、実行結果が返ってくるわけではありませんので、注意。
アウトプットはAthenaの設定でメモしておいたところを設定します。
クエリのIdを次の関数へ渡しておきます。

waitQueryEnd

クエリが終了するのを待ちます。
20秒以上クエリに時間がかかったらエラーとしています。

getResult

クエリの結果を取得しています。
クエリの結果が以下のような形になるため、データを整形して返しています。

[{"Data":[{"VarCharValue":"timestamp"},{"VarCharValue":"deviceid"},{"VarCharValue":"humid"},{"VarCharValue":"temp"},{"VarCharValue":"battery"}]}
,{"Data":[{"VarCharValue":"2018-02-07T03:46:32.000Z"},{"VarCharValue":"EBD012965BEB"},{"VarCharValue":"41.0"},{"VarCharValue":"25.88"},{"VarCharValue":"3.18"}]}
,{"Data":[{"VarCharValue":"2018-02-07T03:46:42.000Z"},{"VarCharValue":"EBD012965BEB"},{"VarCharValue":"41.0"},{"VarCharValue":"25.93"},{"VarCharValue":"3.18"}]}
]

その他

API GatewayからLambdaを呼び出すときにクロスドメインでエラーになってしまうので、
callback関数にレスポンスヘッダーを追記しています。

またデフォルトではLambdaのタイムアウト設定が3秒になっているため、20秒にしておきます。
image.png

実行結果

{"statusCode":200
,"headers":{"Access-Control-Allow-Origin":"*"}
,"body":[{"timestamp":"2018-02-06T18:56:37.000Z","deviceid":"EBD012965BEB","humid":"41.0","temp":"25.79","battery":"3.18"}
        ,{"timestamp":"2018-02-06T18:56:42.000Z","deviceid":"EBD012965BEB","humid":"41.0","temp":"25.79","battery":"3.18"}
        ,{"timestamp":"2018-02-06T18:56:57.000Z","deviceid":"EBD012965BEB","humid":"41.0","temp":"25.78","battery":"3.18"}
        ]}

まとめ

関数の詳細な説明については以下を参考にしてください。
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Athena.html

25
21
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
25
21