「RxBlockingってどうやってスレッドをブロックしてるんだろう?」
最近RxTestとRxBlockingについての記事を書いたのですが、記事を書くためにRxBlockingについて調べていく中でそんな疑問を持ちました。
RxBlocking自体はソースコードも少なくコンパクトなライブラリなのですが、その内部で使われている仕組みが意外と奥が深く面白かったので紹介したいと思います。
2018/11/05 追記
Otemachi.swift #02というイベントで話したときのスライドを載せておきます。
5分で本テーマを話さなければならなかったため、本記事よりも説明を簡潔にしたり図を入れたりしているので、こちらの方が内容をつかみやすいかもしれません。
RxBlockingとは?
RxBlockingを使ったテストコードは、例えば以下のように書きます。
// 非同期にイベントが発行されるObservable
let asyncObservable = Observable.of(10, 20, 30)
.observeOn(SerialDispatchQueueScheduler(qos: .background))
let elements = try! asyncObservable.toBlocking().toArray()
expect(elements).to(equal([10, 20, 30]))
asyncObservable
は非同期にイベントが発行されるObservableです。
非同期なので普通にサブスクライブして受信したイベントを検証するというテストは書けません。
そんなときに便利なのがRxBlockingです。
RxBlockingをインポートしているファイル内では、ObservableにtoBlocking()
というメソッドが生えています。
toBlocking()
はObservableをBlockingObservable
に変換するメソッドです。
BlockingObservable
のtoArray()
メソッドは、カレントスレッドをブロックし、asyncObservable
をサブスクライブし、全てのイベントを受信したらその全ての要素を返します。
RxBlockingを使えば、非同期にイベントが発行されるObservableのテストが簡単に書けます。
こちらの記事も読んでみてください。
フィードバックお待ちしています。
RxTest、RxBlockingによるテストパターン
スレッドをブロックする仕組み
では、RxBlockingはどのようにしてスレッドをブロックしているのでしょうか?
materializeResult()
toArray()
メソッドは、BlockingObservable
のプライベートな関数であるmaterializeResult()
メソッドを内部で呼びます。
フルのソースを載せると長いので、ここでは説明に必要な部分を抜粋します。
ソースはこちら
fileprivate func materializeResult(max: Int? = nil, predicate: @escaping (E) throws -> Bool = { _ in true }) -> MaterializedSequenceResult<E> {
var elements: [E] = Array<E>()
var error: Swift.Error?
let lock = RunLoopLock(timeout: timeout)
...
elements
はソースとなるストリーム(先程の例でいうところのasyncObservable
)から受信したイベントの要素を格納するための変数です。
error
はストリームがエラーイベントを発行した場合にそのエラーを格納するための変数です。
スレッドをブロックする仕組みの肝はRunLoopLock
クラスです。
RunLoopLock
については後述しますので、ここでは単にスレッドをブロックする機能を持つクラスくらいに思っていてください。
materializeResult()
の先に進みます。
...
lock.dispatch {
let subscription = self.source.subscribe { event in
if d.isDisposed {
return
}
switch event {
case .next(let element):
do {
if try predicate(element) {
elements.append(element)
}
if let max = max, elements.count >= max {
d.dispose()
lock.stop()
}
} catch (let err) {
error = err
d.dispose()
lock.stop()
}
case .error(let err):
error = err
d.dispose()
lock.stop()
case .completed:
d.dispose()
lock.stop()
}
}
d.setDisposable(subscription)
}
...
lock.dispatch()
は引数として渡されたクロージャを、言葉として適切ではないかもしれませんが、ブロックしたスレッド内で実行するメソッドです。
クロージャのコードをおおまかに説明すると、ソースとなるストリームをサブスクライブし、next
イベントを受信したらelements
に追加し、completed
イベントを受信したらlock.stop()
を呼んでスレッドのブロックを解除するということをやっています。
なお、error
イベントを受信した場合はすぐにブロックを解除します。
materializeResult()
の最後の部分です。
...
do {
try lock.run()
} catch (let err) {
error = err
}
...
lock.run()
はスレッドをブロックするメソッドです。
詳細は後述しますが、このメソッドを呼ぶことで実行ループに入ります。
RunLoopLock
スレッドをブロックする機能の肝となるのは、RxBlocking内で定義されているRunLoopLock
クラスです。
ソースはこちら
RunLoopLock
クラスは、実行ループを制御するCore FoundationフレームワークのCFRunLoop
クラスおよびCFRunLoop
関連関数群のラッパークラスです。
実行ループとは、スレッド上で非同期に届くイベントを管理するための仕組みです。
名前の通りループ状態に入り、例えばユーザの画面操作等のイベントを受信すると、対応するハンドラにイベント渡して処理をするということを繰り返します。
実行ループはスレッドごとに1つ実行することができますが、iOSアプリケーションは起動シーケンスの中でメインスレッドで実行ループを実行済みなので、メインスレッドの実行ループを使用する場合は明示的に実行する必要はありません。
RunLoopLock
クラスはカレントスレッドの実行ループを利用するようになっているので、これをメインスレッドで呼び出せばメインスレッドの実行ループを制御できるようになります。
それではRunLoopLock
のコードを読んでいきましょう。
まずはイニシャライザです。
init(timeout: RxTimeInterval?) {
_timeout = timeout
_currentRunLoop = CFRunLoopGetCurrent()
}
_timeout
はスレッドをブロックする時間のタイムアウト値です。
タイムアウト時間を越えるとブロックが解除されます。
_currentRunLoop
にはカレントスレッドのCFRunLoop
オブジェクトが格納されます。
ここではメインスレッドのCFRunLoop
オブジェクトを想定することとします。
続いて、materializeResult()
メソッド内で呼ばれていたdispatch()
メソッドです。
materializeResult()
では、ストリームをサブスクライブして全てのイベントを受信したら実行ループを抜けるという処理のクロージャを渡していました。
func dispatch(_ action: @escaping () -> ()) {
CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw) {
if CurrentThreadScheduler.isScheduleRequired {
_ = CurrentThreadScheduler.instance.schedule(()) { _ in
action()
return Disposables.create()
}
}
else {
action()
}
}
CFRunLoopWakeUp(_currentRunLoop)
}
CFRunLoopPerformBlock()
は指定した実行ループにコードブロックを渡して実行させるメソッドです。
ここではメインスレッドの実行ループ内でaction()
クロージャを実行するようにしています。
CFRunLoopWakeUp()
はCFRunLoopPerformBlock()
で登録したコードブロックをすぐに実行するために必要なようです。
func stop() {
if AtomicIncrement(&_calledStop) != 1 {
return
}
CFRunLoopPerformBlock(_currentRunLoop, runLoopModeRaw) {
CFRunLoopStop(self._currentRunLoop)
}
CFRunLoopWakeUp(_currentRunLoop)
}
stop()
メソッドは、実行ループを終了させるCFRunLoopStop()
メソッドを呼び出します。
これにより、スレッドのブロックが解除されます。
そして最後、run()
メソッドです。
func run() throws {
if AtomicIncrement(&_calledRun) != 1 {
fatalError("Run can be only called once")
}
if let timeout = _timeout {
#if os(Linux)
switch Int(CFRunLoopRunInMode(runLoopModeRaw, timeout, false)) {
case kCFRunLoopRunFinished:
return
case kCFRunLoopRunHandledSource:
return
case kCFRunLoopRunStopped:
return
case kCFRunLoopRunTimedOut:
throw RxError.timeout
default:
fatalError("This failed because `CFRunLoopRunResult` wasn't bridged to Swift.")
}
#else
switch CFRunLoopRunInMode(runLoopMode, timeout, false) {
case .finished:
return
case .handledSource:
return
case .stopped:
return
case .timedOut:
throw RxError.timeout
}
#endif
}
else {
CFRunLoopRun()
}
}
CFRunLoopRun()
メソッドは、カレントスレッドの実行ループを実行するメソッドです。
CFRunLoopRun()
メソッドはカレントスレッドの実行ループを再帰的に実行することが可能で、CFRunLoopRun()
メソッドの呼び出しはカレントスレッドのコールスタックに追加されます。
CFRunLoopRun()
メソッドはCFRunLoopStop()
メソッドを呼ばない限りループし続け、呼び出し元に戻らないので、ここでスレッドがブロックされることになります(タイムアウトを指定している場合はCFRunLoopRunInMode()
が呼ばれ、タイムアウトしたら強制的にブロックが解除されます)。
ではどこでCFRunLoopStop()
メソッドが呼ばれるかと言うと、materializeResult()
のこの部分です。
...
lock.dispatch {
let subscription = self.source.subscribe { event in
if d.isDisposed {
return
}
switch event {
case .next(let element):
do {
if try predicate(element) {
elements.append(element)
}
if let max = max, elements.count >= max {
d.dispose()
lock.stop()
}
} catch (let err) {
error = err
d.dispose()
lock.stop()
}
case .error(let err):
error = err
d.dispose()
lock.stop()
case .completed:
d.dispose()
lock.stop()
}
}
d.setDisposable(subscription)
}
...
全てのイベントが到着し、最後にcompleted
イベントが発行されたら、RunLoopLock
クラスのstop()
メソッドが呼ばれます。
stop()
メソッドはCFRunLoopStop()
を呼ぶので、ここで実行ループが終了し、呼び出し元に戻ります。
呼び出し元はmaterializeResult()
メソッドのこの部分でした。
do {
try lock.run()
} catch (let err) {
error = err
}
if let error = error {
return MaterializedSequenceResult.failed(elements: elements, error: error)
}
return MaterializedSequenceResult.completed(elements: elements)
}
materializeResult()
メソッドは最後に取得した要素を関連値として持つEnumを返して終了します。
冒頭に出てきたBlockingObservable
のtoArray()
メソッドは、このEnumから値を取り出して返すメソッドです。
RxBlockingがスレッドをブロックする仕組みの説明は以上となります。
まとめ
RxBlockingでは、CFRunLoop
というかなり低レイヤな仕組みを使ってスレッドのブロックをしていることがわかりました。
そして、ブロッキングの仕組みの正体は、メインスレッドで再帰的に実行された実行ループであることがわかりました。
ブロッキングの仕組みがわかったところで何か役に立つわけではないですが、自分としてはiOSの仕組みの一部を知ることができてよかったと思っています。
参考資料
公式
ブログ