RxSwiftのflatMapって理解しにくくないですか?
僕は理解するのに大分苦労しました。
##前提:複数のストリームを一箇所に流すには
複数のObservableの変更を、一箇所に集約するには下記のようにそれぞれbindしては駄目です。
//出力用のObserver
let output = PublishSubject<String>()
output
.subscribe(
onNext: { print($0) },
onError: nil,
onCompleted: { print("onCompleted") }
)
//入力用のObservable
let stream1 = PublishSubject<String>()
let stream2 = PublishSubject<String>()
stream1.bind(to: output)//stream1の変化をoutputに通知
stream2.bind(to: output)//stream2の変化をoutputに通知
一応想定通りに動きますが、どちらかのObservableがcompleteすると、流れが止まります。
stream1.onNext("stream1-1")
stream2.onNext("stream2-1")
stream2.onNext("stream2-2")
stream2.onCompletced()
stream1.onNext("stream1-2")
stream1.onNext("stream1-3")
stream1.onNext("stream1-4")
//結果
stream1-1
stream2-1
stream2-2
onCompleted
stream2側でonCompletcedを呼び出してますが、その後のstream1からのイベントも止まってしまうのです。
##複数のストリームはちゃんとオペレーターで束ねましょう
ではどうすればいいかと言うと、merge オペレータで束ねます
let output = PublishSubject<String>()
output
.subscribe(
onNext: { print($0) },
onError: nil,
onCompleted: { print("onCompleted") }
)
//入力用のObservable
let stream1 = PublishSubject<String>()
let stream2 = PublishSubject<String>()
//stream1, stream2を束ねた新しいObservableを作り
let merged = Observable
.of(stream1, stream2)
.merge()
//これをoutputにbindする
let mergeBind = merged.bind(to: output)
stream1.onNext("stream1-1")
stream2.onNext("stream2-1")
stream2.onNext("stream2-2")
stream2.onCompletced()
//途中でstream2にonCompletcedさせる
stream1.onNext("stream1-2")
stream1.onNext("stream1-3")
//結果
merged-stream1-1
merged-stream2-1
merged-stream2-2
merged-stream1-2
merged-stream1-3
しかし、merge は後からObservableを追加する事ができません。新しく作り直したとしても、一旦 outputへのbindを解除してから再びbindし直す必要があります。
単に新しくbindしてしまうとoutputに対して複数の入力元をもたせることになり、最初の問題にぶつかります。
let merged2 = Observable
.of(merged, stream3)
.merge()
mergeBind.dispose() //一旦outputへのbindを解除してから
merged2.bind(to: output) //3つのストリームを合体したものをbindし直し
##本命! flatMapなら後から足せるよ
やっとflatMapの登場です。
flatMapは、
引数;「流れて来る本来のイベントの値を加工した上で、<任意のObservabl型> を返す関数」
戻り値 : <任意のObservabl型>
というう関数です。これがややこしい
//Observable<String>型をイベントとして流すObserverは、
let observableSubject = PublishSubject<Observable<String>>()
//onNextで流せるイベントは、Observable<String>型です。(当たり前ですね)
observableSubject.onNext(stream1)
observableSubject.onNext(stream2)
//一方で、flatMap関数は、「流れて来る本来のイベントの値を加工した上で、任意のObservabl型を返す関数」を引数に取ると
let flatMapped: Observable<String> = observableSubject
.flatMap { (origin: Observable<String>) -> Observable<String> in
//ここでreturnするObservableの型を返します
return origin
}
とりあえず、何が出来るのかというと、
let output = PublishSubject<String>()
output
.subscribe(
onNext: { print($0) },
onError: nil,
onCompleted: { print("onCompleted") }
)
//入力用のObservable
let stream1 = PublishSubject<String>()
let stream2 = PublishSubject<String>()
//Observable<String>型をイベントとして流すObserverを
let observableSubject = PublishSubject<Observable<String>>()
//flatMapでStringを流すObservableにしてあげると、outputにbindできます
observableSubject.flatMap{$0}.bind(to: output)
//observableSubject自体がonNextで流せるイベントは、Observable<String>型
observableSubject.onNext(stream1)
observableSubject.onNext(stream2)
//ここでstream1、stream2で値を流すと、mergeと同じ挙動になる
stream1.onNext("flatMap-stream1-1")
stream2.onNext("flatMap-stream2-1")
stream2.onNext("flatMap-stream2-2")
stream2.onCompleted()
stream1.onNext("flatMap-stream1-2")
stream1.onNext("flatMap-stream1-3")
結果
flatMap-stream1-1
flatMap-stream2-1
flatMap-stream2-2
flatMap-stream1-2
flatMap-stream1-3
あとからObservableを追加できるんです!
let stream3 = PublishSubject<String>()
observableSubject.onNext(stream3)
stream1.onNext("after-stream1")
stream3.onNext("after-stream3")
結果
after-stream1
after-stream3
##使い所は様々です
あとからObservable (イベントストリーム)を追加していけるので、いちいち再登録しないで良いんです。
//兎に角、結果は全部 outputに束ねてしまう
observableSubject.flatMap{$0}.bind(to: output)
observableSubject.onNext(stream1)
observableSubject.onNext(stream2)
//notification は Observable<String>
//次の一回だけ、notificationからの値をマージしたい
observableSubject.onNext(notification.take(1))
//takeResultは通信結果をObservable<String>で返す関数
observableSubject.onNext(takeResult())