12
4

More than 3 years have passed since last update.

RxJSでHTTPリクエストの並列・逐次・一括処理をする

Posted at

こんにちは。日々 RxJS と格闘している @ringtail003 です。

RxJSには 公式ドキュメントLearn RxJS など学習コンテンツがあるのですが、それを読んでもなおマーブルダイアグラムが理解できない私です。なぜかって私が開発しているプロジェクトでは、クリックイベントを題材にしたコードや、数秒おきに 1 2 3... を流すコードの使い所がなく、それらを参考にしても実践で扱いたい題材との乖離が激しくアンポンタンな頭でうまく転換する事ができないのです。

私が欲しいのは HTTP リクエスト! HTTP リクエストのサンプルコード!!という事で、配列を並列・逐次・一括の HTTP リクエストにするサンプルコードを書いてみる事にしました。

この記事を検証した環境

はじめに

擬似的な HTTP リクエストとして、このようなサービスを作りました。

import { of } from 'rxjs';
import { delay } from 'rxjs/operators';

export function httpService(id: string) {
  const response = [
    { id: 'a', delay: 3000 },
    { id: 'b', delay: 4000 },
    { id: 'c', delay: 1000 },
  ].find((item) => item.id === id);

  return of(response)
    .pipe(delay(response.delay))
  ;
}

httpService() 関数を呼び出すと、数秒遅延して擬似的なレスポンスを返します。

httpService('a'); // レスポンス "{ id: 'a', delay: 1000 }"

a b c を一斉に呼び出すと c > a > b の順でストリームに値が流れます。

httpService('a'); // 3 秒後
httpService('b'); // 4 秒後
httpService('c'); // 1 秒後

並列処理

一斉に POST をリクエストしてレスポンスが返ってきたものから順に処理したい、みたいなケースです。

import { from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

from(['a', 'b', 'c']).pipe(
    mergeMap((id) => httpService(id)),
  )
  .subscribe((value) => {...}})
;

from はイテレーション可能な値をストリームに変換するオペレータです。 from(['a', 'b', 'c']) はストリームを作成し a > b > c の順に値を3回に分けて流します。

流れてきた値を httpService() 関数に渡すと HTTP レスポンスが Observable 型で返却されます。ストリームの中にストリームが流れている状態ですね。

---Observable<response c>---Observable<response a>---Observable<response b>---

mergeMap を使うと内側のストリームの値を外側のストリームに放出する事ができます。

---<response c>---<response a>---<response b>---

結果

レスポンスはこの順で subscribe に到着します。

c (delay 1000) // 1秒後

a (delay 3000) // 3秒後

b (delay 4000) // 4秒後

並列処理(最初に到達したものだけ欲しい)

フォールバックを含めた複数サーバーに一斉に GET をリクエストして最初に返ってきたレスポンスを使う、みたいなケースです。
Promise.race() の RxJS 版ですね。

import { from } from 'rxjs';
import { switchMap } from 'rxjs/operators';

from(['a', 'b', 'c']).pipe(
    switchMap((id) => httpService(id)),
  )
  .subscribe((value) => {...}})
;

switchMap を使うと subscribe に到着する値は最初のひとつだけで、以降の値は無視する事になります。

結果

レスポンスはひとつだけ subscribe に到着します。

c (delay 1000) // 1秒後

逐次処理

複数の DELETE リクエストを順番に処理したい、みたいなケースです。

import { from } from 'rxjs';
import { concatMap } from 'rxjs/operators';

from(['a', 'b', 'c']).pipe(
    concatMap((id) => httpService(id)),
  )
  .subscribe((value) => {...}})
;

concatMap は前の処理が終わるのを待ってから次の処理を実行します。

結果

レスポンスはこの順で subscribe に到着します。

a (delay 3000) // 3秒後

b (delay 4000) // 3 + 4秒後

c (delay 1000) // 3 + 4 + 1秒後

一括処理

複数の HTTP リクエストが全て完了するのを待って処理するケースです。
Promise.all()の RxJS 版ですね。

import { forkJoin } from 'rxjs';

const sources = ['a', 'b', 'c'].map((id) => httpService(id));

forkJoin(sources)
  .subscribe((values) => {
    // forkJoin に渡した配列と同じ順番で values が返る
    values[0]; // a
    values[1]; // b
    values[2]; // c
  })
;

forkJoin は流れてきたストリームの全ての完了を待ち、それぞれのストリームで最後に流れた値を放出します。

結果

subscribe が呼ばれるのは 1 回です。全ての値が一括して到着します。

 // 4秒後
[
  a (delay 3000),
  b (delay 4000),
  c (delay 1000),
]

エラーハンドリング

