0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

DynamoDBのデータをnode.jsのReadableStreamに流す

Last updated at Posted at 2020-07-31

DynamoDBテーブルのデータをざーっと ReadableStream に流し込みたかったが、どうやってバックプレッシャーに対応すればいいのか分からなかったので対応させてみた。

やりたいこと

  • DynamoDBからデータを読み込む(基本は内部バッファサイズ分)
  • ReadableStream にpush
  • ひたすら読み込んではpush
  • 詰まったら一時停止
  • 次回開始位置をpushしきれなかったアイテムに設定
  • またpushできるようになったら続きから読み込み再開
  • 以下繰り返し
  • 全て読み込んでpushし終わったら処理終了

DynamoDBのQuery/ScanとLimit

DynamoDBのQuery/Scan操作では Limit を指定することで取得件数を制限することができる。
ただ Limit は検索結果の件数制限でなので、QueryFilterScanFilter を掛けると検索結果がさらにフィルタリングされ指定した件数より少ない件数が返ってくる場合がある。

クエリーで検索し、指定件数分の結果が揃ったら、クエリーフィルターでさらに絞る、というのはDynamoDBに限らずよくあるのでおそらく同じ動作なんだと思う。

まぁこれは Limit を指定してもフィルターを指定すると思ったとおりの件数は返ってこないですよ、という注意事項。

DynamoDBのページネーション

DynamoDBのQuery/Scanは読み込みサイズが1MBを超えるとそこで帰ってくる。Limit を指定した場合も途中で帰ってくる。
まだ全てのデータを読み込み終えてない場合、次ページを取得するための開始地点がレスポンスに含まれる。
レスポンスに含まれる LastEvaluatedKey を次回Query/Scan時のリクエストに ExclusiveStartKey として含めれば次ページを取得できる仕組みになっている。

LastEvaluatedKey は実際にはプライマリーキー(パーティションキーあるいはパーティションキーとソートキーの組み合わせ)のオブジェクトである。
なので LastEvaluatedKey でなく、レスポンスに含まれるアイテムの何番目かをぶち込めば次回の読み込み開始位置はその項目の次からになる。なった。

これを利用して、ReadableStream にpushしている途中で ReadableStream の内部バッファが一杯になった場合は、入り切らなかった次のアイテムを読み込み開始位置として ExclusiveStartKey に設定することにした。

ReadableStreamのハイウォーターマーク

ReadableStream は内部バッファを持っていて、そのサイズ=閾値をハイウォーターマーク(HWM)という。
オブジェクトモードの場合はデフォルトで 16 に設定されている。
データを push し続けてHWMに達すると、

  • pushfalse を返す
  • _read が呼ばれなくなる

それで内部バッファに空きがでると再び _read が呼ばれるらしい。
また pushfalse を返しても内部バッファには詰め込まれてた気がする。たしか。HWMが上がるんだったかな・・・

ということで、Query/Scanで取得したデータをpushしてる最中に false を返した場合はそこで処理を中断し、次のアイテム以降は残念ながら次回のQuery/Scan時に再取得することにした。

諸々踏まえて

最終的にこんな感じの実装になった。
切り貼りしてたのでこのままでは動かないかも。雰囲気を感じ取って欲しい。

class DynamoDBStream extends stream.Readable {
    :
  _read(size) {
    // 何度も呼ばれるので
    if (!this.#querying) {
      this.#querying = true;
      this.#params.Limit = size || this.hwm;
      this._query(this.#params);
    }
  }
  // HWMに達するまで再帰的にQueryを実行する
  _query(params) {
    const recursiveQuery = (params) => {
      // AWS.DynamoDB.DocumentClient.query()
      this.#ddb.query(params, (err, data) => {
          :
        let abort = false;
        for (const item of queryResult.Items) {
          // HWMに達したらそのitemまで処理済みとし次回の開始位置を指定して中断
          if (!this.push(item)) {
            params.ExclusiveStartKey = item;
            abort = true;
            break;
          }
        }
        // 中断したら次の読み込み要求を待つ
        if (abort) {
          this.#querying = false;
        // 次ページがなかったら処理終了
        } else if (typeof queryResult.LastEvaluatedKey === "undefined") {
          this.push(null);
          this.#querying = false;
        // 中断せず、次ページがあれば続けて次ページを取得
        } else {
          params.ExclusiveStartKey = queryResult.LastEvaluatedKey;
          recursiveQuery(params);
        }
      });
    }
    recursiveQuery(params);
  }
    :
}

余談: DynamoDBのコスト

リクエスト単位で課金される。また、結果整合性のある読み込みが一番安い。
Limit を小さく指定しすぎるとリクエスト数が増えてしまい、ちゃりんちゃりん課金されてしまう。

かと言って、あんまり Limit を大きくしてしまうと、詰め込みきれなかったデータが無駄になってしまうのでうまく調整しないといけない。

頻繁にリクエストが細分化してしまう場合は、いっそのことStreamとは別にバッファを持ったほうがいいかもしれない。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?