Help us understand the problem. What is going on with this article?

Kinesis Stream, Lambda, DynamoDB, S3 で Stream ベース実装に使える npm モジュール

More than 3 years have passed since last update.

AWS のマネージドサービスを連携する Lambda や サービスを Node.js の Stream を使って作ることが多いため、利用している自作モジュールについて説明します。

kinesis-stream-lambda

https://github.com/tilfin/kinesis-stream-lambda

  • Kinesis Stream のイベントソースの Record をパースして data フィールドを Base64 から Buffer にしてくれる ReadableStream (KPL の aggregation にも対応可能)
  • その Buffer を JSON としてパースし、JavaScript Plain Object のデータに Transform するストリーム

それぞれを提供します。要するに Lambda の event をコンストラクタ渡して2つ pipe で繋げると中身の JSON が Object で Stream 処理できる代物です。

const es = require('event-stream');
const KSL = require('kinesis-stream-lambda');

exports.handler = function(event, context, callback) {
  console.log('event: ', JSON.stringify(event, null, 2));

  const result = [];
  const stream = KSL.reader(event, { isAgg: false });

  stream.on('end', function() {
    console.dir(result);
    callback(null, null);
  });

  stream.on('error', function(err) {
    callback(err);
  });

  stream
  .pipe(KSL.parseJSON({ expandArray: false }))
  .pipe(es.map(function(data, callback) {
    result.push(data);
    callback(null, data)
  }));
}

s3-block-read-stream

https://github.com/tilfin/s3-block-read-stream

S3 からファイルの中身を Range で取得しつつ Stream で処理できます。通常の Stream だと後方のストリームが流量が少ない場合に S3 の HTTP レスポンス処理が長時間になって TCP 接続が双方で Write/ReadTimeout しまう懸念があります。但しブロックサイズが細かいと API のコール回数が増えるのでそこは注意が必要です(APIのコール回数は従量課金対象です)。

paced-work-stream

https://github.com/tilfin/paced-work-stream

指定した並列数かつ一定間隔で処理できるワーカーのような Transform ベースの Stream を提供します。
主に DynamoDB への IO 処理や毎秒の呼び出し回数に制限がある API を呼び出すときに使えます。
fast-paced work にするのか slow-paced work にするかはコンストラクタの第1引数の concurrencyworkMS で調節します。

実際の処理はコンストラクタの第2引数にもしくは、サブクラスに _workPromise メソッド を定義します。この関数は処理内容を Promise で定義してその Promise を返す Function にします(これは Promise は定義した瞬間から実行が始まるためです)。なお Array<Function> として返すことも可能で、その場合でもそれを解釈して同時実行数を調節します。あと、処理数を tag でカウントできる機能も付いています。

dynamo-processor

https://github.com/tilfin/dynamo-processor

DynamoDB への操作を JSON で定義して食わせると CRUD 処理が簡単に実行できるプロセッサモジュールです。
paced-work-stream と組ませて RCU/WCU を意識した汎用的な処理を実装できます。

class DynamoWorkStream extends PacedWorkStream {
  constructor() {
    super({
      concurrency: 10,
      workMS: 80
    });
  }

  _workPromise(data) {
    return dp.proc(data, {
               table: data.table,
               useBatch: false
             });
  }
}

module.exports = DynamoWorkStream;

promised-lifestream

https://github.com/tilfin/promised-lifestream

AWSに限らずですが、複数の stream を pipe で繋げていくと、いずれかの stream で起きたエラーを補足するためには、それぞれの stream に .on('error', <function>) を定義する必要があります。またこれを Promise 化してすっきりさせたい。そのための関数を提供するモジュールです。needResult: true でオプション指定すると .then(result => ...) と最後の結果を受け取ることも可能です。

const PromisedLifestream = require('promised-lifestream');

exports.handle = function(event, context, callback) {
  const workStream = new DynamoWorkStream();
  PromisedLifestream([
    KSL.reader(event, { isAgg: false }),
    KSL.parseJSON({ expandArray: true }),
    workStream
  ])
  .then(() => {
    callback(null, null);
  })
  .catch(err => {
    callback(err); // 3つの stream どこかでエラーが起きてもここでキャッチできる
  });
}

補足

実装例と組み合わせるべきモジュール

promised-lifestream は全体的に使えます。

その他便利な Stream モジュール

tilfin
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away