HTTP 通信ではエラーハンドリングが欠かせません。私達はエラーの内容を解析して、システムのユーザーになぜエラーが起きたのかを伝え、その後取るべき操作に誘導する必要があります。

HTTP リクエストのエラーに似た状況を作るために httpService() 関数を修正してみましょう。

import { Observable, of, from, timer, throwError } from 'rxjs';
import { delay, mergeMap } from 'rxjs/operators';

export function httpService(id: string) {
  // httpService('error') のように呼ばれたら 2 秒後にエラーを発生させる
  if (id === 'error') {
    const error = { id: 'error', delay: 2000 };

    return timer(error.delay).pipe(
      mergeMap(() => throwError(error)),
    );
  }

  ...
}

a b c error を一斉に呼び出すと c > error > a > b の順でストリームに値が流れます。

httpService('a'); // 3 秒後
httpService('b'); // 4 秒後
httpService('c'); // 1 秒後
httpService('error'); // 2 秒後

これで準備ができました。
並列処理 で登場した mergeMap のサンプルコードにエラーハンドリングを追加してみましょう。

エラーハンドリング(無視)

複数の HTTP リクエストのうち、どれかでエラーが発生した場合に無視して次を実行するケースです。

import { from, EMPTY } from 'rxjs';
import { mergeMap, tap, catchError } from 'rxjs/operators';

from(['a', 'b', 'c', 'error']).pipe(
    mergeMap((id) => httpService(id).pipe(
      catchError(() => EMPTY),
    )),
  )
  .subscribe((value) => {...}})
;

catchError でストリームにエラーハンドリングを追加する事ができます。この関数の返却値がエラーが発生した時の扱いを決定します。

from で作成したストリームにはこのように値が流れています。

---Observable<c>---Observable<error>---Observable<a>---Observable<b>---

catchError でエラーがキャッチされます。EMPTY はストリームをただちに完了状態にし、それ以上値を流しません。

---Observable<c>---(completed)---Observable<a>---Observable<b>---

error のストリームは購読できる状態ではなくなったため subscribe には到着しません。

---Observable<c>---Observable<a>---Observable<b>---

結果

subscribe には c a b の順で到着します。

c (delay 1000)

a (delay 3000)

b (delay 4000)

参考

実際のプロダクションコードで「エラーを完全に無視する = 握りつぶす」という実装はあんまりしないですよね。画面にアラートを表示しつつその後の処理を続行する、くらいが妥当だと思われます。

catchError のコールバック関数はエラーの詳細を得る事ができます。そしてこれらを使ってシステムに副作用を持たせる事ができます。

from(['a', 'b', 'c', 'error']).pipe(
    tap((id) => log(`requested ${id}`)),
    mergeMap((id) => httpService(id).pipe(
      catchError((err, caught) => {
        console.log(err); // { id: 'error', delay: 2000 }
        console.log(caught); // Observable{...}

        // 実際の HTTP リクエストではステータスコードなどが拾える

        notifyService(err.id);

        return EMPTY;
      }),
    )),
  )

エラーハンドリング(続行)

a b c error の HTTP リクエストのうち、成功・エラーに関わらず全て購読するケースです。

import { from, of } from 'rxjs';
import { mergeMap, tap, catchError } from 'rxjs/operators';

from(['a', 'b', 'c', 'error']).pipe(
    mergeMap((id) => httpService(id).pipe(
      catchError((err) => of(err)),
    )),
  )
  .subscribe((value) => {...}})
;

結果

subscribe には c error a b の順で到着します。

c (delay 1000)

error (delay 2000)

a (delay 3000)

b (delay 4000)

エラーハンドリング(中断)

a b c error の HTTP リクエストのうち、どれかひとつでもエラーになったら購読を中断するケースです。

import { from, EMPTY, throwError } from 'rxjs';
import { mergeMap, tap, catchError } from 'rxjs/operators';
import { httpService } from './http-service';
import { log } from './dom';

from(['a', 'b', 'c', 'error']).pipe(
    mergeMap((id) => httpService(id)),
    catchError((err) => EMPTY),
  )
  .subscribe((value) => {}})
;

さきほど登場した エラーハンドリング(無視) のサンプルコードとは catchError を指定するストリームが異なっているのでご注意ください。

from で作成した外側のストリームにはこのように値が流れています。EMPTY を流す事により完了状態になるため、それ以上値が流れる事はありません。

---Observable<c>--- | (completed)

結果

subscribe には c のみ到着します。

c (delay 1000)

おわり

以上です。RxJS はオペレータが非常に多いのとデバッグが困難で敷居が高く感じてしまいますが、知らなかったオペレータをあれこれ見つけ出して試すほど深みにはまっていく気がします。よく使う基本の処理さえ抑えておけば、それほど迷う事はないのかもしれませんね。

12
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
4