AWS のマネージドサービスを連携する Lambda や サービスを Node.js の Stream を使って作ることが多いため、利用している自作モジュールについて説明します。
kinesis-stream-lambda
- Kinesis Stream のイベントソースの Record をパースして data フィールドを Base64 から
Buffer にしてくれる ReadableStream (KPL の aggregation にも対応可能) - その Buffer を JSON としてパースし、JavaScript Plain Object のデータに Transform するストリーム
それぞれを提供します。要するに Lambda の event をコンストラクタ渡して2つ pipe で繋げると中身の JSON が Object で Stream 処理できる代物です。
const es = require('event-stream');
const KSL = require('kinesis-stream-lambda');
exports.handler = function(event, context, callback) {
console.log('event: ', JSON.stringify(event, null, 2));
const result = [];
const stream = KSL.reader(event, { isAgg: false });
stream.on('end', function() {
console.dir(result);
callback(null, null);
});
stream.on('error', function(err) {
callback(err);
});
stream
.pipe(KSL.parseJSON({ expandArray: false }))
.pipe(es.map(function(data, callback) {
result.push(data);
callback(null, data)
}));
}
s3-block-read-stream
S3 からファイルの中身を Range で取得しつつ Stream で処理できます。通常の Stream だと後方のストリームが流量が少ない場合に S3 の HTTP レスポンス処理が長時間になって TCP 接続が双方で Write/ReadTimeout しまう懸念があります。但しブロックサイズが細かいと API のコール回数が増えるのでそこは注意が必要です(APIのコール回数は従量課金対象です)。
paced-work-stream
指定した並列数かつ一定間隔で処理できるワーカーのような Transform ベースの Stream を提供します。
主に DynamoDB への IO 処理や毎秒の呼び出し回数に制限がある API を呼び出すときに使えます。
fast-paced work にするのか slow-paced work にするかはコンストラクタの第1引数の concurrency
と workMS
で調節します。
実際の処理はコンストラクタの第2引数にもしくは、サブクラスに _workPromise
メソッド を定義します。この関数は処理内容を Promise で定義してその Promise を返す Function にします(これは Promise は定義した瞬間から実行が始まるためです)。なお Array<Function>
として返すことも可能で、その場合でもそれを解釈して同時実行数を調節します。あと、処理数を tag でカウントできる機能も付いています。
dynamo-processor
DynamoDB への操作を JSON で定義して食わせると CRUD 処理が簡単に実行できるプロセッサモジュールです。
paced-work-stream と組ませて RCU/WCU を意識した汎用的な処理を実装できます。
class DynamoWorkStream extends PacedWorkStream {
constructor() {
super({
concurrency: 10,
workMS: 80
});
}
_workPromise(data) {
return dp.proc(data, {
table: data.table,
useBatch: false
});
}
}
module.exports = DynamoWorkStream;
promised-lifestream
AWSに限らずですが、複数の stream を pipe で繋げていくと、いずれかの stream で起きたエラーを補足するためには、それぞれの stream に .on('error', <function>)
を定義する必要があります。またこれを Promise 化してすっきりさせたい。そのための関数を提供するモジュールです。needResult: true
でオプション指定すると .then(result => ...)
と最後の結果を受け取ることも可能です。
const PromisedLifestream = require('promised-lifestream');
exports.handle = function(event, context, callback) {
const workStream = new DynamoWorkStream();
PromisedLifestream([
KSL.reader(event, { isAgg: false }),
KSL.parseJSON({ expandArray: true }),
workStream
])
.then(() => {
callback(null, null);
})
.catch(err => {
callback(err); // 3つの stream どこかでエラーが起きてもここでキャッチできる
});
}
補足
実装例と組み合わせるべきモジュール
- Kinesis Stream から命令データに従って DynamoDB を一定速度更新する Lambda
kinesis-stream-lambda, paced-work-stream, dynamo-processor
参考)DynamoDBに一定ペースで更新処理をしてたのにWCUを超えてしまった - S3 へのファイル Put をトリガーに起動する S3 の CSV ファイルを読みつつ DynamoDB を一定速度更新する Lambda
s3-block-read-stream, paced-work-stream, dynamo-processor, csv - ローカルファイルの CSV を読みつつ DynamoDB を一定速度更新するスクリプト
paced-work-stream, dynamo-processor, csv - CloudWatch LogsをKinesis Stream経由でLambdaからLogentriesに転送する kinesis-stream-lambda
promised-lifestream は全体的に使えます。