DynamoDBテーブルのデータをざーっと ReadableStream
に流し込みたかったが、どうやってバックプレッシャーに対応すればいいのか分からなかったので対応させてみた。
やりたいこと
- DynamoDBからデータを読み込む(基本は内部バッファサイズ分)
-
ReadableStream
にpush - ひたすら読み込んではpush
- 詰まったら一時停止
- 次回開始位置をpushしきれなかったアイテムに設定
- またpushできるようになったら続きから読み込み再開
- 以下繰り返し
- 全て読み込んでpushし終わったら処理終了
DynamoDBのQuery/ScanとLimit
DynamoDBのQuery/Scan操作では Limit
を指定することで取得件数を制限することができる。
ただ Limit
は検索結果の件数制限でなので、QueryFilter
や ScanFilter
を掛けると検索結果がさらにフィルタリングされ指定した件数より少ない件数が返ってくる場合がある。
クエリーで検索し、指定件数分の結果が揃ったら、クエリーフィルターでさらに絞る、というのはDynamoDBに限らずよくあるのでおそらく同じ動作なんだと思う。
まぁこれは Limit
を指定してもフィルターを指定すると思ったとおりの件数は返ってこないですよ、という注意事項。
DynamoDBのページネーション
DynamoDBのQuery/Scanは読み込みサイズが1MBを超えるとそこで帰ってくる。Limit
を指定した場合も途中で帰ってくる。
まだ全てのデータを読み込み終えてない場合、次ページを取得するための開始地点がレスポンスに含まれる。
レスポンスに含まれる LastEvaluatedKey
を次回Query/Scan時のリクエストに ExclusiveStartKey
として含めれば次ページを取得できる仕組みになっている。
LastEvaluatedKey
は実際にはプライマリーキー(パーティションキーあるいはパーティションキーとソートキーの組み合わせ)のオブジェクトである。
なので LastEvaluatedKey
でなく、レスポンスに含まれるアイテムの何番目かをぶち込めば次回の読み込み開始位置はその項目の次からになる。なった。
これを利用して、ReadableStream
にpushしている途中で ReadableStream
の内部バッファが一杯になった場合は、入り切らなかった次のアイテムを読み込み開始位置として ExclusiveStartKey
に設定することにした。
ReadableStreamのハイウォーターマーク
ReadableStream
は内部バッファを持っていて、そのサイズ=閾値をハイウォーターマーク(HWM)という。
オブジェクトモードの場合はデフォルトで 16
に設定されている。
データを push
し続けてHWMに達すると、
-
push
がfalse
を返す -
_read
が呼ばれなくなる
それで内部バッファに空きがでると再び _read
が呼ばれるらしい。
また push
が false
を返しても内部バッファには詰め込まれてた気がする。たしか。HWMが上がるんだったかな・・・
ということで、Query/Scanで取得したデータをpushしてる最中に false
を返した場合はそこで処理を中断し、次のアイテム以降は残念ながら次回のQuery/Scan時に再取得することにした。
諸々踏まえて
最終的にこんな感じの実装になった。
切り貼りしてたのでこのままでは動かないかも。雰囲気を感じ取って欲しい。
class DynamoDBStream extends stream.Readable {
:
_read(size) {
// 何度も呼ばれるので
if (!this.#querying) {
this.#querying = true;
this.#params.Limit = size || this.hwm;
this._query(this.#params);
}
}
// HWMに達するまで再帰的にQueryを実行する
_query(params) {
const recursiveQuery = (params) => {
// AWS.DynamoDB.DocumentClient.query()
this.#ddb.query(params, (err, data) => {
:
let abort = false;
for (const item of queryResult.Items) {
// HWMに達したらそのitemまで処理済みとし次回の開始位置を指定して中断
if (!this.push(item)) {
params.ExclusiveStartKey = item;
abort = true;
break;
}
}
// 中断したら次の読み込み要求を待つ
if (abort) {
this.#querying = false;
// 次ページがなかったら処理終了
} else if (typeof queryResult.LastEvaluatedKey === "undefined") {
this.push(null);
this.#querying = false;
// 中断せず、次ページがあれば続けて次ページを取得
} else {
params.ExclusiveStartKey = queryResult.LastEvaluatedKey;
recursiveQuery(params);
}
});
}
recursiveQuery(params);
}
:
}
余談: DynamoDBのコスト
リクエスト単位で課金される。また、結果整合性のある読み込みが一番安い。
Limit
を小さく指定しすぎるとリクエスト数が増えてしまい、ちゃりんちゃりん課金されてしまう。
かと言って、あんまり Limit
を大きくしてしまうと、詰め込みきれなかったデータが無駄になってしまうのでうまく調整しないといけない。
頻繁にリクエストが細分化してしまう場合は、いっそのことStreamとは別にバッファを持ったほうがいいかもしれない。