ちなみに、この構成はdatapipelineの値段が高くつくのでまったくおすすめしません。
普通にlambdaだけで実装したほうがよいです。
"s3にあがったら"みたいなイベント駆動のジョブは、datapipeline使うべきではないみたいです。
が、一応動くもの作ったのでメモ。
仕様をざっくり
30分に一回s3にアップされるcsvファイルをRDSにインサートする
※csvファイル名は毎回異なる
だいたいの流れ
- s3にcsvファイルがputされてlambda発火
- lambdaでdatapipelineを定義・作成・実行
- datapipelineがs3データをrdsにロード
#手順
0. 新規のdatapipelineのテンプレートとなるcomponentを作っておく
- s3データをrdsにロードするdatapipelineを作成
- 1日1回実行設定
- activateしないでおく
- lambdaこの定義をテンプレートとして利用して、新規datapipelineを作成する
1. s3の作成で発火するlambdaを定義する
2. lambdaでdatapipelineを定義・作成・実行するスクリプトを設定
node.jsを利用。datapipelineのaws-sdkを使う。
console.log('Loading function');
var aws = require('aws-sdk');
var s3 = new aws.S3({ apiVersion: '2006-03-01' });
var dp = new aws.DataPipeline();
exports.handler = function(event, context) {
// putされたs3ファイルパス
var path = event['Records'][0]['s3']['object']['key'];
var params = {
name: 'test1',
uniqueId: '1'
}
// DP設定を新規作成
dp.createPipeline(params, function(err, createResult) {
if (err) {
console.log(err);
throw true;
}
// 新規作成されたpipelineId
var pipelineId = createResult['pipelineId'];
var params = {
pipelineId: 'template-pipeline-id' // テンプレートのid
}
// テンプレートから定義を取得
dp.getPipelineDefinition(params, function(err, def) {
if (err) {
console.log(err);
throw true;
}
// テンプレート定義を変更
def['pipelineId'] = pipelineId;
var p_values = def['parameterValues'];
// s3のファイルパスをputされたs3ファイルに変更
for (var i = 0; i < p_values.length; i++) {
if (p_values[i]['id'] == 'myInputS3Loc') {
// s3の場所
p_values[i]['stringValue'] = 's3://backet/' + path;
}
}
// 新規作成したDatapipeline項目に、変更した定義を登録する
dp.putPipelineDefinition(def, function(err) {
if (err) {
console.log(err);
throw true;
}
// 実行
dp.activatePipeline({pipelineId: pipelineId}, function(err){
if (err) {
console.log(err);
throw true;
}
context.succeed(true);
})
});
});
});
};
これで、対象s3バケットにcsvファイルが上がると、pipelineが作成されてrdsにロードされた。
30分に一回だと1ヶ月に 720USD = (48times * 0.5USD * 30days) くらいかかりそうなので、全然意味わからない構成です。
activateされたdatapipelineの定義を変更できると安く済むのになぁと思いました。
ちなみに、この処理結局バッチサーバーで普通にcron実装することにしました。lambdaだとエラー時にアラート送れないので。