概要
RxJSの最も強力な機能のひとつが「オペレータ」である。
オペレータを使えば、非同期・イベント・状態のストリームを、構造的に合成・変換・制御できる。
だが、その多さに圧倒され、混乱するエンジニアも少なくない。
本稿では特に基本かつ強力な map
/ mergeMap
/ switchMap
を中心に、構造レベルでの理解を目指す。
1. オペレータとは何か?
オペレータ = Observableに対する関数的変換器
RxJSにおける .pipe()
に渡されるのが「オペレータ」:
source$.pipe(operatorA(), operatorB(), operatorC())
- ストリームを連結・変換・フィルタする
- イメージ:非同期バージョンの配列メソッド(map / filter / reduce など)
2. map:値の変換(syncな変形)
import { of } from 'rxjs'
import { map } from 'rxjs/operators'
of(1, 2, 3)
.pipe(map((x) => x * 10))
.subscribe(console.log) // → 10, 20, 30
- 直列的な変換
- 同期的なストリームの変形
3. mergeMap:非同期を合成する(flatMapと同義)
import { of, delay } from 'rxjs'
import { mergeMap } from 'rxjs/operators'
of('A', 'B')
.pipe(
mergeMap((char) => of(`${char}-done`).pipe(delay(1000)))
)
.subscribe(console.log)
- 入力ごとに新しいObservableを生成し、それらを並列にマージ
- 非同期処理(API、IO、タスク)との組み合わせで力を発揮
✅ mergeMapの特徴:
- 同時に複数の非同期処理が走る
- 結果は順不同
4. switchMap:直前のストリームをキャンセルして切り替える
import { fromEvent, debounceTime, switchMap, of } from 'rxjs'
fromEvent(inputEl, 'input')
.pipe(
debounceTime(300),
switchMap((e) => fetchResults(e.target.value))
)
.subscribe(renderResults)
- 入力が連続した場合、前のリクエストを自動キャンセルして、最新だけを処理
- UIや検索APIなど、**「最新状態のみを扱いたい」**場面に最適
5. 比較:mergeMap vs switchMap
特性 | mergeMap | switchMap |
---|---|---|
並行処理 | 全部走る(キャンセルしない) | 前の処理は自動キャンセルされる |
完了の順番 | 保証されない | 最新のものだけ実行される |
用途 | 並列処理(全結果を扱いたい時) | 入力イベント・最新状態のみのUI処理など |
6. 実務での活用パターン
✅ フォーム検索UI
searchInput$
.pipe(
debounceTime(300),
switchMap(keyword => fetchResults(keyword))
)
→ リクエストの競合防止、UI負荷の軽減に必須
✅ 並列にデータロード
from(userIds)
.pipe(mergeMap(id => fetchUserById(id)))
→ 並列に全ユーザデータを取得。mergeMapが有効
よくある誤解と対策
❌ mergeMap = map + flatten でしょ?
→ ✅ 単なるフラット化ではなく、同時実行の制御やキャンセル不可性も含む
❌ switchMapはどれか1つしか走らない
→ ✅ 最新の1つだけが走る。直前の処理は明示せずキャンセルされる(Promiseでは不可能)
❌ map, mergeMap, switchMap の違いがあいまい
→ ✅ 重要なのは「値 → Observableにするか?」「複数処理を同時に走らせたいか?」の2点
結語
Rxのオペレータは「文法」ではない。
それは非同期・ストリーム・イベントを合成し、制御し、最適化する構造的パーツ群である。
-
map
は単純変換 -
mergeMap
は非同期の並列合成 -
switchMap
は状態を1つに絞る制御構造
リアクティブ設計におけるオペレータとは、
“非同期ストリームに対する関数型アーキテクチャであり、制御不能な動的処理を、構造として解きほぐすための抽象である。”