RxJSのconcatMap, mergeMap, switchMapの違いを理解する(中級者向け)

  • 53
    いいね
  • 0
    コメント

ちきさんです。趣味はRxの再実装です。

さてRxJSの数あるオペレーターの中でも3大謎オペとして知られるconcatMap, mergeMap, switchMapについてお勉強しましょう。
(これらのオペレーター以前の段階で躓いている方にはちょっと難しい内容かもしれません)

この中でもよく使われるのはmergeMap(flatMap)かなと思いますのでとりあえずはこれを中心に話を進めていきます。

mergeMapの極意1 ~新たなストリームを生み出す~

mergeMapの第一の特徴は、引数の値を使って新たなObservableを生成できるという点です。
例えば、

Observable.of(5)
  .subscribe(value => {
    console.log(value)
  })

これのoutputは

5

これだけです。誰でもわかりますね。

それではmergeMapを使うとどういうことができるのか見てみましょう。

Observable.of(5)
  .mergeMap(value => {
    return Observable.range(1, value).zip(Observable.interval(500))
  })
  .map(values => values[0])
  .subscribe(value => {
    console.log(value)
  })

これぐらいになると初見殺し感が出てきます。でも普通はこんな意味不明なコードは書かないので安心してください。

これのoutputは

1
(...500ms)
2
(...500ms)
3
(...500ms)
4
(...500ms)
5

となります。

同じ Observable.of(5) で始まるのに後者は時間軸が加わった上に1から5までの数値が生成されています。おそるべしmergeMap。

先程のコードで注目すべきは

  .mergeMap(value => {
    return Observable.range(1, value).zip(Observable.interval(500))
  })

この部分で、ここで value === 5 なのですがその次に何が起きているかというと、

  • Observable.range(1, value) によって [1,2,3,4,5] という配列が生成される。
  • Observable.interval(500) というストリームが新たに作られる。これは500ms毎に値を流す。
  • zip オペレーターにより [1,2,3,4,5] という配列と interval が合成され、500ms毎に配列の値を一つずつ次に流すストリームが作られる。

というわけです。

なるほど。

mergeMapの極意2 ~非同期をさりげなく解決させる~

concatMapとswitchMapにも言えることですが、実は非同期を解決させてから値を次に流してくれます。

Observable.of(5)
  .mergeMap(value => {
    return Promise.resolve(value)
  })
  .subscribe(value => {
    console.log(value)
  })

この書き方はアリです。

  .mergeMap(value => {
    return Observable.fromPromise(Promise.resolve(value))
  })

上記のように Observable.fromPromise とか書く必要はありません。

また次のようにPromiseではなくObservableも受けられます。

  .mergeMap(value => {
    return Observable.of(value)
  })

そしてこの非同期を解決させて次に流すという機能は便利な反面、concatMap, mergeMap, switchMapの挙動の違いを理解していないと上手に使いこなすことができません。

concatMap, mergeMap, switchMapの違い

3者の違いがわかるようなコードを書いてみました。

ちなみにデモはこちら → Demo on JS Bin
(うまく動かないときは左上の「ES6/Babel」のところを「TypeScript」に切り替えると動いたりします)

console.log('start')

const subject = new Subject()

subject
  .concatMap(obj => { // concatMap, mergeMap, switchMapで結果が変わる。
    return httpGet(obj.url, obj.delay)
  })
  .map(res => JSON.parse(res).ResultData)
  .subscribe(value => {
    console.log(value)
  })

subject.next({ url: 'http://foo', delay: 500 })
subject.next({ url: 'http://bar', delay: 300 })
subject.next({ url: 'http://baz', delay: 100 })


function httpGet(url, delay) {
  return new Promise(resolve => {
    setTimeout(() => {
      const obj = { ResultData: url + ' -> resolved' }      
      resolve(JSON.stringify(obj))
    }, delay)
  })
}

まずconcatMapを使ったときのoutputは

start
http://foo -> resolved
http://bar -> resolved
http://baz -> resolved

となります。delayは http://fooが一番大きいにも関わらず subject.next の発行順で出力されていますね。

これは「アクションの発行順が大事なので前のアクションが終わるまでは次のアクションを待たせて欲しい」というような場面で使われます。

次にmergeMapを使ったときのoutputは

start
http://baz -> resolved
http://bar -> resolved
http://foo -> resolved

となります。 subject.next の発行順は無視され、非同期の解決順で並びます。delayの少ない順になるというわけです。

これは「アクションの発行順は問わないので非同期が解決した順にどんどん次に流して欲しい」というような場面で使われます。

最後にswitchMapを使ったときのoutputは

start
http://baz -> resolved

となります。前の非同期処理が解決する前に次の処理が流れてくると前のものはキャンセルされてしまいます。この例では http://foohttp://bar の処理はキャンセルされていますね。

これは例えば「キーボード入力に対してリアルタイムに検索結果を返すようなときに不要な非同期処理は適宜キャンセルして欲しい」というような場面で使われます。

最後に

それぞれの違いがなんとなくお分かりいただけたでしょうか。

とりあえずはmergeMapで書いて、非同期処理をせき止めたいときはconcatMapを使ってみて、キャンセルしたかったらswitchMapを使う、みたいな理解で良いかと思います。

  • この記事は以下の記事からリンクされています
  • RxJSをはじめるからリンク