Posted at
RxSwiftDay 22

RxSwift スレッドクイズ (解答・解説編)

More than 1 year has passed since last update.


この記事何?


  • RxSwift Advent Calendar 2016の22日目の記事です。

  • クイズを通して、マルチスレッド環境下におけるRxSwiftの挙動と、observeOnsubscribeOnなどのオペレーターや DriverControlEvent などのスレッド周りの性質を解説していきます


  • どのスレッドでなにが起こるのかがわかるようになります!!!


  • あのなんだか難しそうだったsubscribeOnオペレータもきっと理解できます!!



クイズについて

長くなってしまうので別記事にしてあります。

純粋にまずクイズにチャレンジしたい人はこちらの記事を先にお読みください!

RxSwift スレッドクイズ(問題編)

いくつかのルールも書いてあります(トップレベルはメインスレッド前提だよ、など)


はじめに

問題の解説を行う前にいくつか事前知識や用語の説明をしておきます。

詳細は後で説明しますが、どのスレッドでイベントが流れてくるかを判別するにはどのスレッドでsubscribe が呼ばれたかが重要になってきます。

ここでいうsubscribe はもちろん普通に呼ぶ subscribebind もそうなんですが、内部的なsubscribeも含みます


内部的なsubscribeとは?

Observable.of(1, 2, 3)

.map { $0 * 2 }
.filter { $0 % 3 == 0 }

スクリーンショット 2016-12-14 21.43.10.png (32.1 kB)

よく見る図としてこういうのがあると思いますが、各Operatorは前のObservable(Operator)を source という名前で持っています。例えばこの図で言うとFilter のsourceが MapMap のsourceがSequence ということになります。

スクリーンショット 2016-12-14 21.43.53.png (54.1 kB)

ColdなOperatorはsubscribeが呼ばれるとsource(つまり前のObservable)のsubscribeを行います。1 これを指して内部的なsubscribe と(僕は)呼んでいます。

スクリーンショット 2016-12-14 21.47.47.png (73.3 kB)

これらを含めどのスレッドでsubscribe が呼ばれたかによってイベントが流れてくるスレッドが決まってきます。


subscribe と on

実際に興味があるのは「どのスレッドでイベントが流れてくるか」であることが多いのですが、これがsubscribeされるスレッドによって決まってくる以上、2つの操作についてどのスレッドで行われるかを考えていく必要があります。

内部実装での名前に合わせて、


  • subscribeする操作をそのまま「subscribe

  • イベントを送出する操作を「on

と呼ぶことにして、それぞれどのスレッドで行われるかを解説してきます。

スクリーンショット 2016-12-14 21.49.52.png (96.3 kB)


第1問の答えと解説

(1)のmapにはどのスレッドでイベントが流れてくるでしょう?


第1問

Observable.of(1, 2, 3)

.map { $0 * 2 } // (1)
.subscribe()

答えはメインスレッドです。 答えだけ見れば簡単じゃん!と思うかもしれませんが、なにがどう動いてメインスレッドで動作したのかをみてみると、このシンプルな例の中にスレッドに関する重要なルールをいくつかみることができます。


スレッドに関するルール


  1. 通常のOperatorは 同じスレッドで次の on / subscribe を呼ぶ

  2. Create系Operatorは subscribe が呼ばれたスレッドでon を呼ぶ

スクリーンショット 2016-12-14 21.57.19.png (109.1 kB)

ルール1.の挙動は以下で確認できます。



  • Mapsubscribeが呼ばれたスレッドで次の subscribeを呼んでいる


  • Mapon が呼ばれたスレッドで次の on を呼んでいる

つまり、明示的にスレッドを変えるような操作を何もしていなければ動作するスレッドは変わらないということです。

また、Sequence において、ルール2.が確認できます。



  • Sequencesubscribe が呼ばれたスレッドで on を呼んでいる

その他、justcreateなど、Create系のOperatorはsubscribeが呼ばれたスレッドでイベントを送出します


第2問の答えと解説

(1)のmapにはどのスレッドでイベントが流れてくるでしょう?


第2問

let o = Observable.of(1, 2, 3)

.map { $0 * 2 } // (1)

