Kinesis Stream, Lambda, DynamoDB, S3 で Stream ベース実装に使える npm モジュール

  • 2
    いいね
  • 0
    コメント

AWS のマネージドサービスを連携する Lambda や サービスを Node.js の Stream を使って作ることが多いため、利用している自作モジュールについて説明します。

kinesis-stream-lambda

https://github.com/tilfin/kinesis-stream-lambda

  • Kinesis Stream のイベントソースの Record をパースして data フィールドを Base64 から Buffer にしてくれる ReadableStream (KPL の aggregation にも対応可能)
  • その Buffer を JSON としてパースし、JavaScript Plain Object のデータに Transform するストリーム

それぞれを提供します。要するに Lambda の event をコンストラクタ渡して2つ pipe で繋げると中身の JSON が Object で Stream 処理できる代物です。

const es = require('event-stream');
const KSL = require('kinesis-stream-lambda');

exports.handler = function(event, context, callback) {
  console.log('event: ', JSON.stringify(event, null, 2));

  const result = [];
  const stream = KSL.reader(event, { isAgg: false });

  stream.on('end', function() {
    console.dir(result);
    callback(null, null);
  });

  stream.on('error', function(err) {
    callback(err);
  });

  stream
  .pipe(KSL.parseJSON({ expandArray: false }))
  .pipe(es.map(function(data, callback) {
    result.push(data);
    callback(null, data)
  }));
}

s3-block-read-stream

https://github.com/tilfin/s3-block-read-stream

S3 からファイルの中身を Range で取得しつつ Stream で処理できます。通常の Stream だと後方のストリームが流量が少ない場合に S3 の HTTP レスポンス処理が長時間になって TCP 接続が双方で Write/ReadTimeout しまう懸念があります。但しブロックサイズが細かいと API のコール回数が増えるのでそこは注意が必要です(APIのコール回数は従量課金対象です)。

paced-work-stream

https://github.com/tilfin/paced-work-stream

指定した並列数かつ一定間隔で処理できるワーカーのような Transform ベースの Stream を提供します。
主に DynamoDB への IO 処理や毎秒の呼び出し回数に制限がある API を呼び出すときに使えます。
fast-paced work にするのか slow-paced work にするかはコンストラクタの第1引数の concurrencyworkMS で調節します。

実際の処理はコンストラクタの第2引数にもしくは、サブクラスに _workPromise メソッド を定義します。この関数は処理内容を Promise で定義してその Promise を返す Function にします(これは Promise は定義した瞬間から実行が始まるためです)。なお Array<Function> として返すことも可能で、その場合でもそれを解釈して同時実行数を調節します。あと、処理数を tag でカウントできる機能も付いています。

dynamo-processor

https://github.com/tilfin/dynamo-processor

DynamoDB への操作を JSON で定義して食わせると CRUD 処理が簡単に実行できるプロセッサモジュールです。
paced-work-stream と組ませて RCU/WCU を意識した汎用的な処理を実装できます。

class DynamoWorkStream extends PacedWorkStream {
  constructor() {
    super({
      concurrency: 10,
      workMS: 80
    });
  }

  _workPromise(data) {
    return dp.proc(data, {
               table: data.table,
               useBatch: false
             });
  }
}

module.exports = DynamoWorkStream;

promised-lifestream

https://github.com/tilfin/promised-lifestream

AWSに限らずですが、複数の stream を pipe で繋げていくと、いずれかの stream で起きたエラーを補足するためには、それぞれの stream に .on('error', <function>) を定義する必要があります。またこれを Promise 化してすっきりさせたい。そのための関数を提供するモジュールです。needResult: true でオプション指定すると .then(result => ...) と最後の結果を受け取ることも可能です。

const PromisedLifestream = require('promised-lifestream');

exports.handle = function(event, context, callback) {
  const workStream = new DynamoWorkStream();
  PromisedLifestream([
    KSL.reader(event, { isAgg: false }),
    KSL.parseJSON({ expandArray: true }),
    workStream
  ])
  .then(() => {
    callback(null, null);
  })
  .catch(err => {
    callback(err); // 3つの stream どこかでエラーが起きてもここでキャッチできる
  });
}

補足

実装例と組み合わせるべきモジュール

promised-lifestream は全体的に使えます。

その他便利な Stream モジュール