はじめに
今年もRe:Inventで様々なサービスが発表されましたね!
個人的にはDynamoDBのバックアップ&リストアとLambdaのGo対応が気になるところです。
まぁ前者は東京リージョンの対応しておらず、後者はまだ実装されていないという状態ですが…
さて本題です。
私は、今まで(約2年間)Lambdaとその他のサービスを組み合わせていろんなことをしてきて、一旦まとめてみたいので記事にしました
最近はIoTサービスの開発を主にせこせこコーディングしています
Lambdaは本当に便利なサービスです!個人的にはかなり好きなサービスです
基本的にNode.jsでコーディングしているプログラマーなので、記事に書くソースコードはJavaScript(Node.js)です
KinesisStreams → Lambda
IoTサービスでセンサーなどから送信されてくるストリームデータを扱うときに欠かせないサービスとなっているKinesisStreams(個人的見解)
基本的にデータの受け口となって、シャードという単位に処理を分散して、各シャードごとにLambdaを実行してくれます
プライマリーキーごとにシャードが振り分けられるので、同じセンサー(IDが一緒など)から送信されたデータが別シャードに振り分けられることはありません
KinesisStreamからはバッチサイズで指定したデータ量でLambdaのhandlerにeventとして渡されます
一度に数レコード分渡されることもありますので、event["Records"]
の中身をfor文で回すなどして、一つずつ対応する必要があります
また、KinesisStreamsを経由するとデータがBase64でエンコードされているので、デコードするのをお忘れなく
exports.handler = (event, context, callback) => {
for (let i = 0; i < event["Records"].length; i++){
const payload = event["Records"][i]["kinesis"]["data"];
const decodePayload = new Buffer(payload, "base64").toString("utf8");
const data = JSON.parse(decodePayload);
// あとは煮るなり焼くなり…
}
}
Lambda + DynamoDB
DynamoDBを利用するわけは、NoSQLのデータベースなので、ストリームデータのようにバンバンデータを突っ込むのに適していたり、Lambdaからデータを保存するのにRDBだとVPC周りが辛いなどの理由があります
KinesisStreamsから受け取ったデータをDynamoDBに保存したり、あるテーブルにデータが保存されたのをトリガーにLambdaを実行して、別テーブルに書き込みをしたいときに使います
Lambda → DynamoDB
DynamoDBにJSON文字列をPutする
"use strict";
const AWS = require("aws-sdk");
const dynamodb = new AWS.DynamoDB.DocumentClient({
region: "ap-northeast-1"
});
exports.handler = (event, context, callback) => {
const jsonFile = {
id: "12345",
value: "1111"
};
const params = {
TableName: "sample",
Item: jsonFile
};
console.log(params);
dynamodb.put(params).promise().then(() => {
callback(null);
}).catch((err) => {
console.error(err);
callback(err);
});
};
DynamoDB(DynamoDBStream) → Lambda
複数テーブルが存在していて、あるテーブルにデータが保存されたら、そのデータを元に他のテーブルのデータを操作したいなーってときに使います
KinesisStreamsのような挙動をしてくれますが、シャード数をこちら側で設定できないのが惜しい…
DynamoDBStreamsの設定は、各テーブル詳細画面のトリガータブから設定することができます
DynamoDBStreamからLambdaのhandlerに渡されるデータをこんな感じ
{
"Records": [
{
"eventID": "719b7f0326a65cb1a2ed2f4053ba9e05",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "ap-northeast-1",
"dynamodb": {
"ApproximateCreationDateTime": 1512016440,
"Keys": {
"id": {
"S": "stream_id"
}
},
"NewImage": {
"id": {
"S": "stream_id"
},
"value": {
"S": "stream_value"
}
},
"SequenceNumber": "26000000000015831839590",
"SizeBytes": 39,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "eventSourceARNString"
}
]
}
Streamsから渡されるデータはdynamodb
の中に入っています
eventName
にDynamoDBに対してどのような操作が行われたのかが入っているので、これを元にこの後の操作を分岐させたり、フィルターをかけたりすることができます
Lambda + S3
KinesisStreamsから受け取ったデータ(基本的にJSON形式)をS3に保存したいなーとか、S3に保存されたのをトリガーにLambdaを実行したいなーってときに使います
ただ、バケットポリシーを設定しないと、他のサービスから操作(GetやPutなど)することができないので、適切なバケットポリシーを設定してください(デフォルトは全ての操作がDenyになっています)
Lambda → S3
LambdaからS3にJSONファイルを保存する
"use strict";
const AWS = require("aws-sdk");
const S3 = new AWS.S3();
exports.handler = (event, context, callback) => {
const jsonFile = [{
id: "12345",
value: "1111"
},{
id: "67890",
value: "2222"
}];
const params = {
Bucket: "BucketName",
Key: "sample.json",
Body: JSON.stringify(jsonFile)
}
S3.putObject(params, (err, data) => {
if (err) {
console.error(err);
callback(err);
} else {
console.log(data);
callback(null);
}
});
};
S3 → Lambda
S3にオブジェクトが保存されたときにLambdaをキックする
対象のS3バケットにこのように設定しまーす
設定内容は
- Bucketに対してPutが実行された
- PutされたKeyのプレフィックスが「sample/」
- PutされたKeyのサフィックスが「.json」
- 「sampleFromS3」というLambda関数をinvokeする
といった設定になってます
invokeされたLambda内でhandlerに渡されるeventの中身をlogに吐いてみると
{
"Records": [
{
"eventVersion": "2.0",
"eventSource": "aws:s3",
"awsRegion": "ap-northeast-1",
"eventTime": "2017-11-30T00:42:00.663Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "AWS:xxxxxxxxxxx:sampleTest"
},
"requestParameters": {
"sourceIPAddress": "ipaddress"
},
"responseElements": {
"x-amz-request-id": "1234567890",
"x-amz-id-2": "xxxxxxxxxxxxxxxx"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "invokeLambda",
"bucket": {
"name": "bucketName",
"ownerIdentity": {
"principalId": "xxxxxxxxxx"
},
"arn": "arn:aws:s3:::bucketName"
},
"object": {
"key": "sample/sample.json",
"size": 61,
"eTag": "a7bd25a83b170eec66356c4f26fc2d2e",
"sequencer": "005A1F53D89E374E61"
}
}
}
]
}
といった内容がLambdaに渡されます
この場合はPutされたバケット名とオブジェクトのKey情報が渡されています
オブジェクトの中身が渡される訳ではないのでご注意ください
もし、オブジェクトの中身を取得したいのであれば、PutされたオブジェクトをGetします
const buketName = event["Records"][0]["s3"]["bucket"]["name"];
const key = event["Records"][0]["s3"]["object"]["key"];
const params = {
Bucket: bucketName,
Key: key
}
s3.getObject(params, (err, data) => {
if (err) {
console.error(err);
callback(err);
} else {
console.log(data);
callback(null);
}
});
Lambda → Lambda
Lambdaで並列処理を実行させたい!って場合によく使います
要するに親Lambdaが呼ばれるたびに子Lambdaを生成するんです
そうすれば子Lambdaの処理が終わるのを待たずに新しい子Lambdaで処理を実行することができるんです
ただ、実行順序の整合性が必要な場合は注意が必要です
"use strict";
const AWS = require("aws-sdk");
const lambda = new AWS.Lambda();
module.exports = (event, context, callback) => {
const Item = ""; // Lambdaに渡したいデータ
lambda.invoke({
FunctionName: "LambdaFunctionName",
Payload: Item
}).promise().then(() => {
callback(null);
}).catch((err) => {
console.error(err);
callback(err);
});
}
まとめ
このようにLambdaは様々なサービスと組み合わせることによって、いろんなことができます
しかも基本的に料金はかかりません。というかそんな簡単に料金が発生するような料金設定じゃないですw
ただ、いくつかの制限もあったりするので、使い始める際には注意が必要です
例えば、タイムアウト時間が最大で3分までしか設定できないので、単純に言うと、3分以上かかる処理は実質Lambdaで処理できません。そういう時は素直にEC2を立てましょう。それか最後で書いたLambdaからLambdaを実行する方法で処理を繋いでいくという手もあります
ともあれ、ほんとにLambdaはとても便利なサービスです!(2回目)
ではまた!