こんにちは。日々 RxJS と格闘している @ringtail003 です。
RxJSには 公式ドキュメント や Learn RxJS など学習コンテンツがあるのですが、それを読んでもなおマーブルダイアグラムが理解できない私です。なぜかって私が開発しているプロジェクトでは、クリックイベントを題材にしたコードや、数秒おきに 1 2 3...
を流すコードの使い所がなく、それらを参考にしても実践で扱いたい題材との乖離が激しくアンポンタンな頭でうまく転換する事ができないのです。
私が欲しいのは HTTP リクエスト! HTTP リクエストのサンプルコード!!という事で、配列を並列・逐次・一括の HTTP リクエストにするサンプルコードを書いてみる事にしました。
この記事を検証した環境
- rxjs v6.5.3
- typescript v3.6.4
はじめに
擬似的な HTTP リクエストとして、このようなサービスを作りました。
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';
export function httpService(id: string) {
const response = [
{ id: 'a', delay: 3000 },
{ id: 'b', delay: 4000 },
{ id: 'c', delay: 1000 },
].find((item) => item.id === id);
return of(response)
.pipe(delay(response.delay))
;
}
httpService()
関数を呼び出すと、数秒遅延して擬似的なレスポンスを返します。
httpService('a'); // レスポンス "{ id: 'a', delay: 1000 }"
a
b
c
を一斉に呼び出すと c > a > b
の順でストリームに値が流れます。
httpService('a'); // 3 秒後
httpService('b'); // 4 秒後
httpService('c'); // 1 秒後
並列処理
一斉に POST をリクエストしてレスポンスが返ってきたものから順に処理したい、みたいなケースです。
import { from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
from(['a', 'b', 'c']).pipe(
mergeMap((id) => httpService(id)),
)
.subscribe((value) => {...}})
;
from はイテレーション可能な値をストリームに変換するオペレータです。 from(['a', 'b', 'c'])
はストリームを作成し a > b > c
の順に値を3回に分けて流します。
流れてきた値を httpService()
関数に渡すと HTTP レスポンスが Observable
型で返却されます。ストリームの中にストリームが流れている状態ですね。
---Observable<response c>---Observable<response a>---Observable<response b>---
mergeMap
を使うと内側のストリームの値を外側のストリームに放出する事ができます。
---<response c>---<response a>---<response b>---
結果
レスポンスはこの順で subscribe
に到着します。
c (delay 1000) // 1秒後
a (delay 3000) // 3秒後
b (delay 4000) // 4秒後
並列処理(最初に到達したものだけ欲しい)
フォールバックを含めた複数サーバーに一斉に GET をリクエストして最初に返ってきたレスポンスを使う、みたいなケースです。
Promise.race()
の RxJS 版ですね。
import { from } from 'rxjs';
import { switchMap } from 'rxjs/operators';
from(['a', 'b', 'c']).pipe(
switchMap((id) => httpService(id)),
)
.subscribe((value) => {...}})
;
switchMap
を使うと subscribe
に到着する値は最初のひとつだけで、以降の値は無視する事になります。
結果
レスポンスはひとつだけ subscribe
に到着します。
c (delay 1000) // 1秒後
逐次処理
複数の DELETE リクエストを順番に処理したい、みたいなケースです。
import { from } from 'rxjs';
import { concatMap } from 'rxjs/operators';
from(['a', 'b', 'c']).pipe(
concatMap((id) => httpService(id)),
)
.subscribe((value) => {...}})
;
concatMap
は前の処理が終わるのを待ってから次の処理を実行します。
結果
レスポンスはこの順で subscribe
に到着します。
a (delay 3000) // 3秒後
b (delay 4000) // 3 + 4秒後
c (delay 1000) // 3 + 4 + 1秒後
一括処理
複数の HTTP リクエストが全て完了するのを待って処理するケースです。
Promise.all()
の RxJS 版ですね。
import { forkJoin } from 'rxjs';
const sources = ['a', 'b', 'c'].map((id) => httpService(id));
forkJoin(sources)
.subscribe((values) => {
// forkJoin に渡した配列と同じ順番で values が返る
values[0]; // a
values[1]; // b
values[2]; // c
})
;
forkJoin
は流れてきたストリームの全ての完了を待ち、それぞれのストリームで最後に流れた値を放出します。
結果
subscribe
が呼ばれるのは 1 回です。全ての値が一括して到着します。
// 4秒後
[
a (delay 3000),
b (delay 4000),
c (delay 1000),
]
エラーハンドリング
HTTP 通信ではエラーハンドリングが欠かせません。私達はエラーの内容を解析して、システムのユーザーになぜエラーが起きたのかを伝え、その後取るべき操作に誘導する必要があります。
HTTP リクエストのエラーに似た状況を作るために httpService()
関数を修正してみましょう。
import { Observable, of, from, timer, throwError } from 'rxjs';
import { delay, mergeMap } from 'rxjs/operators';
export function httpService(id: string) {
// httpService('error') のように呼ばれたら 2 秒後にエラーを発生させる
if (id === 'error') {
const error = { id: 'error', delay: 2000 };
return timer(error.delay).pipe(
mergeMap(() => throwError(error)),
);
}
...
}
a
b
c
error
を一斉に呼び出すと c > error > a > b
の順でストリームに値が流れます。
httpService('a'); // 3 秒後
httpService('b'); // 4 秒後
httpService('c'); // 1 秒後
httpService('error'); // 2 秒後
これで準備ができました。
並列処理 で登場した mergeMap
のサンプルコードにエラーハンドリングを追加してみましょう。
エラーハンドリング(無視)
複数の HTTP リクエストのうち、どれかでエラーが発生した場合に無視して次を実行するケースです。
import { from, EMPTY } from 'rxjs';
import { mergeMap, tap, catchError } from 'rxjs/operators';
from(['a', 'b', 'c', 'error']).pipe(
mergeMap((id) => httpService(id).pipe(
catchError(() => EMPTY),
)),
)
.subscribe((value) => {...}})
;
catchError
でストリームにエラーハンドリングを追加する事ができます。この関数の返却値がエラーが発生した時の扱いを決定します。
from
で作成したストリームにはこのように値が流れています。
---Observable<c>---Observable<error>---Observable<a>---Observable<b>---
catchError
でエラーがキャッチされます。EMPTY
はストリームをただちに完了状態にし、それ以上値を流しません。
---Observable<c>---(completed)---Observable<a>---Observable<b>---
error
のストリームは購読できる状態ではなくなったため subscribe
には到着しません。
---Observable<c>---Observable<a>---Observable<b>---
結果
subscribe
には c
a
b
の順で到着します。
c (delay 1000)
a (delay 3000)
b (delay 4000)
参考
実際のプロダクションコードで「エラーを完全に無視する = 握りつぶす」という実装はあんまりしないですよね。画面にアラートを表示しつつその後の処理を続行する、くらいが妥当だと思われます。
catchError
のコールバック関数はエラーの詳細を得る事ができます。そしてこれらを使ってシステムに副作用を持たせる事ができます。
from(['a', 'b', 'c', 'error']).pipe(
tap((id) => log(`requested ${id}`)),
mergeMap((id) => httpService(id).pipe(
catchError((err, caught) => {
console.log(err); // { id: 'error', delay: 2000 }
console.log(caught); // Observable{...}
// 実際の HTTP リクエストではステータスコードなどが拾える
notifyService(err.id);
return EMPTY;
}),
)),
)
エラーハンドリング(続行)
a
b
c
error
の HTTP リクエストのうち、成功・エラーに関わらず全て購読するケースです。
import { from, of } from 'rxjs';
import { mergeMap, tap, catchError } from 'rxjs/operators';
from(['a', 'b', 'c', 'error']).pipe(
mergeMap((id) => httpService(id).pipe(
catchError((err) => of(err)),
)),
)
.subscribe((value) => {...}})
;
結果
subscribe
には c
error
a
b
の順で到着します。
c (delay 1000)
error (delay 2000)
a (delay 3000)
b (delay 4000)
エラーハンドリング(中断)
a
b
c
error
の HTTP リクエストのうち、どれかひとつでもエラーになったら購読を中断するケースです。
import { from, EMPTY, throwError } from 'rxjs';
import { mergeMap, tap, catchError } from 'rxjs/operators';
import { httpService } from './http-service';
import { log } from './dom';
from(['a', 'b', 'c', 'error']).pipe(
mergeMap((id) => httpService(id)),
catchError((err) => EMPTY),
)
.subscribe((value) => {}})
;
さきほど登場した エラーハンドリング(無視) のサンプルコードとは catchError
を指定するストリームが異なっているのでご注意ください。
from
で作成した外側のストリームにはこのように値が流れています。EMPTY
を流す事により完了状態になるため、それ以上値が流れる事はありません。
---Observable<c>--- | (completed)
結果
subscribe
には c
のみ到着します。
c (delay 1000)
おわり
以上です。RxJS はオペレータが非常に多いのとデバッグが困難で敷居が高く感じてしまいますが、知らなかったオペレータをあれこれ見つけ出して試すほど深みにはまっていく気がします。よく使う基本の処理さえ抑えておけば、それほど迷う事はないのかもしれませんね。