最近AWSを使い始めた初心者です。
今回は、時間がかかりそうな処理をLambdaにやらせる場合に使った方法を備忘録的に書いてます。
使える場所は限られそうですが、1件あたりの処理なら問題ないけど、それが大量にある場合などに使えるかもしれません。
例として、S3の特定のディレクトリ配下にある各オブジェクトに対して何か処理を行うようなものをやってみます。
なお、必要なIAMなどは別途設定する必要がありますが、ここでは割愛します。
全体の流れ
- LambdaでS3からオブジェクトの一覧を取得する。
- オブジェクトのキーを小分けにしてDynamoDBに登録する。
- DynamoDBに設定したトリガーでLambdaを起動し、各オブジェクトに対して処理を行う。
必要なもの
- 処理対象となるオブジェクトを取得しDynamoDBに登録するLambda関数(RegisterObjKey)
- トリガーから起動され、実際に処理を行うLambda関数(ExecSomething)
- 処理対象となるオブジェクトのキーを登録するDynamoDBのテーブル(ObjKeyQueue)
- オブジェクトが登録されているS3(Bucket:hogehoge,対象ディレクトリ:fuga)
RegisterObjKey
'use strict';
const AWS = require('aws-sdk');
const s3 = new AWS.S3({apiVersion:'2006-03-01'});
const dynamoDB = new AWS.DynamoDB({apiVersion:'2012-08-10'});
const firstToken = "firstToken";
// 初期処理
const init = (event, context) => {
return new Promise((resolve, reject) => {
const stash = {
contentsList : [],
objList : []
};
});
};
// S3からオブジェクト一覧取得
const getObjList = (stash) => {
return new Promise((resolve, reject) => {
// オブジェクトは1回で1000個までしか取得できないため再帰的に呼び出す
const getObjListSub = (nextToken) => {
return new Promise((resolve, reject) => {
// 最後まで取得したら終了
if (!nextToken) {
resolve();
return;
}
// S3から取得する際のパラメータ
const param = {
Bucket : "hogehoge",
Prefix : "fuga/",
MaxKeys : 100 // 1回に取得する件数、用途に合わせて適時変更
};
// 次の読み込みがある場合は設定
if (nextToken !== firstToken) {
param['ContinuationToken'] = nextToken;
}
// 取得
s3.listObjectV2(param, (err, data) => {
if (err) {
// エラー時処理
console.log(err, err.stack);
reject(err);
return;
}
// とりあえずリストに追加(この辺は用途にあわせて必要な情報のみ保持したほうがメモリ的によいかも)
Array.prototype.push.apply(stash.contentsList, data.Contents);
// 再起的呼び出し
getObjListSub(data.NextContinuationToken).then(resolve).catch(reject);
});
});
};
// 最初の呼び出し
getObjListSub(firstToken).then(resolve.bind(null, stash)).catch(reject);
});
};
// 対象のリストを小分けにする
// ここで対象全オブジェクトを1回に処理できる個数に小分けにして配列に格納します。
const makeObjList = (stash) => {
return new Promise((resolve, reject) => {
let cnt = 0;
let objKeyList[];
for (let contentsData of stash.contentsList) {
objKeyList.push(contentsData.Key);
// ここの10が1回に処理できる個数。用途に合わせて変更。
if (++cnt >= 10) {
stash.objList.push(objKeyList);
objKeyList = [];
cnt = 0;
}
}
resolve(stash);
});
};
// DynamoDBに登録
const registerObjList = (stash) => {
// キー項目に使用するプレフィックス
const keyPrefix = "obj_" + Date.now() + "_";
return new Promise((resolve, reject) => {
// batchWriteItem使えば1回でできそうだけど、今回は1個ずつ登録する
const registerObjListSub(cnt, objList) => {
return new Promise((resolve, reject) => {
// 全て登録したら終了
if (objList.length <= 0) {
resolve();
return;
}
// 登録するパラメータ
const param = {
Item : {
"Key" : {S: keyPrefix * cnt},
"objList" : {SS: objList.shift()}
},
TableName : "ObjKeyQueue"
};
// 登録
dynamoDB.putItem(param, (err, data) => {
if (err) {
// エラー時処理
console.log(err, err.stack);
reject(err);
return;
}
// 次を登録
registerObjListSub(++cnt, objList).then(resolve).catch(reject);
});
});
};
// 最初の呼び出し
registerObjListSub(0, stash.data.objList).then(resolve.bind(null, stash)).catch(reject);
});
};
exports.handler = (event, context, callback) => {
init(event, context)
.then(getObjList)
.then(makeObjList)
.then(registerObjList)
.then(callback.bind(null, null, "Success."))
.catch(callback);
};
ExecSomething
'use strict';
const AWS = require('aws-sdk');
const s3 = new AWS.S3({apiVersion: '2006-03-01'});
const dynamoDB = new AWS.DynamoDB({apiVersion: '2012-08-10'});
// 初期処理
const init = (event, context) => {
return new Promise((resolve, reject) => {
// DBに登録されると、内容がeventに格納されてこのLambda関数が呼ばれる
// だたし、登録以外(更新や削除)でも呼ばれるため、登録のみ処理を行う
let objList = [];
let dbKeys = [];
event.Records.forEach((record) => {
if (record.eventName === 'INSERT') {
try {
// 追加された行のobjListの値を取得
objList.push(record.dynamodb.NewImage.objList.SS);
// 追加された行のKeyの値を取得
dbKeys.push(record.dynamodb.Keys);
} catch(e) {
// キーのリストが登録されていない場合は何もしない
}
}
});
const stash = {
recordKeys : dbKeys,
objList : objList
};
resolve(stash);
});
};
// ここで何か処理をする。今回はS3からオブジェクトを取得してるだけ
const doSomething = (stash) => {
return new Promise((resolve, reject) => {
const doSomethingSub = (objList) => {
return new Promise((resolve, reject) => {
// 全て処理したら終了
if (objList.length <= 0) {
resolve();
return;
}
// S3から取得
const param = {
Bucket : "hogehoge",
Key : objList.shift()
};
s3.getObject(param, (err, data) => {
if (err) {
// エラー処理
console.log(err, err.stack);
reject(err);
return;
}
// ここでゴニョゴニョする。
// 次のオブジェクト取得
doSomethingSub(objList).then(resolve).catch(reject);
});
});
};
// 最初の呼び出し
doSomethingSub(stash.objList).then(resolve.bind(null, stash)).catch(reject);
});
};
// DynamoDBからレコード削除
const deleteFromDynamoDB = (stash) => {
return new Promise((resolve, reject) => {
param = {
'ObjKeyQueue' : []
};
for (let dbKey of stash.recordKeys) {
param['ObjKeyQueue'].push({
DeleteRequest : {
Key : {'Key' : {S : dbKey}}
}
});
}
dynamoDB.batchWriteItem(param, (err, data) => {
if (err) {
// エラー時処理
console.log(err, err.stack);
reject(err);
return;
}
resolve(stash);
});
});
};
exports.handler = (event, context, callback) => {
init(event, context)
.then(doSomething)
.then(deleteFromDynamoDB)
.then(callback.bind(null, null, "success."))
.catch(callback);
};
ObjKeyQueue
- テーブルは以下の感じで作ります。
- テーブル名:ObjKeyQueue
- プライマリパーティションキー:Key(文字列)
- あとは良しなに
- 作成したテーブルの概要タブで「ストリームの管理」を押下し、「新旧イメージ」or「新しいイメージ」を選択します。
- トリガータブで「トリガーの作成」を押下し「既存のLambda関数」を選択し、以下の感じで作成します。
- 機能:ExecSomething
- バッチサイズ:1
- トリガーの有効化:チェックを外す
- RegistObjKeyを実行して、目的のオブジェクトのキーが登録されているのを確認できたら、トリガーを有効にする。
その他
これを定期的に実行させたい場合は、RegisterObjKeyをCloudWatchで定期的に呼び出せば行けそうです。
もう少し簡単な方法がありそうな気がします・・・が、こんな感じでやってみました。
気になる点としては、件数が多い場合はRegisterObjKeyにかなり時間がかかってしまいそうなところで、そもそも件数が少ないのであればこんなことやらずに普通にやればいいじゃん!ってところですかね。