LoginSignup
1
1

More than 1 year has passed since last update.

【RxJava】flatMapとconcatMapとswitchMapの違い

Last updated at Posted at 2021-03-08

はじめに

仕事でRxを使ってAndroidのコードを書いているのですが、普段はflatMapばかり使っていてconcatMap、switchMapなどの類似したオペレータとの違いをよく理解していなかったので調べてみました。

Javadocを読んでみる

以下を参照しています。

まずそれぞれ関数を1つ引数にとる基本的なオペレータをみてみます。シグネチャは以下の通りです。

flatMap

public final <R> Observable<R> flatMap(Function<? super T,? extends ObservableSource<? extends R>> mapper)
concatMap

public final <R> Observable<R> concatMap(Function<? super T,? extends ObservableSource<? extends R>> mapper)
switchMap

public final <R> Observable<R> switchMap(Function<? super T,? extends ObservableSource<? extends R>> mapper)

引数の型は全て同じFunction型で、source ObservableSourceから流れてくるitemを引数に取り、ObservableSourceを返す関数(型インタフェース)のmapperです。
3つとも流れてきたitemから新しいObservableSourceを生成する系のオペレータのようです。

次にdescriptionとマーブル図を読んでみます。

flatMap

Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source ObservableSource, where that function returns an ObservableSource, and then merging those resulting ObservableSources and emitting the results of this merger.
flatMap.png

前半部分はシグネチャからもわかることが書かれていますね。後半が大事っぽいです。
結果として返されるObservableSourceをmergeしてこのmergerの結果をemitする、とのこと。mergeするとはたぶんmergeのこと。

マーブル図を見てみます。mapperは白い丸を受け取って2つの白い菱形をemitしています。流れてきたitemの色だけを受け継いで、菱形のitemを2つemitする、というObservableに変換する処理みたいです。
下のラインをみると赤丸から赤菱形が2つ生成されています。続いて緑と青が短い間隔で流れてきています。出力された菱形は緑青緑青と交互になっていますが、各色2つずつemitされています。これがmergeしてmergerの結果をemitする、という動きを表しているみたいですね。なるほど。

concatMap

Returns a new Observable that emits items resulting from applying a function that you supply to each item emitted by the source ObservableSource, where that function returns an ObservableSource, and then emitting the items that result from concatenating those resulting ObservableSources.
image.png

flatMap同様に前半はあまり変化がないので飛ばします。concatMapはmapperが返したObservableSource達をconcatenateしてemitする、とのことです。
concatenateするconcatかな?たぶん。mergeとの違いは1つのObservableSourceのemitが終わってから次のObservableSourceのitemがemitされることみたいですね。ObservableSource同士の順番を変えたくない時に使えそうです。
マーブル図を見ると、mapperはflatMapと同じで、下流へemitされる順番がflatMapとは異なっています。mergeとconcatの違い(さらにいうとflatMapとの違い)がうまく表現されています。

switchMap

Returns a new ObservableSource by applying a function that you supply to each item emitted by the source ObservableSource that returns an ObservableSource, and then emitting the items emitted by the most recently emitted of these ObservableSources.
The resulting ObservableSource completes if both the upstream ObservableSource and the last inner ObservableSource, if any, complete. If the upstream ObservableSource signals an onError, the inner ObservableSource is disposed and the error delivered in-sequence.
image.png

後半(and then emitting以降)だけ読みます。flatMapやconcatMapと違って、mergeやconcatのような見慣れた単語は出てきません。その上、emitting、emitted、emittedってすごい何言ってるかわからない感じの文です。。翻訳すると、これらのObservableSourcesの最後に発行されたアイテムによって発行されたアイテムを発行しますらしいです。ネイティブの人なら理解できるのでしょうか。結構悩みましたが、マーブル図を見ながらなんとか読み解いてみます。
mapperは先ほどまでと少し違います。白丸を受け取って、白菱形と白正方形をemitするObservableSourceを返す関数のようです。上流はflatMapやconcatMapと同じですね。出力される結果を見ると、赤丸は変換後のitemが2つともemitされています。緑は最初にemitされる菱形のみ、青は赤と同様に2つともemitされています。
先ほどの難解な文と併せると、mapperが返したObservableSourceがitemをemitしたら下流にitemが流れていくが、emitし終わる前に、次なるObservableSourceがitemをemitし始めると、emit途中だったObservableSourceのitemは下流には流れていかない仕組みになっていると考えられます。

動きをみてみる

上記からそれぞれの特徴が掴めた気がしますので、違いが出そうなコードを書いてみました。
違いをわかりやすくするために、おおよそでも時間が知りたかったのでintervalを使ってstartからの経過時間も出力されるようにしています。
やってることは単純です。
1. Observable.just()でまずitemを5つemit
2. mapperでは、(6-item)秒待ってから、N番目の処理が終わりましたというStringをemitするObservableSourceを返す
3. subscribeのonNextで時間と2のStringをprint

では見ていきます。

flatMap

