(2021/06/02 update: RxJS 7 対応)
ちきさんです。趣味はRxの再実装です。
さてRxJSの数あるオペレーターの中でも3大謎オペとして知られるconcatMap, mergeMap, switchMapについてお勉強しましょう。
(これらのオペレーター以前の段階で躓いている方にはちょっと難しい内容かもしれません)
この中でもよく使われるのは**mergeMap(flatMap)**かなと思いますのでとりあえずはこれを中心に話を進めていきます。
mergeMapの極意1 ~新たなストリームを生み出す~
mergeMapの第一の特徴は、引数の値を使って新たなObservableを生成できるという点です。
例えば、
const { of } = rxjs
of(5)
.subscribe(value => {
console.log(value)
})
これのoutputは
5
これだけです。誰でもわかりますね。
それではmergeMapを使うとどういうことができるのか見てみましょう。
const { of, range, interval } = rxjs
const { mergeMap, map, zipWith } = rxjs.operators
of(5)
.pipe(
mergeMap(value => {
return range(1, value)
.pipe(
zipWith(interval(500))
)
}),
map(values => values[0])
)
.subscribe(value => {
console.log(value)
})
これぐらいになると初見殺し感が出てきます。でも普通はこんな意味不明なコードは書かないので安心してください。
これのoutputは
1
(...500ms)
2
(...500ms)
3
(...500ms)
4
(...500ms)
5
となります。
同じ of(5)
で始まるのに後者は時間軸が加わった上に1から5までの数値が生成されています。おそるべしmergeMap。
先程のコードで注目すべきは
mergeMap(value => {
return range(1, value)
.pipe(
zipWith(interval(500))
)
}),
この部分で、 value === 5
なのですがその次に何が起きているかというと、
-
range(1, value)
によって[1,2,3,4,5]
という配列が生成される。 -
interval(500)
というストリームが新たに作られる。これは500ms毎に値を流す。 -
zipWith
オペレーターにより[1,2,3,4,5]
という配列とinterval
が合成され、500ms毎に配列の値を一つずつ次に流すストリームが作られる。
というわけです。
なるほど。
mergeMapの極意2 ~非同期をさりげなく解決させる~
concatMapとswitchMapにも言えることですが、実は非同期処理を解決させてから値を次に流してくれます。
const { of } = rxjs
const { mergeMap } = rxjs.operators
of(5)
.pipe(
mergeMap(value => {
return Promise.resolve(value)
})
)
.subscribe(value => {
console.log(value)
})
この書き方はアリです。
また次のようにPromiseではなくObservableも受けられます。
mergeMap(value => {
return of(value)
})
そしてこの非同期処理を解決させて次に流すという機能は便利な反面、concatMap, mergeMap, switchMapの挙動の違いを理解していないと上手に使いこなすことができません。
concatMap, mergeMap, switchMapの違い
3者の違いがわかるようなコードを書いてみました。
ちなみにデモはこちら → https://jsbin.com/tucular/2/edit?js,console
const { Subject } = rxjs
const { concatMap, mergeMap, switchMap, map } = rxjs.operators
const subject = new Subject()
subject
.pipe(
concatMap(obj => { // concatMap, mergeMap, switchMap で結果が変わる
return httpGet(obj.url, obj.delay)
}),
map(res => JSON.parse(res).ResultData)
)
.subscribe(value => {
console.log(value)
})
subject.next({ url: 'http://foo', delay: 500 })
subject.next({ url: 'http://bar', delay: 300 })
subject.next({ url: 'http://baz', delay: 100 })
function httpGet(url, delay) {
return new Promise(resolve => {
setTimeout(() => {
const obj = { ResultData: url + ' -> resolved' }
resolve(JSON.stringify(obj))
}, delay)
})
}
まずconcatMapを使ったときのoutputは
start
http://foo -> resolved
http://bar -> resolved
http://baz -> resolved
となります。delayは http://foo
が一番大きいにも関わらず subject.next
の発行順で出力されていますね。
これは「アクションの発行順が大事なので前のアクションが終わるまでは次のアクションを待たせて欲しい」というような場面で使われます。
次にmergeMapを使ったときのoutputは
start
http://baz -> resolved
http://bar -> resolved
http://foo -> resolved
となります。 subject.next
の発行順は無視され、非同期処理の解決順で並びます。delayの少ない順になるというわけです。
これは「アクションの発行順は問わないので非同期処理が解決した順にどんどん次に流して欲しい」というような場面で使われます。
最後にswitchMapを使ったときのoutputは
start
http://baz -> resolved
となります。前の非同期処理が解決する前に次の処理が流れてくると前のものはキャンセルされてしまいます。この例では http://foo
と http://bar
の処理はキャンセルされていますね。
これは例えば「キーボード入力に対してリアルタイムに検索結果を返すようなときに不要な非同期処理は適宜キャンセルして欲しい」というような場面で使われます。
最後に
それぞれの違いがなんとなくお分かりいただけたでしょうか。
とりあえずはmergeMapで書いて、非同期処理をせき止めたいときはconcatMapを使ってみて、キャンセルしたかったらswitchMapを使う、みたいな理解で良いかと思います。