backgroundQueue.async {
o.subscribe() // バックグラウンドでsubscribe
}


答えは バックグラウンドスレッドです。

上記のルールにそって考えればバックグラウンドスレッドで呼んだ subscribe を起点として、すべての subscribeon がバックグラウンドで行われることがわかると思います。

スクリーンショット 2016-12-14 22.02.31.png (108.2 kB)

最初に言った「どのスレッドでsubscribeが呼ばれるかが重要」の意味がだんだん実感できてきたのではないでしょうか!


第3問の答えと解説

(1)のmapにはどのスレッドでイベントが流れてくるでしょう?


第3問

let o = Observable.of(1, 2, 3)

.map { $0 * 2 } // (1)
.publish()

o.subscribe()

backgroundQueue.async {
o.connect()
}


Hot変換が関連しているパターンです。

subscribeはメインスレッドで行っていますが、 connect をバックグラウンドで行っています。全体の流れはどうなるでしょうか?

答えは バックグラウンドスレッドです

スクリーンショット 2016-12-14 22.14.00.png (114.7 kB)

途中で ConnectableObservable が挟まっています。

ConnectableObservableconnect を呼んだタイミングで内部的にsourceのsubscribe を行います。その際のスレッドはもちろん connect を呼んだスレッドになります。

末端のsubscribeではだけでなく、内部の subscribe の動きが関係してくるのでちょっと難しかったのではないでしょうか!


第4問の答えと解説

(1)のmapにはどのスレッドでイベントが流れてくるでしょう?


第4問

let o = Observable.of(1, 2, 3)

.map { $0 * 2 } // (1)
.share()

backgroundQueue.async {
// バックグラウンドスレッド
o.subscribe()

mainQueue.async {
// メインスレッド
o.subscribe()
}
}


share が呼ばれているのと、2つのスレッドでsubscribe を呼んでいます。

答えはバックグラウンドスレッドです。

スクリーンショット 2016-12-14 22.35.06.png (109.0 kB)

share オペレーターは以下のように定義されています。

public func share() -> Observable<E> {

return self.publish().refCount()
}

refCount オペレータはConnectableObservable に対してconnectの代わりに呼ぶことのできるメソッドで、最初にsubscribeされたときに内部で connectを勝手に呼んでくれます。

if _parent._count == 0 {

_parent._count = 1
_parent._connectableSubscription = _parent._source.connect()
}

今回は先にバックグラウンドスレッドで subscribe を読んでいるので、結果すべての操作が バックグラウンドで行われています。

またHot変換されているのでメインスレッドの方での subsribe によって再度 map に値が流れてくることはない点にも注意してください。

いかがでしょうか?? ここまでが純粋に subscribe をいろんなスレッドで呼んだ場合の問題になります。

次は Scheduler を織り交ぜた場合の問題になります!


第5問の答えと解説

(1)のmapにはどのスレッドでイベントが流れてくるでしょう?


第5問

let o = Observable.of(1, 2, 3, scheduler: mainScheduler)

.map { $0 * 2 } // (1)

backgroundQueue.async {
o.subscribe()
}


subscribe こそバックグラウンドで行っていますが、 mainSchedulerが 設定されている点に注意してください。

答えは メインスレッド になります。

create系のoperatorにはSchedulerを設定することができ、その際には以下のルールが適用されます。


スレッドに関するルール


  • Schedulerが設定されているCreate系Operatorは、subscribe されたスレッドと関係なくそのスレッド(Scheduler)でon を呼ぶ。

スクリーンショット 2016-12-14 22.44.08.png (75.6 kB)

バックグラウンドで subscribe していますが、MainSchedulerを設定しているため、on はメインスレッドで呼ばれているのが確認できるかと思います。

Create系Operatorに設定するSchedulerのルールは以上になりますが、他にもSchedulerを設定する術があります。

それが observeOn / subscribeOn オペレータです。


第6問の答えと解説

(1), (2) のmapにはどのスレッドでイベントが流れてくるでしょう?


第6問

Observable.of(1, 2, 3)

.map { $0 * 2 } // (1)
.observeOn(backgroundScheduler)
.map { $0 * 2 } // (2)
.subscribe()