fun main() {
    println("--------------start--------------")
    val interval = Observable.interval(0, 100, TimeUnit.MILLISECONDS)
    Observable
            .just(1,2,3,4,5)
            .flatMap { item ->
                val timer = 6 - item
                Observable.timer(timer.toLong(), TimeUnit.SECONDS).map { "${item}番目の処理が終わりました" } }
            .withLatestFrom(interval, BiFunction { s: String, time: Long -> Pair(s, time) } )
            .subscribe(
                    { println("${it.second / 10}s : ${it.first}") },
                    {},
                    {
                        println("--------------end--------------")
                        exitProcess(0)
                    }
            )

    Thread.sleep(100000)
}
出力結果
--------------start--------------
1s : 5番目の処理が終わりました
2s : 4番目の処理が終わりました
3s : 3番目の処理が終わりました
4s : 2番目の処理が終わりました
5s : 1番目の処理が終わりました
--------------end--------------

Process finished with exit code 0

flatMapはObservableSourcesをmergeするので、mapperが返すObservableSourceがemitした順番に下流にitemが流れてきています。

次に、concatMapです。

concatMap

fun main() {
    println("--------------start--------------")
    val interval = Observable.interval(0, 1, TimeUnit.MILLISECONDS)
    Observable
            .just(1,2,3,4,5)
            .concatMap { item ->
                val timer = 6 - item
                Observable.timer(timer.toLong(), TimeUnit.SECONDS).map { "${item}の処理が終わりました" } }
            .withLatestFrom(interval, BiFunction { s: String, time: Long -> Pair(s, time) } )
            .subscribe(
                    { println("${it.second}ms : ${it.first}") },
                    {},
                    {
                        println("--------------完了--------------")
                        exitProcess(0)
                    }
            )

    Thread.sleep(100000)
}

出力結果
--------------start--------------
5s : 1番目の処理が終わりました
9s : 2番目の処理が終わりました
12s : 3番目の処理が終わりました
14s : 4番目の処理が終わりました
15s : 5番目の処理が終わりました
--------------end--------------

Process finished with exit code 0

concatMapでは、source ObservableSourceから流れてきた順番を大事にするので、1番目から順にprintされています。
今回のコードでは先に流れてきたitemほど待ち時間を長くしているので、5番目が終了するまで15秒かかっていますね。
ちなみに、concatMapEagerというオペレータを使えば、eagerに解決してくれるので、処理の順番を守りながらトータルの実行時間を短くすることができます。
おまけに書いておきます。

最後にswitchMapです。

switchMap

fun main() {
    println("--------------start--------------")
    val interval = Observable.interval(0, 100, TimeUnit.MILLISECONDS)
    Observable
            .just(1,2,3,4,5)
            .switchMap { item ->
                val timer = 6 - item
                Observable.timer(timer.toLong(), TimeUnit.SECONDS).map { "${item}番目の処理が終わりました" } }
            .withLatestFrom(interval, BiFunction { s: String, time: Long -> Pair(s, time) } )
            .subscribe(
                    { println("${it.second / 10}s : ${it.first}") },
                    {},
                    {
                        println("--------------end--------------")
                        exitProcess(0)
                    }
            )

    Thread.sleep(100000)
}
出力結果
--------------start--------------
1s : 5番目の処理が終わりました
--------------end--------------

Process finished with exit code 0

先に流れてきたitemほど待ち時間が長いので、結局5番目だけが下流に流れています。flatMapとconcatMapと並べて書きましたが、switchMapは少し違ったコードでいろいろ動きをみてみると面白そうです。

以上、3つのオペレータの動きを確認しました。動かしてみるとドキュメントだけでは理解しきれない部分が理解できてよかったです。

おわりに

1年半くらいRxを使っていますが、Rxにはオペレータがたくさんあっていまだにどれが最適なのかわかりません。その都度、今回のようにドキュメントを読んで違いを理解して1つずつ覚えていくことが大事かなと思います。
指摘などありましたらよろしくお願いします。

おまけ

concatMapEagerの動きもみてみます。一番時間のかかる処理が終わるまで待って、ほぼ同時に下流に流していますね。
自分の経験的にはconcatMap使う場面では、concatMapEagerの方がトータルの処理時間が短くなるので適していることが多い気がします。

concatMap

fun main() {
    println("--------------start--------------")
    val interval = Observable.interval(0, 1, TimeUnit.MILLISECONDS)
    Observable
            .just(1,2,3,4,5)
            .concatMapEager { item ->
                val timer = 6 - item
                Observable.timer(timer.toLong(), TimeUnit.SECONDS).map { "${item}の処理が終わりました" } }
            .withLatestFrom(interval, BiFunction { s: String, time: Long -> Pair(s, time) } )
            .subscribe(
                    { println("${it.second}ms : ${it.first}") },
                    {},
                    {
                        println("--------------完了--------------")
                        exitProcess(0)
                    }
            )

    Thread.sleep(100000)
}

出力結果
--------------start--------------
5s : 1番目の処理が終わりました
5s : 2番目の処理が終わりました
5s : 3番目の処理が終わりました
5s : 4番目の処理が終わりました
5s : 5番目の処理が終わりました
--------------end--------------

Process finished with exit code 0

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1