#経緯
仕事でDynamoDBを使うことになり、その中で「日次でDynamoDBのデータをtxtファイルにエクスポートして所定のS3バケットに置く(日ごとにファイルを分ける)」という要件があり、やり方を調べて実装してみた。
#全体設計
CloudWatchEventを使ったよくある日次処理の構成。
Dynamoの項目が日付をもっているためその日付を条件にその日エクスポートするデータをQueryしてくる。
調べたところ他にDynamoDB Streamをトリガにする方法があるっぽいが、テーブルが更新されるたびにデータを取ってきてファイルにアペンドするより1日の終わりにバッチ一発でやったほうがコスト的にもよくね?と思ったので今回の方法をとった。
このあたり実際のところはどうなのかは分からないので、詳しい方は是非おしえてください( ^)o(^ )
#テーブル
こんな感じ。
DynamoではQueryの条件にできるのはkey項目のみらしい。
ところが条件に指定したかったdateがkey項目ではなかったためdateをキーにしたインデックス(GSI)を作成した。
id(Hash key) | count(Range Key) | date |
---|---|---|
a1 | 1 | 20170101 |
a1 | 2 | 20170101 |
a1 | 3 | 20170101 |
b1 | 1 | 20170102 |
b1 | 2 | 20170102 |
c1 | 1 | 20170102 |
#Lambda関数
こんな感じ。
IAMロールの設定とかは割愛。いい感じにやりました。
'use strict'
const AWS = require('aws-sdk');
const moment = require('moment');
const fs = require('fs');
AWS.config.update({ region: 'ap-northeast-1' });
const docClient = new AWS.DynamoDB.DocumentClient();
const s3 = new AWS.S3();
exports.handler = function (event, context, callback) {
let params = {
TableName: "dynamo_toS3",
IndexName: "date-count-index",
KeyConditionExpression: "#date = :date",
ExpressionAttributeNames: {
"#date": "date"
},
ExpressionAttributeValues: {
":date": moment().format('YYYYMMDD') //バッチ実行日に作成されたレコードのみ取得
}
};
docClient.query(params).promise().then(data => {
let logArray =[];
data.Items.forEach(element => {
logArray.push(JSON.stringify(element));
});
return new Promise(function(resolve, reject) {
//Lambdaでは一時的なファイルの出力先に/tmpが使える
fs.writeFile('/tmp/tmp.txt', logArray.join('\n'), function(err) {
if (err) reject(err);
else resolve(data);
});
});
}).then(() => {
let s3Params = {
Bucket: "my-bucket",
Key: `log_${moment().format('YYYYMMDD')}.txt`,
Body: fs.readFileSync('/tmp/tmp.txt')
}
return s3.putObject(s3Params).promise();
}).then(data => {
console.log(data);
})
.catch(err => {
console.log(err);
})
};
#まとめというか所感
普段RDBに慣れ親しんでいる身からすると、Dynamoはやや癖があるように感じる。
こちらの記事にもある通りSDKを使う場合はAWS.DynamoDBよりもAWS.DynamoDB.DocumentClientを使った方が良い。
次はDynamoDB Streamをトリガにしたものも試してみたい。