LoginSignup
11
11

More than 3 years have passed since last update.

RxSwift における subscribeOn と observeOn の違い

Posted at

RxSwift などで処理をつなげて書いていると、ここはメインスレッドで、ここはバックグランドスレッドでなどと、処理によってスレッドを切り替えたい場合があるかと思います。そんな時は、Observable のプロパティとして提供されている subscribeOn または、observeOn を使用することでスレッドを切り替えて処理を実行することができます。今回はそんな subscribeOnobserveOn の違いについて簡単にまとめていこうかと思います。

その前に Scheduler

RxSwift では Observable や subscribe の処理をどのスレッドでどのよう(直列・並列)に処理をさばくかを決定する役割を Scheduler と呼びます。これについては後で触れますが、RxSwift を使用し、マルチスレッドで処理を行う際にはとても重要な知識です。

subscribeOn

subscribeOn は参照するストリームの元となる Observable の実行スレッドとそれに続くメソッドのスレッドを決定します。SerialDispatchQueueScheduler で Scheduler を生成し、バックグランドスレッドの直列処理で実行したサンプルが下記のようになります(呼び出しはメインスレッド)。どのように出力されるか結果を見る前に予想してみてください👨‍💻

    let ob = Observable<Int>.create { observer -> Disposable in
        for i in 0...10000 {
            if i == 10000 {
                print("observer called!")
                observer.onNext(1)
                observer.onCompleted()
            }
        }
        return Disposables.create()
    }
    let scheduler = SerialDispatchQueueScheduler(qos: .background)
    ob.subscribeOn(scheduler)
        .subscribe(onNext: { _ in
            print(1)
            for i in 0...100 {
                if i == 100 {
                    print(2)
                }
            }
        })
        .disposed(by: disposeBag)
    ob.subscribeOn(scheduler)
        .subscribe(onNext: { _ in
            print(3)
        })
        .disposed(by: disposeBag)

出力

// 全てバックグランドスレッド
observer called!
1
2
observer called!
3

上から順番にバックグランドスレッドにタスクが追加されていき、直列でそれぞれの処理が実行されているのが分かります。また、基本的には、同じスレッドで実行したい場合には Scheduler のインスタンスは統一する必要があるかと思います。(試しに同じ Schedler を別々のインスタンスで実行したところ直列で実行がなされませんでした🤔原因がわかる人教えください🙇‍♂️)

observeOn

observeOn はどの Scheduler で次の Observer にイベントを流すのかを決定します。MainScheduler.instanceSchedler を生成して、購読部分のみメインスレッドで行ったサンプルは下記のようになります。

    DispatchQueue.global(qos: .background).async {
        let ob = Observable<Int>.create { observer -> Disposable in
            for i in 0...10000 {
                if i == 10000 {
                    print("observer called!")
                    observer.onNext(1)
                    observer.onCompleted()
                }
            }
            return Disposables.create()
        }
        ob.observeOn(MainScheduler.instance)
            .subscribe(onNext: { _ in
                print(1)
                for i in 0...100 {
                    if i == 100 {
                        print(2)
                    }
                }
            })
            .disposed(by: self.disposeBag)
        ob.observeOn(MainScheduler.instance)
            .subscribe(onNext: { _ in
                print(3)
            })
            .disposed(by: self.disposeBag)
    }

出力

observer called! // バックグランドスレッド
1 // メインスレッド
2 // メインスレッド
observer called! // バックグランドスレッド
3 // メインスレッド

こちらはバックグランドの処理により、全体の順番が変わることはありますが、基本的にはメインスレッドでの処理は直列処理なので、順番が変わることはありません。

Scheduler の種類

先でもスレッドや処理の実行順などを指定するのに Schedler を使用しましたが、RxSwift では現在下記のような Scheduler が用意されています。

Class 内容
CurrentThreadScheduler (Serial scheduler) Observable.create などで指定されるデフォルトのスケジューラにはこれがしてされていて、スレッドを切り替えず現在のスレッドで処理を実行します。
MainScheduler (Serial scheduler) これは Swift の UI などを更新するためのデフォルトのスレッドで、このスケジューラはメインメソッドから呼ばれた場合はスケジューリングなしですぐに処理が実行されます
SerialDispatchQueueScheduler (Serial scheduler) バックグランドで処理を直列に実行すれ場合に使用するスケジューラで、Qos を引数にとり最適なスレッドを選択することができます。
ConcurrentDispatchQueueScheduler (Concurrent scheduler) バックグランドで処理を並列に実行する場合に使用するスケジューラで、SerialDispatchQueueScheduler と同様に Qos を指定して使用できます。
OperationQueueScheduler (Concurrent scheduler) NSOperationQueue を使った非同期処理を行う時に使用します。また、maxConcurrentOperationCount などで同時実行数を制御したい時などに便利なスケジューラです。

参考

11
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
11