この記事は 俺でもわかるシリーズ Advent Calendar 2018 の14日目です。本当に申し訳ありません。
前置き
たとえば、全件をまとめて取得するのが殆ど不可能であるがゆえに、1回の呼び出しで最大1,000件のアイテム群が返ってくる API があったとしよう。
次のページを取得するためには、現在のページのレスポンスに含まれる情報(カーソル)を使って再度リクエストをするという仕組みであり、
そして、あなたはその最大1,000件ずつ取得するアイテム群に対して、何らかの処理をしたい。そういう状況であったとすると、
何らかの処理というのは非常に多岐に渡っていて、ざっくり考えただけでも
- 何らかの条件に応じてアイテムを取得したい
- 全件を取得したい
- 1件だけ取得したい
- n件まで取得したい
- 合致するまでの全件を取得したい
- 取得されたアイテムに何らかの処理をしたい
- データ形式を変換したい
- n件ずつのサブグループにしたい
- 合計とか平均とか、それらを使って1つのデータを作りたい
- 取得元は複数ある(API を叩く条件が複数とか)が、取得されたアイテムに対する処理は共通化したい
- アイテムが取得されたら、探索が終わってからでなく取得された瞬間から次の処理に渡したい
これを地道に実装しようとすると、次ページを取得する部分とアイテムに対して処理をする部分が混ざり、非常に見通しが悪そう。
しかし、コレ、なんか見覚えがあるよな……
というわけで RxJS を使って、例として S3 の ListObjectsV2 API をラップしてみるというネタです。
前提条件・ソースコード
Observable
を静的型なしに扱うと脳が爆発四散するので TypeScript で書きました。
"aws-sdk": "^2.377.0"
"rxjs": "^6.3.3"
"typescript": "^3.2.2"
ソースは y13i/s3-observable-example に置いてます。
ListObjectsV2 を Observable 化する
こんな感じにしてみました。
import { Observable } from "rxjs";
import { S3 } from "aws-sdk";
type Request = S3.ListObjectsV2Request;
export function listObjectsAsObservable(
s3: S3,
request: Request
): Observable<S3.ListObjectsV2Output> {
return Observable.create((observer: any) => {
let finished = false;
let continuationToken: string | undefined = undefined;
(async () => {
try {
do {
const req: Request = {
...request,
ContinuationToken: continuationToken
};
const result = await s3.listObjectsV2(req).promise();
continuationToken = result.NextContinuationToken;
observer.next(result);
} while (continuationToken && !finished);
} catch (error) {
observer.error(error);
}
observer.complete();
})();
return () => (finished = true);
});
}
n 回繰り返し API 呼び出しのメソッドを呼び出してその結果を流す Observable
を作るには Observable.create()
を使えば良さそうです。
Observable.create()
の引数であるサブスクライバー関数の仮引数 observer
のメソッド next()
に何らかの値を引数として与えて実行すると、その値が流れてくるようになります。 error()
, complete()
はそれぞれ、ストリームのエラー値とストリーム終了時の値に対応しています。
s3.listObjectV2()
API の、次ページを指し示すカーソルは ContinuationToken
という名前です。レスポンスに含まれる NextContinuationToken
の値を次回のリクエストに ContinuationToken
として含めると、次ページの結果を得られます。
async
, await
を使うとループ回数が不明な場合の処理を再帰なしに書けるので脳にやさしいですね。
サブスクライバー関数は、ストリームが中断(dispose)された時の処理を行う関数を返します。例えば10個の値が流れてくるストリームに対し、 take(5)
すると5個取った時点で残りは流れてこなくてよい、というような場合です。今回の例ではそうなった場合は以降のページのリクエストを行わない、としています。
試してみる
1. まずは簡単な例
適当に130個ほどダミーオブジェクトを置いておき、それを50個ずつ一覧してみます。本来 listObjectV2()
は1回で1,000件まで取得できるのですが、時間短縮のためわざと件数を少なめで試しています。
await listObjectsAsObservable(s3, {
Bucket: bucket,
Prefix: prefix,
MaxKeys: 50
})
.pipe(
tap(listObjectsResult => {
console.log(
`!!! list contains ${listObjectsResult.Contents!.length} items.`
);
})
)
.toPromise();
AWS SDK のデバッグ出力を有効にして実行してみました。出力結果は以下です。
[AWS s3 200 0.096s 0 retries] listObjectsV2({ Bucket: 's3-observable-example',
Prefix: 'test1/',
MaxKeys: 50,
ContinuationToken: undefined })
!!! list contains 50 items.
[AWS s3 200 0.076s 0 retries] listObjectsV2({ Bucket: 's3-observable-example',
Prefix: 'test1/',
MaxKeys: 50,
ContinuationToken:
'1KI8JxWWj6YE3WmkKojZ2JeionQ3wtMNsZDGPXICX4e0wk0jhsiYZFh1EjUao850UqILlYHBZUVn45tLUh0bkcA==' })
!!! list contains 50 items.
[AWS s3 200 0.073s 0 retries] listObjectsV2({ Bucket: 's3-observable-example',
Prefix: 'test1/',
MaxKeys: 50,
ContinuationToken:
'1MQ1MDhm6mD4vblrhuvafELPq5mKZQA1V6gHy0wxjfjWB8nSP+DkTdEG5XHLZUjWCkqMBEDnDXyggNokdHMKRig==' })
!!! list contains 30 items.
listObjectsV2()
が計3回呼び出され、それぞれのリストの長さは50, 50, 30件になっていることがわかります。期待通りの挙動っぽい。
2. ストリームを変形する
listObjectsAsObservable()
は S3.ListObjectsV2Output
が流れてくるように実装しましたが、実際取りたいのはオブジェクトの情報だけなんじゃ! という事はままあるでしょう。
そんな時は Observable.from()
, mergeMap()
を使うと簡単にオブジェクト情報(型定義の名前では S3.Object
)のストリームに変換できます。
const divisibleBy13Count = await listObjectsAsObservable(s3, {
Bucket: bucket,
Prefix: prefix,
MaxKeys: 50
})
.pipe(
mergeMap(listObjectsResult => {
return from(listObjectsResult.Contents!);
}),
filter(object => {
const match = object.Key!.match(/(\d+)\.json$/);
if (!match) return false;
const number = parseInt(match[1]!);
return number % 13 === 0;
}),
tap(object => {
console.log(
`The timestamp in object key ${object.Key} is divisible by 13`
);
}),
count()
)
.toPromise();
console.log(
`There are ${divisibleBy13Count} objects with 13-divisible-timestamps.`
);
この例ではストリームを Observable<S3.Object>
に変換した後、オブジェクトキーに含まれるタイムスタンプが13の倍数のものだけを抽出・出力して、さらにその個数をカウントする、としてみました。出力結果はこんな感じ。
The timestamp in object key test1/113-1545384463792.json is divisible by 13
The timestamp in object key test1/15-1545384453717.json is divisible by 13
The timestamp in object key test1/90-1545384461985.json is divisible by 13
There are 3 objects with 13-divisible-timestamps.
3. 非同期処理を重ねて、その結果をまとめてみる
ダミーオブジェクトは {"value":47}
のような形式で100未満の数字が書いてある JSON です。
この数字を取得するためには listObjectsV2()
だけでなく getObject()
でオブジェクト内容を取得する必要があります。
最後にこれをやってみましょう。
await listObjectsAsObservable(s3, {
Bucket: bucket,
Prefix: prefix,
MaxKeys: 50
})
.pipe(
mergeMap(listObjectsResult => {
return from(listObjectsResult.Contents!);
}),
filter(object => {
const match = object.Key!.match(/(\d+)\.json$/);
if (!match) return false;
const number = parseInt(match[1]!);
return number % 7 === 0;
}),
take(7),
mergeMap(object => {
return s3.getObject({ Bucket: bucket, Key: object.Key! }).promise();
}),
map(getObjectResult => {
const parsed: { value: number } = JSON.parse(
getObjectResult.Body!.toString()
);
return parsed.value;
}),
scan((acc, x) => {
return acc + x;
}, 0),
tap(currentSum =>
console.log(
`The current sum of values in objects with 7-divisible-timestamps is ${currentSum}`
)
)
)
.toPromise();
別の非同期処理を挟み込む場合も mergeMap()
を使います。 Promise
を自動で Observable
に変換してくれてありがたい。
ついでにストリームっぽく、最初の7つの、7の倍数のタイムスタンプを持つオブジェクトの中の値(なげーな)が流れてくるたびに「その時点での合計値」を出力するようにしてみました。
The current sum of values in the first 7 objects with 7-divisible-timestamps is 51
The current sum of values in the first 7 objects with 7-divisible-timestamps is 94
The current sum of values in the first 7 objects with 7-divisible-timestamps is 179
The current sum of values in the first 7 objects with 7-divisible-timestamps is 257
The current sum of values in the first 7 objects with 7-divisible-timestamps is 285
The current sum of values in the first 7 objects with 7-divisible-timestamps is 315
The current sum of values in the first 7 objects with 7-divisible-timestamps is 379
まとめ
NextContinuationToken
が続く限り次ページをリクエストし続ける、という処理を listObjectsAsObservable()
の中に閉じ込めることができ、そうして取得してきたデータに対する処理と混ぜずに書けることがわかりました。
場合によっては役に立つかもしれないですね。
あと Observable/RxJS をもっとわかりたいと思いました。おわり