LoginSignup
9
3

More than 1 year has passed since last update.

RxBlocking Deep Dive

Last updated at Posted at 2018-10-04

「RxBlockingってどうやってスレッドをブロックしてるんだろう?:thinking:

最近RxTestとRxBlockingについての記事を書いたのですが、記事を書くためにRxBlockingについて調べていく中でそんな疑問を持ちました。
RxBlocking自体はソースコードも少なくコンパクトなライブラリなのですが、その内部で使われている仕組みが意外と奥が深く面白かったので紹介したいと思います。

2018/11/05 追記
Otemachi.swift #02というイベントで話したときのスライドを載せておきます。
5分で本テーマを話さなければならなかったため、本記事よりも説明を簡潔にしたり図を入れたりしているので、こちらの方が内容をつかみやすいかもしれません。

RxBlockingとは?

RxBlockingを使ったテストコードは、例えば以下のように書きます。

// 非同期にイベントが発行されるObservable
let asyncObservable = Observable.of(10, 20, 30)
    .observeOn(SerialDispatchQueueScheduler(qos: .background))

let elements = try! asyncObservable.toBlocking().toArray()
expect(elements).to(equal([10, 20, 30]))

asyncObservableは非同期にイベントが発行されるObservableです。
非同期なので普通にサブスクライブして受信したイベントを検証するというテストは書けません。
そんなときに便利なのがRxBlockingです。

RxBlockingをインポートしているファイル内では、ObservableにtoBlocking()というメソッドが生えています。
toBlocking()はObservableをBlockingObservableに変換するメソッドです。
BlockingObservabletoArray()メソッドは、カレントスレッドをブロックし、asyncObservableをサブスクライブし、全てのイベントを受信したらその全ての要素を返します。

RxBlockingを使えば、非同期にイベントが発行されるObservableのテストが簡単に書けます。

こちらの記事も読んでみてください。
フィードバックお待ちしています。
RxTest、RxBlockingによるテストパターン

スレッドをブロックする仕組み

では、RxBlockingはどのようにしてスレッドをブロックしているのでしょうか?

materializeResult()

toArray()メソッドは、BlockingObservableのプライベートな関数であるmaterializeResult()メソッドを内部で呼びます。
フルのソースを載せると長いので、ここでは説明に必要な部分を抜粋します。
ソースはこちら

fileprivate func materializeResult(max: Int? = nil, predicate: @escaping (E) throws -> Bool = { _ in true }) -> MaterializedSequenceResult<E> {
    var elements: [E] = Array<E>()
    var error: Swift.Error?
        
    let lock = RunLoopLock(timeout: timeout)
    ...

elementsはソースとなるストリーム(先程の例でいうところのasyncObservable)から受信したイベントの要素を格納するための変数です。
errorはストリームがエラーイベントを発行した場合にそのエラーを格納するための変数です。

スレッドをブロックする仕組みの肝はRunLoopLockクラスです。
RunLoopLockについては後述しますので、ここでは単にスレッドをブロックする機能を持つクラスくらいに思っていてください。

materializeResult()の先に進みます。

    ...
    lock.dispatch {
        let subscription = self.source.subscribe { event in
            if d.isDisposed {
                return
            }
            switch event {
            case .next(let element):
                do {
                    if try predicate(element) {
                        elements.append(element)
                    }
                    if let max = max, elements.count >= max {
                        d.dispose()
                        lock.stop()
                    }
                } catch (let err) {
                    error = err
                    d.dispose()
                    lock.stop()
                }
            case .error(let err):
                error = err
                d.dispose()
                lock.stop()
            case .completed:
                d.dispose()
                lock.stop()
            }
        }

        d.setDisposable(subscription)
    }
    ...

lock.dispatch()は引数として渡されたクロージャを、言葉として適切ではないかもしれませんが、ブロックしたスレッド内で実行するメソッドです。
クロージャのコードをおおまかに説明すると、ソースとなるストリームをサブスクライブし、nextイベントを受信したらelementsに追加し、completedイベントを受信したらlock.stop()を呼んでスレッドのブロックを解除するということをやっています。
なお、errorイベントを受信した場合はすぐにブロックを解除します。

materializeResult()の最後の部分です。

    ...
    do {
        try lock.run()
    } catch (let err) {
        error = err
    }
    ...

lock.run()はスレッドをブロックするメソッドです。
詳細は後述しますが、このメソッドを呼ぶことで実行ループに入ります。

RunLoopLock

スレッドをブロックする機能の肝となるのは、RxBlocking内で定義されているRunLoopLockクラスです。
ソースはこちら

RunLoopLockクラスは、実行ループを制御するCore FoundationフレームワークのCFRunLoopクラスおよびCFRunLoop関連関数群のラッパークラスです。
実行ループとは、スレッド上で非同期に届くイベントを管理するための仕組みです。
名前の通りループ状態に入り、例えばユーザの画面操作等のイベントを受信すると、対応するハンドラにイベント渡して処理をするということを繰り返します。

実行ループはスレッドごとに1つ実行することができますが、iOSアプリケーションは起動シーケンスの中でメインスレッドで実行ループを実行済みなので、メインスレッドの実行ループを使用する場合は明示的に実行する必要はありません。
RunLoopLockクラスはカレントスレッドの実行ループを利用するようになっているので、これをメインスレッドで呼び出せばメインスレッドの実行ループを制御できるようになります。

それではRunLoopLockのコードを読んでいきましょう。
まずはイニシャライザです。

init(timeout: RxTimeInterval?) {
    _timeout = timeout
    _currentRunLoop = CFRunLoopGetCurrent()
}

_timeoutはスレッドをブロックする時間のタイムアウト値です。
タイムアウト時間を越えるとブロックが解除されます。

_currentRunLoopにはカレントスレッドのCFRunLoopオブジェクトが格納されます。
ここではメインスレッドのCFRunLoopオブジェクトを想定することとします。

続いて、materializeResult()メソッド内で呼ばれていたdispatch()メソッドです。
materializeResult()では、ストリームをサブスクライブして全てのイベントを受信したら実行ループを抜けるという処理のクロージャを渡していました。

func dispatch(_ action: @escaping () -> ()) {
    CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw) {
        if CurrentThreadScheduler.isScheduleRequired {
            _ = CurrentThreadScheduler.instance.schedule(()) { _ in
                action()
                return Disposables.create()
            }
        }
        else {
            action()
        }
    }
    CFRunLoopWakeUp(_currentRunLoop)
}

