この記事何?
-
RxSwift Advent Calendar 2016の22日目の記事です。
-
クイズを通して、マルチスレッド環境下におけるRxSwiftの挙動と、
observeOn
・subscribeOn
などのオペレーターやDriver
・ControlEvent
などのスレッド周りの性質を解説していきます -
どのスレッドでなにが起こるのかがわかるようになります!!!
-
あのなんだか難しそうだった
subscribeOn
オペレータもきっと理解できます!!
クイズについて
長くなってしまうので別記事にしてあります。
純粋にまずクイズにチャレンジしたい人はこちらの記事を先にお読みください!
いくつかのルールも書いてあります(トップレベルはメインスレッド前提だよ、など)
はじめに
問題の解説を行う前にいくつか事前知識や用語の説明をしておきます。
詳細は後で説明しますが、どのスレッドでイベントが流れてくるかを判別するにはどのスレッドでsubscribe
が呼ばれたかが重要になってきます。
ここでいうsubscribe
はもちろん普通に呼ぶ subscribe
やbind
もそうなんですが、内部的なsubscribeも含みます。
内部的なsubscribeとは?
Observable.of(1, 2, 3)
.map { $0 * 2 }
.filter { $0 % 3 == 0 }
よく見る図としてこういうのがあると思いますが、各Operatorは前のObservable(Operator)を source という名前で持っています。例えばこの図で言うとFilter
のsourceが Map
、Map
のsourceがSequence
ということになります。
ColdなOperatorはsubscribe
が呼ばれるとsource(つまり前のObservable)のsubscribeを行います。1 これを指して内部的なsubscribe と(僕は)呼んでいます。
これらを含めどのスレッドでsubscribe
が呼ばれたかによってイベントが流れてくるスレッドが決まってきます。
subscribe と on
実際に興味があるのは「どのスレッドでイベントが流れてくるか」であることが多いのですが、これがsubscribeされるスレッドによって決まってくる以上、2つの操作についてどのスレッドで行われるかを考えていく必要があります。
内部実装での名前に合わせて、
- subscribeする操作をそのまま「subscribe」
- イベントを送出する操作を「on」
と呼ぶことにして、それぞれどのスレッドで行われるかを解説してきます。
第1問の答えと解説
(1)のmapにはどのスレッドでイベントが流れてくるでしょう?
Observable.of(1, 2, 3)
.map { $0 * 2 } // (1)
.subscribe()
答えはメインスレッドです。 答えだけ見れば簡単じゃん!と思うかもしれませんが、なにがどう動いてメインスレッドで動作したのかをみてみると、このシンプルな例の中にスレッドに関する重要なルールをいくつかみることができます。
スレッドに関するルール
- 通常のOperatorは 同じスレッドで次の
on
/subscribe
を呼ぶ - Create系Operatorは
subscribe
が呼ばれたスレッドでon
を呼ぶ
ルール1.の挙動は以下で確認できます。
-
Map
はsubscribe
が呼ばれたスレッドで次のsubscribe
を呼んでいる -
Map
はon
が呼ばれたスレッドで次のon
を呼んでいる
つまり、明示的にスレッドを変えるような操作を何もしていなければ動作するスレッドは変わらないということです。
また、Sequence
において、ルール2.が確認できます。
-
Sequence
はsubscribe
が呼ばれたスレッドでon
を呼んでいる
その他、just
やcreate
など、Create系のOperatorはsubscribeが呼ばれたスレッドでイベントを送出します。
第2問の答えと解説
(1)のmapにはどのスレッドでイベントが流れてくるでしょう?
let o = Observable.of(1, 2, 3)
.map { $0 * 2 } // (1)
backgroundQueue.async {
o.subscribe() // バックグラウンドでsubscribe
}
答えは バックグラウンドスレッドです。
上記のルールにそって考えればバックグラウンドスレッドで呼んだ subscribe
を起点として、すべての subscribe
・on
がバックグラウンドで行われることがわかると思います。
最初に言った**「どのスレッドでsubscribeが呼ばれるかが重要」**の意味がだんだん実感できてきたのではないでしょうか!
第3問の答えと解説
(1)のmapにはどのスレッドでイベントが流れてくるでしょう?
let o = Observable.of(1, 2, 3)
.map { $0 * 2 } // (1)
.publish()
o.subscribe()
backgroundQueue.async {
o.connect()
}
Hot変換が関連しているパターンです。
subscribe
はメインスレッドで行っていますが、 connect
をバックグラウンドで行っています。全体の流れはどうなるでしょうか?
答えは バックグラウンドスレッドです。
途中で ConnectableObservable
が挟まっています。
ConnectableObservable
はconnect
を呼んだタイミングで内部的にsourceのsubscribe
を行います。その際のスレッドはもちろん connect
を呼んだスレッドになります。
末端のsubscribe
ではだけでなく、内部の subscribe
の動きが関係してくるのでちょっと難しかったのではないでしょうか!
第4問の答えと解説
(1)のmapにはどのスレッドでイベントが流れてくるでしょう?
let o = Observable.of(1, 2, 3)
.map { $0 * 2 } // (1)
.share()
backgroundQueue.async {
// バックグラウンドスレッド
o.subscribe()
mainQueue.async {
// メインスレッド
o.subscribe()
}
}
share
が呼ばれているのと、2つのスレッドでsubscribe
を呼んでいます。
答えはバックグラウンドスレッドです。
share
オペレーターは以下のように定義されています。
public func share() -> Observable<E> {
return self.publish().refCount()
}
refCount
オペレータはConnectableObservable
に対してconnect
の代わりに呼ぶことのできるメソッドで、最初にsubscribe
されたときに内部で connect
を勝手に呼んでくれます。
if _parent._count == 0 {
_parent._count = 1
_parent._connectableSubscription = _parent._source.connect()
}
今回は先にバックグラウンドスレッドで subscribe
を読んでいるので、結果すべての操作が バックグラウンドで行われています。
またHot変換されているのでメインスレッドの方での subsribe
によって再度 map
に値が流れてくることはない点にも注意してください。
いかがでしょうか?? ここまでが純粋に subscribe
をいろんなスレッドで呼んだ場合の問題になります。
次は Scheduler
を織り交ぜた場合の問題になります!
第5問の答えと解説
(1)のmapにはどのスレッドでイベントが流れてくるでしょう?
let o = Observable.of(1, 2, 3, scheduler: mainScheduler)
.map { $0 * 2 } // (1)
backgroundQueue.async {
o.subscribe()
}
subscribe
こそバックグラウンドで行っていますが、 **mainScheduler
**が 設定されている点に注意してください。
答えは メインスレッド になります。
create系のoperatorにはSchedulerを設定することができ、その際には以下のルールが適用されます。
スレッドに関するルール
- Schedulerが設定されているCreate系Operatorは、
subscribe
されたスレッドと関係なくそのスレッド(Scheduler)でon
を呼ぶ。
バックグラウンドで subscribe
していますが、MainSchedulerを設定しているため、on
はメインスレッドで呼ばれているのが確認できるかと思います。
Create系Operatorに設定するSchedulerのルールは以上になりますが、他にもSchedulerを設定する術があります。
それが observeOn
/ subscribeOn
オペレータです。
第6問の答えと解説
(1), (2) のmapにはどのスレッドでイベントが流れてくるでしょう?
Observable.of(1, 2, 3)
.map { $0 * 2 } // (1)
.observeOn(backgroundScheduler)
.map { $0 * 2 } // (2)
.subscribe()
答えを先に書いてしまいますが、
- (1) はメインスレッド
- (2)はバックグラウンドスレッド
でイベントが流れてきます。
さて、 observeOn
オペレータがでてきましたがこちらは使い慣れている方も多いのではないでしょうか。
この記事での用語を使うと、このオペレータは**on
するスレッドを変更するためもの** です。
observeOn
の前後で on
の動作スレッドが変化しているのが確認できるかと思います。
スレッドに関するルール
observeOn
はon
が行われる Scheduler(スレッド) を変更する
ここまでくれば、なんだか難しそうだった subscribeOn
オペレータの挙動も容易に理解できそうな気がしてきますね!!!!
第7問の答えと解説
(1), (2) のmapにはどのスレッドでイベントが流れてくるでしょう?
Observable.of(1, 2, 3)
.map { $0 * 2 } // (1)
.subscribeOn(backgroundScheduler)
.map { $0 * 2 } // (2)
.subscribe()
答えは 両方ともバックグラウンドスレッド で動作します。
もうだいたい想像はついているかもしれませんが、この記事での用語を使うと、subscribeOn
は**subscribe
するスレッドを変更するためもの** です。
上の例でも subscribeOn
の前後でsubscribe
のスレッドが変わっていることが確認できると思います。
まぁざっくり言ってしまえば以下は同じことです。
o.subscribeOn(backgroundScheduler).subscribe()
backgroundQueue.async {
o.subscribe()
}
スレッドに関するルール
subscribeOn
はsubscribe
が行われる Scheduler(スレッド) を変更する
これで大体のルールは説明し終わりました!!あとは組み合わせの問題です!!
第8問の答えと解説
(1), (2) のmapにはどのスレッドでイベントが流れてくるでしょう?
Observable.of(1, 2, 3, scheduler: backgroundScheduler)
.map { $0 * 2 } // (1)
.observeOn(mainScheduler)
.map { $0 * 2 } // (2)
.subscribe()
答えは
- (1)はバックグラウンドスレッド
- (2)はメインスレッド
です。
メインスレッドでsubscribe
を行っていますが、 backgroundScheduler
が設定されているので on
がバックグラウンドスレッドで呼ばれ始めます。しかしobserveOn
で on
を行うスレッドが変更されているので (2)にはメインスレッドでイベントが流れてきます。
第9問の答えと解説
(1), (2) のmapにはどのスレッドでイベントが流れてくるでしょう?
Observable.of(1, 2, 3, scheduler: mainScheduler)
.map { $0 * 2 } // (1)
.subscribeOn(backgroundScheduler)
.map { $0 * 2 } // (2)
.subscribe()
答えは両方ともメインスレッドです。
もう完璧ですね!!!
最後にRxCocoaのクラスをいくつか紹介して終わりにします。もちろんここまでのルールもでてきますよ!!
第10問の答えと解説
(1), (2) のmapにはどのスレッドでイベントが流れてくるでしょう?
let o = Observable.of(1, 2, 3)
ControlEvent<Int>(events: o)
.map { $0 * 2 } // (1)
.subscribeOn(backgroundScheduler)
.map { $0 * 2 } // (2)
.subscribe()
ControlEvent
や ControlProperty
はUIKitとの連携に使われてるやつです。
スレッド以外の性質もあるのですが、今回はスレッドについてのみ着目します。
なんとなくメインスレッドになりそうな感じはありますが、実際はどんな挙動になるのでしょうか?
答えは**(1), (2) ともにメインスレッドです。**
ControlEvent
は Observable
を元に作ることができますが、以下のようにsubscribeOn
をかませています。
public init<Ev: ObservableType>(events: Ev) where Ev.E == E {
_events = events.subscribeOn(ConcurrentMainScheduler.instance)
}
つまり**ControlEvent
の subscribe
は常にメインスレッドで行われます。**
Map
の前後で subscribe
のスレッドが変わっていますが、要は subscribeOn
オペレータが挟まってるだけです。
では次の問題のようにControlEvent
を作ったらどうなるでしょうか??
第11問の答えと解説
(1), (2) のmapにはどのスレッドでイベントが流れてくるでしょう?
let o = Observable.of(1, 2, 3, scheduler: backgroundScheduler)
ControlEvent<Int>(events: o)
.map { $0 * 2 } // (1)
.subscribeOn(backgroundScheduler)
.map { $0 * 2 } // (2)
.subscribe()
答えは (1), (2) ともバックグラウンドスレッドです。
UIKitの連携に使うのだからメインスレッドになりそうなイメージですが、上記の通り subscribeOn
をかませているだけなので別にon
をメインスレッドで流すことを保証はしていません。なのでこの問題のようにバックグラウンドでon
を流すように作ることも可能です。
まぁこんな使い方はされないとは思うので、RxCocoaで用意されてるものはメインスレッドで流れてくると覚えておいても差し支えないと思います。。
最後はDriverに関する問題です。
第12問の答えと解説
(1) のmapにはどのスレッドでイベントが流れてくるでしょう?
let d = Observable.of(1, 2, 3)
.asDriver(onErrorJustReturn: 0)
.map { $0 * 2 } // (1)
backgroundQueue.async {
d.drive()
}
Driver
をバックグラウンドスレッドでsubscribe
( drive
) しようとするとどうなるでしょう??
答えは....実行時エラーになります。
メインスレッドでしかdrive
は呼べないんですね〜〜。 ControlEvent
のときのように subscribeOn
オペレータがはさまってるわけでもなく、rxFatalError
が呼ばれてクラッシュします。
public func drive<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
MainScheduler.ensureExecutingOnScheduler(errorMessage: driverErrorMessage) // ここでチェックされてる
return self.asSharedSequence().asObservable().subscribe(observer)
}
subscribe
がバックグラウンドで呼べないことはわかりましたが、 on
はどうなるでしょうか?
第13問の答えと解説
(1) のmapにはどのスレッドでイベントが流れてくるでしょう?
Observable.of(1, 2, 3, scheduler: backgroundScheduler)
.asDriver(onErrorJustReturn: 0)
.map { $0 * 2 } // (1)
.drive()
答えは メインスレッドです。
asDriver
の実装を見てみるとobserveOn
オペレータが呼ばれていることがわかります。
public func asDriver(onErrorJustReturn: E) -> Driver<E> {
let source = self
.asObservable()
.observeOn(DriverSharingStrategy.scheduler)
.catchErrorJustReturn(onErrorJustReturn)
return Driver(source)
}
なのでDriver以降はメインスレッドで on
が呼ばれるようになります。
最後に
いかがだったでしょうか! 実は以前社内に布教活動をしたときの資料の焼き直しなのでよければそちらも御覧ください。
ここまで長々と解説しておいてアレですが、動作スレッドを変えたい目的であれば observeOn
や Driver
が使えればアプリケーション開発では十分かなぁとは思います。
(いままでフルRxSwiftのプロジェクトに2つほど関わってきましたが、少なくとも僕は subscribeOn
が必要な場面に出会ったことはありません。そして、動作スレッドを変える目的で subscribeOn
を使わないほうが良いです。。。)
ViewModelとViewのバインディングだけにRxを使っているとなかなかメインスレッド以外で処理する機会もないかもしれませんが、非同期処理の抽象化としてObservable
を使っているといろんな処理を別スレッドで行いたいこともでてくるかと思うので、その際にはこの記事をお役立てください!
明日はいつも僕の隣で仕事をしている @tarunonくんです!!!乞うご期待!!!
-
実際には
Sink
と呼ばれるクラスがsourceのobserverになります。Map
オペレーターにはMapSink
があるように基本的には各オペレータについてXXXSink
が存在し、sourceのsubscribeを行っています。 ↩