答えを先に書いてしまいますが、


  • (1) はメインスレッド

  • (2)はバックグラウンドスレッド

でイベントが流れてきます。

さて、 observeOn オペレータがでてきましたがこちらは使い慣れている方も多いのではないでしょうか。

この記事での用語を使うと、このオペレータはon するスレッドを変更するためもの です。

スクリーンショット 2016-12-14 22.59.24.png (128.0 kB)

observeOn の前後で on の動作スレッドが変化しているのが確認できるかと思います。


スレッドに関するルール


  • observeOnon が行われる Scheduler(スレッド) を変更する

ここまでくれば、なんだか難しそうだった subscribeOn オペレータの挙動も容易に理解できそうな気がしてきますね!!!!


第7問の答えと解説

(1), (2) のmapにはどのスレッドでイベントが流れてくるでしょう?


第7問

Observable.of(1, 2, 3)

.map { $0 * 2 } // (1)
.subscribeOn(backgroundScheduler)
.map { $0 * 2 } // (2)
.subscribe()

答えは 両方ともバックグラウンドスレッド で動作します。

スクリーンショット 2016-12-14 23.12.48.png (126.3 kB)

もうだいたい想像はついているかもしれませんが、この記事での用語を使うと、subscribeOnsubscribe するスレッドを変更するためもの です。

上の例でも subscribeOn の前後でsubscribeのスレッドが変わっていることが確認できると思います。

まぁざっくり言ってしまえば以下は同じことです。

o.subscribeOn(backgroundScheduler).subscribe()

backgroundQueue.async {

o.subscribe()
}


スレッドに関するルール


  • subscribeOnsubscribe が行われる Scheduler(スレッド) を変更する

これで大体のルールは説明し終わりました!!あとは組み合わせの問題です!!


第8問の答えと解説

(1), (2) のmapにはどのスレッドでイベントが流れてくるでしょう?


第8問

Observable.of(1, 2, 3, scheduler: backgroundScheduler)

.map { $0 * 2 } // (1)
.observeOn(mainScheduler)
.map { $0 * 2 } // (2)
.subscribe()

答えは


  • (1)はバックグラウンドスレッド

  • (2)はメインスレッド

です。

スクリーンショット 2016-12-14 23.25.18.png (128.6 kB)

メインスレッドでsubscribeを行っていますが、 backgroundScheduler が設定されているので on がバックグラウンドスレッドで呼ばれ始めます。しかしobserveOnon を行うスレッドが変更されているので (2)にはメインスレッドでイベントが流れてきます。


第9問の答えと解説

(1), (2) のmapにはどのスレッドでイベントが流れてくるでしょう?



Observable.of(1, 2, 3, scheduler: mainScheduler)

.map { $0 * 2 } // (1)
.subscribeOn(backgroundScheduler)
.map { $0 * 2 } // (2)
.subscribe()

答えは両方ともメインスレッドです

スクリーンショット 2016-12-14 23.28.39.png (127.5 kB)

もう完璧ですね!!!

最後にRxCocoaのクラスをいくつか紹介して終わりにします。もちろんここまでのルールもでてきますよ!!


第10問の答えと解説

(1), (2) のmapにはどのスレッドでイベントが流れてくるでしょう?


第10問

let o = Observable.of(1, 2, 3)

ControlEvent<Int>(events: o)
.map { $0 * 2 } // (1)
.subscribeOn(backgroundScheduler)
.map { $0 * 2 } // (2)
.subscribe()


ControlEventControlProperty はUIKitとの連携に使われてるやつです。

スレッド以外の性質もあるのですが、今回はスレッドについてのみ着目します。

なんとなくメインスレッドになりそうな感じはありますが、実際はどんな挙動になるのでしょうか?

答えは(1), (2) ともにメインスレッドです。

ControlEventObservable を元に作ることができますが、以下のようにsubscribeOn をかませています。

public init<Ev: ObservableType>(events: Ev) where Ev.E == E {

_events = events.subscribeOn(ConcurrentMainScheduler.instance)
}

つまりControlEventsubscribe は常にメインスレッドで行われます。