CFRunLoopPerformBlock()は指定した実行ループにコードブロックを渡して実行させるメソッドです。
ここではメインスレッドの実行ループ内でaction()クロージャを実行するようにしています。

CFRunLoopWakeUp()CFRunLoopPerformBlock()で登録したコードブロックをすぐに実行するために必要なようです。

func stop() {
    if AtomicIncrement(&_calledStop) != 1 {
        return
    }
    CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw) {
        CFRunLoopStop(self._currentRunLoop)
    }
    CFRunLoopWakeUp(_currentRunLoop)
}

stop()メソッドは、実行ループを終了させるCFRunLoopStop()メソッドを呼び出します。
これにより、スレッドのブロックが解除されます。

そして最後、run()メソッドです。

func run() throws {
    if AtomicIncrement(&_calledRun) != 1 {
        fatalError("Run can be only called once")
    }
    if let timeout = _timeout {
        #if os(Linux)
            switch Int(CFRunLoopRunInMode(runLoopModeRaw, timeout, false)) {
            case kCFRunLoopRunFinished:
                return
            case kCFRunLoopRunHandledSource:
                return
            case kCFRunLoopRunStopped:
                return
            case kCFRunLoopRunTimedOut:
                throw RxError.timeout
            default:
                fatalError("This failed because `CFRunLoopRunResult` wasn't bridged to Swift.")
            }
        #else
            switch CFRunLoopRunInMode(runLoopMode, timeout, false) {
            case .finished:
                return
            case .handledSource:
                return
            case .stopped:
                return
            case .timedOut:
                throw RxError.timeout
            }
        #endif
    }
    else {
        CFRunLoopRun()
    }
}

CFRunLoopRun()メソッドは、カレントスレッドの実行ループを実行するメソッドです。
CFRunLoopRun()メソッドはカレントスレッドの実行ループを再帰的に実行することが可能で、CFRunLoopRun()メソッドの呼び出しはカレントスレッドのコールスタックに追加されます。

callstack.png

CFRunLoopRun()メソッドはCFRunLoopStop()メソッドを呼ばない限りループし続け、呼び出し元に戻らないので、ここでスレッドがブロックされることになります(タイムアウトを指定している場合はCFRunLoopRunInMode()が呼ばれ、タイムアウトしたら強制的にブロックが解除されます)。

ではどこでCFRunLoopStop()メソッドが呼ばれるかと言うと、materializeResult()のこの部分です。

再掲
    ...
    lock.dispatch {
        let subscription = self.source.subscribe { event in
            if d.isDisposed {
                return
            }
            switch event {
            case .next(let element):
                do {
                    if try predicate(element) {
                        elements.append(element)
                    }
                    if let max = max, elements.count >= max {
                        d.dispose()
                        lock.stop()
                    }
                } catch (let err) {
                    error = err
                    d.dispose()
                    lock.stop()
                }
            case .error(let err):
                error = err
                d.dispose()
                lock.stop()
            case .completed:
                d.dispose()
                lock.stop()
            }
        }

        d.setDisposable(subscription)
    }
    ...

全てのイベントが到着し、最後にcompletedイベントが発行されたら、RunLoopLockクラスのstop()メソッドが呼ばれます。
stop()メソッドはCFRunLoopStop()を呼ぶので、ここで実行ループが終了し、呼び出し元に戻ります。

呼び出し元はmaterializeResult()メソッドのこの部分でした。

    do {
        try lock.run()
    } catch (let err) {
        error = err
    }

    if let error = error {
        return MaterializedSequenceResult.failed(elements: elements, error: error)
    }

    return MaterializedSequenceResult.completed(elements: elements)
}

materializeResult()メソッドは最後に取得した要素を関連値として持つEnumを返して終了します。
冒頭に出てきたBlockingObservabletoArray()メソッドは、このEnumから値を取り出して返すメソッドです。

RxBlockingがスレッドをブロックする仕組みの説明は以上となります。

まとめ

RxBlockingでは、CFRunLoopというかなり低レイヤな仕組みを使ってスレッドのブロックをしていることがわかりました。
そして、ブロッキングの仕組みの正体は、メインスレッドで再帰的に実行された実行ループであることがわかりました。

ブロッキングの仕組みがわかったところで何か役に立つわけではないですが、自分としてはiOSの仕組みの一部を知ることができてよかったと思っています。

参考資料

公式

ブログ

9
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
3