LoginSignup
5
6

More than 5 years have passed since last update.

s3に定期的にあがるcsvファイルをrdsにロード。lambdaとdatapipelineで。

Last updated at Posted at 2016-02-23

ちなみに、この構成はdatapipelineの値段が高くつくのでまったくおすすめしません。
普通にlambdaだけで実装したほうがよいです。
"s3にあがったら"みたいなイベント駆動のジョブは、datapipeline使うべきではないみたいです。
が、一応動くもの作ったのでメモ。

仕様をざっくり

30分に一回s3にアップされるcsvファイルをRDSにインサートする
※csvファイル名は毎回異なる

だいたいの流れ

  1. s3にcsvファイルがputされてlambda発火
  2. lambdaでdatapipelineを定義・作成・実行
  3. datapipelineがs3データをrdsにロード

手順

0. 新規のdatapipelineのテンプレートとなるcomponentを作っておく

  • s3データをrdsにロードするdatapipelineを作成
  • 1日1回実行設定
  • activateしないでおく
  • lambdaこの定義をテンプレートとして利用して、新規datapipelineを作成する

1. s3の作成で発火するlambdaを定義する

2. lambdaでdatapipelineを定義・作成・実行するスクリプトを設定

node.jsを利用。datapipelineのaws-sdkを使う。

console.log('Loading function');

var aws = require('aws-sdk');
var s3 = new aws.S3({ apiVersion: '2006-03-01' });

var dp = new aws.DataPipeline();

exports.handler = function(event, context) {
    // putされたs3ファイルパス
    var path = event['Records'][0]['s3']['object']['key'];

    var params = {
        name: 'test1',
        uniqueId: '1'
    }

    // DP設定を新規作成
    dp.createPipeline(params, function(err, createResult) {
        if (err) {
            console.log(err);
            throw true;
        }

        // 新規作成されたpipelineId
        var pipelineId = createResult['pipelineId'];

        var params = {
            pipelineId: 'template-pipeline-id' // テンプレートのid
        }

        // テンプレートから定義を取得
        dp.getPipelineDefinition(params, function(err, def) {
             if (err) {
                console.log(err);
                throw true;
            }

            // テンプレート定義を変更
            def['pipelineId'] = pipelineId;

            var p_values = def['parameterValues'];

            // s3のファイルパスをputされたs3ファイルに変更
            for (var i = 0; i < p_values.length; i++) {
                if (p_values[i]['id'] == 'myInputS3Loc') {
                    // s3の場所
                    p_values[i]['stringValue'] = 's3://backet/' + path;
                }
            }

            // 新規作成したDatapipeline項目に、変更した定義を登録する
            dp.putPipelineDefinition(def, function(err) {
                if (err) {
                    console.log(err);
                    throw true;
                }

                // 実行
                dp.activatePipeline({pipelineId: pipelineId}, function(err){
                    if (err) {
                        console.log(err);
                        throw true;
                    }
                        context.succeed(true);
                })
            });
        });
    });
};

これで、対象s3バケットにcsvファイルが上がると、pipelineが作成されてrdsにロードされた。
30分に一回だと1ヶ月に 720USD = (48times * 0.5USD * 30days) くらいかかりそうなので、全然意味わからない構成です。
activateされたdatapipelineの定義を変更できると安く済むのになぁと思いました。

ちなみに、この処理結局バッチサーバーで普通にcron実装することにしました。lambdaだとエラー時にアラート送れないので。

5
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
6