2016/10/24追記:batchGetItem、batchWriteItemは一部のレコード書き込み処理などが失敗しても成功が返って来ます。特に厄介なのがreadキャパシティやwriteキャパシティを超えた場合にProvisionedThroughputExceededExceptionが発生するのですが上記メソッドでは取得できません。一件ずつしか書き込みできませんがputItem、getItemの方がエラーレスポンスが返ってくるので確実です。
readキャパシティとwriteキャパシティのオートスケールに関してはクラスメソッドさんの記事が参考になります。
http://dev.classmethod.jp/etc/auto-scaling-dynamodb-by-lambda/
DynamoDBとは
いい感じにまとまってるのがあるのでそちらを参考にしてください
DynamoDB の基礎知識とまとめ
メリットに関してざっくり言うとスケールを勝手にやってくれるので規模が大きくなっても管理が楽な点です。
DynamoDBの不満点
管理が楽と言いつつ、色々不満な点(自由度が低い点)があるのがDynamoDBです。
以下に不満点をあげてみました。
- 検索が貧弱、貧弱ゥ(キー以外の条件では全スキャンしかできない上、検索条件指定が貧弱です。)
- NoSQLとか言う割には型指定しないとデータ挿入できない、データ取得時にデータ型がついてくる
- 日付型がない
- 空文字が挿入できない
他にも色々あるのよ・・・
ここにハマった!DynamoDB
特にNoSQLといいつつ、型指定とかェ・・・
ちなみにNoSQLの代表格であるMongoDBならサーバレスアーキテクチャの弱点でもある最大コネクションもチューニングすれば20000までいくらしいです。かつ、検索条件も柔軟です。
なんでAWSはMongoDBサポートしてくれてないんだよ、ちくせう
ORマッパー的なヘルパーを作ってみる
そんなわけでJSONのリクエストパラメータをそのままぶち込み、
JSONのレスポンスパラメータとして返せるORマッパーを作ってみたのだ。
各種API参考:DynamoDB for Javascript – cheatsheet
LambdaのAPIとして作成してます。
// DynamoDB
var AWS = require("aws-sdk");
AWS.config.update({
region: "us-east-1"
});
var dynamodb = new AWS.DynamoDB();
var code = {
ErrorNetWorkUnAvaiable : -2,
ErrorRequest : -1,
Success : 0,
ErrorParamsOrLDAP : 1,
ErrorDB : 2,
ErrorSoap : 3,
ErrorSystem : 4,
ErrorLimitFamily : 5,
ErrorLoginedOnOther : 6,
ErrorFamilyInvalid : 7,
ErrorDataSource : 9,
ErrorCheckInput : 10,
ErrorUnKnown:11
};
// データ型判定
var is = module.exports.is = function (type, obj) {
var clas = Object.prototype.toString.call(obj).slice(8, -1);
return obj !== undefined && obj !== null && clas === type;
};
// データ型プレフィックス取得
var prefix = module.exports.prefix = function (data) {
var dataType = 'S';
if (is('String', data)) {
dataType = 'S';
if (data.length === 0) {
dataType = 'NULL'; // 空文字入らないのでNULL型
}
if (checkDate(data)) {
dataType = 'N'; // Date型はないので数値保存
}
}
if (is('Number', data)) {
dataType = 'N';
}
if (is('Boolean', data)) {
dataType = 'BOOL';
}
if (is('Date', data)) {
dataType = 'N'; // Date型はないので数値保存
}
if (is('Array', data)) {
dataType = 'L';
}
if (is('Object', data)) {
dataType = 'M';
}
return dataType;
};
/**
* 日付をフォーマットする
* @param {Date} date 日付
* @param {String} [format] フォーマット
* @return {String} フォーマット済み日付
*/
var formatDate = function (date, format) {
if (!format) format = "YYYY-MM-DDThh:mm:ss.SSSZ";
format = format.replace(/YYYY/g, date.getFullYear());
format = format.replace(/MM/g, ('0' + (date.getMonth() + 1)).slice(-2));
format = format.replace(/DD/g, ('0' + date.getDate()).slice(-2));
format = format.replace(/hh/g, ('0' + date.getHours()).slice(-2));
format = format.replace(/mm/g, ('0' + date.getMinutes()).slice(-2));
format = format.replace(/ss/g, ('0' + date.getSeconds()).slice(-2));
if (format.match(/S/g)) {
var milliSeconds = ('00' + date.getMilliseconds()).slice(-3);
var length = format.match(/S/g).length;
for (var i = 0; i < length; i++) format = format.replace(/S/, milliSeconds.substring(i, i + 1));
}
return format;
};
//"YYYY-MM-DD'T'hh:mm:ss.SSS'Z'"形式の日付かチェック
var checkDate = function (strDate) {
// フォーマットチェック
if (!strDate.match(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/)) {
return false;
}
var date = strDate.split("T")[0];
var time = strDate.split("T")[1];
var dateArr = date.split("-");
if (dateArr.length < 3) {
return false;
}
var year = Number(dateArr[0]);
var month = Number(dateArr[1] - 1);
var day = Number(dateArr[2]);
var timeArr = time.split(":");
if (timeArr.length < 3) {
return false;
}
var hour = Number(timeArr[0]);
var minute = Number(timeArr[1]);
var second = Number(timeArr[2].split(".")[0]);
// 正しい日付かチェック
if (year >= 0 && month >= 0 && month <= 11 && day >= 1 && day <= 31) {
var date = new Date(year, month, day);
if (isNaN(date)) {
return false;
} else if (date.getFullYear() == year && date.getMonth() == month && date.getDate() == day) {
// 正しい時間かチェック
if (hour >= 0 && hour <= 23 && minute >= 0 && minute <= 59 && second >= 0 && second <= 59) {
return true;
}
}
}
return false;
};
// ランダム英数
var randobet = function (n) {
var a = 'abcdefghijklmnopqrstuvwxyz' + 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' + '0123456789';
a = a.split('');
var s = '';
for (var i = 0; i < n; i++) {
s += a[Math.floor(Math.random() * a.length)];
}
return s;
};
// objectId自動生成(キー)
function convertNewData(data) {
var newdata = data;
if (data.objectId === void 0) {
newdata.objectId = randobet(10);
}
return newdata;
}
function convertNewDatas(datas) {
var newdatas = [];
for (var j = 0; j < datas.length; ++j) {
newdatas.push(convertNewData(datas[j]));
}
return newdatas;
}
function createCondition(param) {
var condition = {};
for (var key in param) {
condition.ComparisonOperator = key; // 検索条件
var p = prefix(param[key]);
if (p === 'N') {
param[key] = param[key].toString();
}
if (checkDate(param[key])) {
param[key] = new Date(param[key]).getTime().toString();
console.log("param[key]:", param[key]);
}
var obj = {};
obj[p] = param[key];
condition.AttributeValueList = [obj]; // 比較対象
}
return condition;
}
// 条件文作成
function createConditions(where) {
var scanFilters = {};
for (var key in where) {
scanFilters[key] = createCondition(where[key]);
}
return scanFilters;
}
function getData(item){
for (var dataType in item) {
console.log("item:",item,"dataType:",dataType);
if (dataType == "S") {
return item[dataType];
}
if (dataType == "N") {
// 数字に変換
return Number(item[dataType]);
}
if (dataType == "BOOL") {
if (item[dataType] === true) {
return "1";
} else {
return "0";
}
}
if (dataType == "NULL") {
if (item[dataType] === true) {
// 空文字に変換
return "";
}
}
if (dataType == "M") {
var value = {};
for(var key in item[dataType]){
value[key] = getData(item[dataType][key]);
}
return value;
}
if (dataType == "L") {
var value = [];
for(var i = 0;i < item[dataType].length;++i){
value.push(getData(item[dataType][i]));
}
return value;
}
}
return null;
}
function formatData(Item) {
var data = {};
console.log("Item:",Item);
for (var key in Item) {
console.log("key:",key);
var item = Item[key];
data[key] = getData(item);
}
return data;
}
// Dynamo結果データをフォーマットする
function formatDatas(param) {
var Items = param;
var datas = [];
for (var i = 0; i < Items.length; ++i) {
var item = Items[i];
var data = formatData(item);
//console.log("data:",data);
datas.push(data);
}
console.log("datas:", JSON.stringify(datas));
return datas;
}
// データ取得(scan)
var select = module.exports.select = function (event, cb) {
var table = event.table;
var condition = {};
if (event.param.where) {
condition.where = event.param.where;
}
if (event.param.order) {
condition.order = event.param.order;
}
var params = {
TableName: table,
ScanFilter: createConditions(condition.where)
};
if (event.lastEvaluatedKey) {
console.log(event.lastEvaluatedKey);
params.ExclusiveStartKey = event.lastEvaluatedKey; // 続きがある場合はnullでないため、続きから検索するキーを代入
}
console.log("params:", JSON.stringify(params));
// 頭から全文検索(query検索でない場合はこの検索しかない)
dynamodb.scan(params, function (err, data) {
if (err) {
console.error(err);
return cb(err, {
message: 'scan failed',
status_cd: code.ErrorDB
});
} else {
console.log(data);
var formatedData = formatDatas(data.Items);
// 取得したデータを追加する
if (event.lastEvaluatedData) {
formatedData.push(event.lastEvaluatedData);
}
// 続きがある場合はlastEvaluatedKeyが取得できる
if (data.lastEvaluatedKey) {
event.lastEvaluatedData = formatedData;
event.lastEvaluatedKey = data.lastEvaluatedKey;
return select(event, cb);
}
return cb(null, {
data: formatedData,
status_cd: code.Success
});
}
});
};
// データ取得(query)
var query = module.exports.query = function (event, cb) {
var table = event.table;
var datas = event.param.data;
if (!is('Array', datas)) {
datas = [datas];
}
var Ids = [];
for(var i=0;i<datas.length;++i){
var p = prefix(datas[i].objectId);
var obj = {};
obj[p] = datas[i].objectId;
var data = {"objectId": obj};
Ids.push(data);
}
var requestItems = {};
requestItems[table] = {
Keys:Ids
};
var params = {
RequestItems: requestItems
};
console.log("params:",JSON.stringify(params));
// 取得できる上限は16MBまで
dynamodb.batchGetItem(params, function (err, data) {
if (err) {
console.error(err);
return cb(err, {
message: 'query failed',
status_cd: code.ErrorDB
});
} else {
console.log(data);
var formatedData = formatDatas(data.Responses[table]);
return cb(null, {
data: formatedData,
status_cd: code.Success
});
}
});
};
function createItem(data) {
var inputData = "";
if (is('Array', data)) {
inputData = [];
for (var inData in data) {
var value = createItem(data[inData]); // 入れ子のデータを辿る
inputData.push(value);
}
} else if (is('Object', data)) {
inputData = {};
for (var inData in data) {
var value = createItem(data[inData]); // 入れ子のデータを辿る
inputData[inData] = value;
}
} else {
inputData = data.toString();
}
if (is('String', data)) {
if (checkDate(data)) {
inputData = new Date(data).getTime().toString();
}
}
if (inputData.length === 0) {
inputData = true; // 空文字入らないのでNULL型のtrue
}
var p = prefix(data);
var obj = {};
obj[p] = inputData;
return obj;
}
// 保存データ作成
function createItems(data) {
var item = {};
item.objectId = {
"S": data.objectId
}; // 主キーパラメータ(HASHキー、RANGEキー)は必須、かつAutoIncrement機能がないのでuuid生成する
//item.range_key:{"S": "somedata"}, // 主キーパラメータ(HASHキー、RANGEキー)は必須
for (var key in data) {
if (key === 'objectId') {
continue;
}
if (key === 'delete_flag') {
continue;
}
if (key === 'createdAt') {
continue;
}
if (key === 'updatedAt') {
continue;
}
item[key] = createItem(data[key]);
}
item.delete_flag = {
"BOOL": (data.delete_flag == "1" || data.delete_flag == "true") ? true : false
};
var date = formatDate(new Date());
//console.log("date:",date);
//console.log("date:",checkDate(date));
// UNIX時間で保存
item.createdAt = {
"N": new Date(date).getTime().toString()
};
item.updatedAt = {
"N": new Date(date).getTime().toString()
};
return item;
}
function writeData(param) {
var deferred = Promise.defer();
// multi upsert(25項目までしか同時に書き込めない)
dynamodb.batchWriteItem(param, function (err, data) {
if (err) {
console.error(err);
return deferred.reject(err); // .catch
} else {
console.log(data);
}
deferred.resolve(data); // then
});
return deferred.promise;
}
// 項目作成
var upsert = module.exports.upsert = function (event, cb) {
var table = event.table;
var datas = event.param.data;
if (!is('Array', datas)) {
datas = [datas];
}
// objectIdがない場合は生成
datas = convertNewDatas(datas);
//console.log("datas:",datas);
// 25項目ずつしか同時に書き込めないので分割する
var MAX_WRITE = 25;
var count = datas.length/MAX_WRITE + 1;
var params = [];
for(var i=0;i<count;++i){
var indexStart = i * MAX_WRITE;
var indexEnd = ((i + 1) * MAX_WRITE) > datas.length ? datas.length : ((i + 1) * MAX_WRITE);
var items = [];
for (var j = indexStart; j < indexEnd; ++j) {
var obj = {
PutRequest: {
Item: createItems(datas[j])
}
};
//console.log("obj:",JSON.stringify(obj));
items.push(obj);
}
if(items.length === 0){
break;
}
var requestItems = {};
requestItems[table] = items;
var param = {
RequestItems: requestItems
};
console.log("params:", JSON.stringify(param));
console.log("--------------------",i,"---------------------");
params.push(param);
}
return params.reduce(function (promise, param) {
return promise.then(function () {
return writeData(param);
});
}, Promise.resolve())
.then(function(data){
return query(event,cb);
})
.catch(function(err){
console.error(err);
return cb(err, {
message: 'upsert failed',
status_cd: code.ErrorDB
});
});
};
// データ保存
module.exports.save = function (event, cb) {
var table = event.table;
var datas = event.param.data;
// テーブル一覧取得
dynamodb.listTables(function (err, data) {
console.log(data.TableNames);
var params = {
TableName: table,
// 主キーの設定
KeySchema: [
{
AttributeName: "objectId",
KeyType: "HASH"
}, //HASH key
//{ AttributeName: "range_key", KeyType: "RANGE" } //RANGE key 必要ない場合は設定しない
],
AttributeDefinitions: [
{
AttributeName: "objectId",
AttributeType: "S"
},
//{ AttributeName: "range_key", AttributeType: "S" }
],
// 読み込み書き込みキャパシティ
ProvisionedThroughput: {
ReadCapacityUnits: 1,
WriteCapacityUnits: 1
}
};
// 存在しているかのフラグ
var isCreated = false;
for (var i = 0; i < data.TableNames.length; ++i) {
if (data.TableNames[i] == table) {
isCreated = true;
}
}
// すでに作成済み
if (isCreated) {
// データ保存
return upsert(event, cb);
} else {
// テーブル作成
dynamodb.createTable(params, function (err, data) {
if (err) {
console.error(err);
return cb(err, {
message: 'table create failed',
status_cd: code.ErrorDB
});
} else {
console.log(data);
var params = {
TableName: table
};
// テーブル作成を待つ
dynamodb.waitFor('tableExists', params, function (err, data) {
if (err) {
console.error(err);
} else {
console.log(data);
}
// データ保存
return upsert(event, cb);
});
}
});
}
});
};
save関数を呼び出すLambdaです。
テーブルがない場合はテーブルを作成して項目をupsertします。
objectIdをキーとして作成します。
'use strict';
var dynamo = require('dynamo');
module.exports.handler = function(event, context) {
return dynamo.save(event, function(error, response) {
return context.done(error, response);
});
};
select関数を呼び出すLambdaです。
'use strict';
var dynamo = require('dynamo');
module.exports.handler = function(event, context, cb) {
return dynamo.select(event, function(error, response) {
return context.done(error, response);
});
};
実行例
save関数の入力データには次のようなデータを入力するとします。
日時はUnix時間で格納します。(日付型がないため、値比較で検索するために)
空文字はNULLに変換します。
Object型やArray型のデータも自動的にデータ型を指定して、保存できるようにしています。
{
"table": "Test",
"param": {
"data":[ {
"closeDate": 1482918300000,
"image": {
"url": "http://xxxx",
"type": "File",
"inner":["a","b",{"test1":"l"}]
},
"test":["a","b","c"],
"objectId": "6AaY8G9MUA",
"startDate": 1459138980000,
"text": "テストですあああ",
"userType": 1
},{
"closeDate": 1482918300000,
"image": {
"url": "http://yyyy",
"type": "File",
"inner":["a","b",{"test2":"l"}]
},
"test":["a","b","c"],
"objectId": "6AaY8G9MUI",
"startDate": 1459138980000,
"text": "テストですいいいいい",
"userType": 0
}]
}
}
実行結果
$ sls function run
Serverless: Running dynamoSave...
[ 'FBbot-ifeel',
'LineBot',
'Test',
'appStartupLog',
'appVersion',
'authtest-myAuthenticationProject-cache',
'ifeel-News',
'ifeel-PopUp',
'ifeel-Product',
'ifeel-Question',
'ifeel-SideMenu' ]
params: {"RequestItems":{"Test":[{"PutRequest":{"Item":{"objectId":{"S":"6AaY8G9MUA"},"closeDate":{"N":"1482918300000"},"image":{"M":{"url":{"S":"http://xxxx"},"type":{"S":"File"},"inner":{"L":[{"S":"a"},{"S":"b"},{"M":{"test1":{"S":"l"}}}]}}},"test":{"L":[{"S":"a"},{"S":"b"},{"S":"c"}]},"startDate":{"N":"1459138980000"},"text":{"S":"テストですあああ"},"userType":{"N":"1"},"delete_flag":{"BOOL":false},"createdAt":{"N":"1470341935400"},"updatedAt":{"N":"1470341935400"}}}},{"PutRequest":{"Item":{"objectId":{"S":"6AaY8G9MUI"},"closeDate":{"N":"1482918300000"},"image":{"M":{"url":{"S":"http://yyyy"},"type":{"S":"File"},"inner":{"L":[{"S":"a"},{"S":"b"},{"M":{"test2":{"S":"l"}}}]}}},"test":{"L":[{"S":"a"},{"S":"b"},{"S":"c"}]},"startDate":{"N":"1459138980000"},"text":{"S":"テストですいいいいい"},"userType":{"N":"0"},"delete_flag":{"BOOL":false},"createdAt":{"N":"1470341935401"},"updatedAt":{"N":"1470341935401"}}}}]}}
{ UnprocessedItems: {} }
params: {"RequestItems":{"Test":{"Keys":[{"objectId":{"S":"6AaY8G9MUA"}},{"objectId":{"S":"6AaY8G9MUI"}}]}}}
{ Responses: { Test: [ [Object], [Object] ] },
UnprocessedKeys: {} }
datas: [{"text":"テストですいいいいい","startDate":1459138980000,"closeDate":1482918300000,"test":["a","b","c"],"objectId":"6AaY8G9MUI","createdAt":1470341935401,"delete_flag":"0","image":{"inner":["a","b",{"test2":"l"}],"type":"File","url":"http://yyyy"},"userType":0,"updatedAt":1470341935401},{"text":"テストですあああ","startDate":1459138980000,"closeDate":1482918300000,"test":["a","b","c"],"objectId":"6AaY8G9MUA","createdAt":1470341935400,"delete_flag":"0","image":{"inner":["a","b",{"test1":"l"}],"type":"File","url":"http://xxxx"},"userType":1,"updatedAt":1470341935400}]
Serverless: -----------------
Serverless: Success! - This Response Was Returned:
Serverless: {
"data": [
{
"text": "テストですいいいいい",
"startDate": 1459138980000,
"closeDate": 1482918300000,
"test": [
"a",
"b",
"c"
],
"objectId": "6AaY8G9MUI",
"createdAt": 1470341935401,
"delete_flag": "0",
"image": {
"inner": [
"a",
"b",
{
"test2": "l"
}
],
"type": "File",
"url": "http://yyyy"
},
"userType": 0,
"updatedAt": 1470341935401
},
{
"text": "テストですあああ",
"startDate": 1459138980000,
"closeDate": 1482918300000,
"test": [
"a",
"b",
"c"
],
"objectId": "6AaY8G9MUA",
"createdAt": 1470341935400,
"delete_flag": "0",
"image": {
"inner": [
"a",
"b",
{
"test1": "l"
}
],
"type": "File",
"url": "http://xxxx"
},
"userType": 1,
"updatedAt": 1470341935400
}
],
"status_cd": 0
}
次にselect関数でデータを取得します。
whereの中のデータはDynamoDBの検索条件でscan検索してます。
{
"table": "Test",
"param": {
"where": {
"startDate": {
"LT": 1468391572000
},
"closeDate": {
"GT": 1468391572000
},
"userType":{
"EQ":1
}
}
}
}
実行結果はつぎのようになります。
$ sls function run
Serverless: Running dynamoSelect...
params: {"TableName":"Test","ScanFilter":{"startDate":{"ComparisonOperator":"LT","AttributeValueList":[{"N":"1468391572000"}]},"closeDate":{"ComparisonOperator":"GT","AttributeValueList":[{"N":"1468391572000"}]},"userType":{"ComparisonOperator":"EQ","AttributeValueList":[{"N":"1"}]}}}
{"Items":[{"text":{"S":"テストですあああ"},"startDate":{"N":"1459138980000"},"closeDate":{"N":"1482918300000"},"test":{"L":[{"S":"a"},{"S":"b"},{"S":"c"}]},"objectId":{"S":"6AaY8G9MUA"},"createdAt":{"N":"1468954205204"},"delete_flag":{"BOOL":false},"image":{"M":{"inner":{"L":[{"S":"a"},{"S":"b"},{"M":{"test1":{"S":"l"}}}]},"type":{"S":"File"},"url":{"S":"http://xxxx"}}},"userType":{"N":"1"},"updatedAt":{"N":"1468954205204"}}],"Count":1,"ScannedCount":2}
datas: [{"text":"テストですあああ","startDate":1459138980000,"closeDate":1482918300000,"test":["a","b","c"],"objectId":"6AaY8G9MUA","createdAt":1468954205204,"delete_flag":"0","image":{"inner":["a","b",{"test1":"l"}],"type":"File","url":"http://xxxx"},"userType":1,"updatedAt":1468954205204}]
Serverless: -----------------
Serverless: Success! - This Response Was Returned:
Serverless: {
"data": [
{
"text": "テストですあああ",
"startDate": 1459138980000,
"closeDate": 1482918300000,
"test": [
"a",
"b",
"c"
],
"objectId": "6AaY8G9MUA",
"createdAt": 1468954205204,
"delete_flag": "0",
"image": {
"inner": [
"a",
"b",
{
"test1": "l"
}
],
"type": "File",
"url": "http://xxxx"
},
"userType": 1,
"updatedAt": 1468954205204
}
],
"status_cd": 0
}
素直なJSON形式で返せました。
パフォーマンスは?
全スキャンでも並列処理で検索かければ、パフォーマンスがでるようです。
手っ取り早く速度あげたい場合は料金はあがりますが、設定で下記のテーブルのユニット数を増やせば良いでしょう。
- 読み込み容量ユニット
- 書き込み容量ユニット
複数テーブル書き込みのトランザクションは?
Dynamoストリームの機能を使えば、テーブル書き込み時に連動してさらにLambda関数をキックできるようです。
これを使えば複数テーブルに一括処理させることは可能になるでしょう。
(今回はここまでやっていません。てかロールバックとかどうなるんだろ?)