2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Kinesis Stream, Lambda, DynamoDB, S3 で Stream ベース実装に使える npm モジュール

Last updated at Posted at 2017-05-15

AWS のマネージドサービスを連携する Lambda や サービスを Node.js の Stream を使って作ることが多いため、利用している自作モジュールについて説明します。

kinesis-stream-lambda

  • Kinesis Stream のイベントソースの Record をパースして data フィールドを Base64 から
    Buffer にしてくれる ReadableStream (KPL の aggregation にも対応可能)
  • その Buffer を JSON としてパースし、JavaScript Plain Object のデータに Transform するストリーム

それぞれを提供します。要するに Lambda の event をコンストラクタ渡して2つ pipe で繋げると中身の JSON が Object で Stream 処理できる代物です。

const es = require('event-stream');
const KSL = require('kinesis-stream-lambda');

exports.handler = function(event, context, callback) {
  console.log('event: ', JSON.stringify(event, null, 2));

  const result = [];
  const stream = KSL.reader(event, { isAgg: false });

  stream.on('end', function() {
    console.dir(result);
    callback(null, null);
  });

  stream.on('error', function(err) {
    callback(err);
  });

  stream
  .pipe(KSL.parseJSON({ expandArray: false }))
  .pipe(es.map(function(data, callback) {
    result.push(data);
    callback(null, data)
  }));
}

s3-block-read-stream

S3 からファイルの中身を Range で取得しつつ Stream で処理できます。通常の Stream だと後方のストリームが流量が少ない場合に S3 の HTTP レスポンス処理が長時間になって TCP 接続が双方で Write/ReadTimeout しまう懸念があります。但しブロックサイズが細かいと API のコール回数が増えるのでそこは注意が必要です(APIのコール回数は従量課金対象です)。

paced-work-stream

指定した並列数かつ一定間隔で処理できるワーカーのような Transform ベースの Stream を提供します。
主に DynamoDB への IO 処理や毎秒の呼び出し回数に制限がある API を呼び出すときに使えます。
fast-paced work にするのか slow-paced work にするかはコンストラクタの第1引数の concurrencyworkMS で調節します。

実際の処理はコンストラクタの第2引数にもしくは、サブクラスに _workPromise メソッド を定義します。この関数は処理内容を Promise で定義してその Promise を返す Function にします(これは Promise は定義した瞬間から実行が始まるためです)。なお Array<Function> として返すことも可能で、その場合でもそれを解釈して同時実行数を調節します。あと、処理数を tag でカウントできる機能も付いています。

dynamo-processor

DynamoDB への操作を JSON で定義して食わせると CRUD 処理が簡単に実行できるプロセッサモジュールです。
paced-work-stream と組ませて RCU/WCU を意識した汎用的な処理を実装できます。

class DynamoWorkStream extends PacedWorkStream {
  constructor() {
    super({
      concurrency: 10,
      workMS: 80
    });
  }

  _workPromise(data) {
    return dp.proc(data, {
               table: data.table,
               useBatch: false
             });
  }
}

module.exports = DynamoWorkStream;

promised-lifestream

AWSに限らずですが、複数の stream を pipe で繋げていくと、いずれかの stream で起きたエラーを補足するためには、それぞれの stream に .on('error', <function>) を定義する必要があります。またこれを Promise 化してすっきりさせたい。そのための関数を提供するモジュールです。needResult: true でオプション指定すると .then(result => ...) と最後の結果を受け取ることも可能です。

const PromisedLifestream = require('promised-lifestream');

exports.handle = function(event, context, callback) {
  const workStream = new DynamoWorkStream();
  PromisedLifestream([
    KSL.reader(event, { isAgg: false }),
    KSL.parseJSON({ expandArray: true }),
    workStream
  ])
  .then(() => {
    callback(null, null);
  })
  .catch(err => {
    callback(err); // 3つの stream どこかでエラーが起きてもここでキャッチできる
  });
}

補足

実装例と組み合わせるべきモジュール

promised-lifestream は全体的に使えます。

その他便利な Stream モジュール

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?