スクリーンショット 2016-12-14 23.39.45.png (133.7 kB)

Map の前後で subscribe のスレッドが変わっていますが、要は subscribeOn オペレータが挟まってるだけです。

では次の問題のようにControlEvent を作ったらどうなるでしょうか??


第11問の答えと解説

(1), (2) のmapにはどのスレッドでイベントが流れてくるでしょう?


第11問

let o = Observable.of(1, 2, 3, scheduler: backgroundScheduler)

ControlEvent<Int>(events: o)
.map { $0 * 2 } // (1)
.subscribeOn(backgroundScheduler)
.map { $0 * 2 } // (2)
.subscribe()


答えは (1), (2) ともバックグラウンドスレッドです。

スクリーンショット 2016-12-14 23.41.25.png (131.1 kB)

UIKitの連携に使うのだからメインスレッドになりそうなイメージですが、上記の通り subscribeOn をかませているだけなので別にon をメインスレッドで流すことを保証はしていません。なのでこの問題のようにバックグラウンドでon を流すように作ることも可能です。

まぁこんな使い方はされないとは思うので、RxCocoaで用意されてるものはメインスレッドで流れてくると覚えておいても差し支えないと思います。。

最後はDriverに関する問題です。


第12問の答えと解説

(1) のmapにはどのスレッドでイベントが流れてくるでしょう?


第12問

let d = Observable.of(1, 2, 3)

.asDriver(onErrorJustReturn: 0)
.map { $0 * 2 } // (1)

backgroundQueue.async {
d.drive()
}


Driver をバックグラウンドスレッドでsubscribe ( drive ) しようとするとどうなるでしょう??

答えは....実行時エラーになります。

メインスレッドでしかdrive は呼べないんですね〜〜。 ControlEvent のときのように subscribeOn オペレータがはさまってるわけでもなく、rxFatalError が呼ばれてクラッシュします。

public func drive<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {

MainScheduler.ensureExecutingOnScheduler(errorMessage: driverErrorMessage) // ここでチェックされてる
return self.asSharedSequence().asObservable().subscribe(observer)
}

subscribe がバックグラウンドで呼べないことはわかりましたが、 on はどうなるでしょうか?


第13問の答えと解説

(1) のmapにはどのスレッドでイベントが流れてくるでしょう?


第13問

Observable.of(1, 2, 3, scheduler: backgroundScheduler)

.asDriver(onErrorJustReturn: 0)
.map { $0 * 2 } // (1)
.drive()

答えは メインスレッドです。

スクリーンショット 2016-12-17 22.30.21.png (104.4 kB)

asDriver の実装を見てみるとobserveOn オペレータが呼ばれていることがわかります。

public func asDriver(onErrorJustReturn: E) -> Driver<E> {

let source = self
.asObservable()
.observeOn(DriverSharingStrategy.scheduler)
.catchErrorJustReturn(onErrorJustReturn)
return Driver(source)
}

なのでDriver以降はメインスレッドで on が呼ばれるようになります。


最後に

いかがだったでしょうか! 実は以前社内に布教活動をしたときの資料の焼き直しなのでよければそちらも御覧ください。

ここまで長々と解説しておいてアレですが、動作スレッドを変えたい目的であれば observeOnDriver が使えればアプリケーション開発では十分かなぁとは思います。

(いままでフルRxSwiftのプロジェクトに2つほど関わってきましたが、少なくとも僕は subscribeOn が必要な場面に出会ったことはありません。そして、動作スレッドを変える目的で subscribeOnを使わないほうが良いです。。。)

ViewModelとViewのバインディングだけにRxを使っているとなかなかメインスレッド以外で処理する機会もないかもしれませんが、非同期処理の抽象化としてObservable を使っているといろんな処理を別スレッドで行いたいこともでてくるかと思うので、その際にはこの記事をお役立てください!

明日はいつも僕の隣で仕事をしている @tarunonくんです!!!乞うご期待!!!





  1. 実際にはSink と呼ばれるクラスがsourceのobserverになります。Map オペレーターには MapSink があるように基本的には各オペレータについてXXXSink が存在し、sourceのsubscribeを行っています。