RxSwiftのソースコードを読んでみて
隙間時間を見つけてRxSwiftを勉強し始めて数か月が経つのですが、簡単な実装でも出てくる用語とかオペレータが多く短時間だとなかなか理解が深まりませんでした。
RxSwiftのPlaygroundを触ったり、Exampleを触ったり、マーブルダイアグラムを見たりして、ようやくある程度理解できるようになってきました。
オペレータについて追ってみたのでまとめてみました。もし間違いなどがあればご指摘いただきたいです。
RxSwiftのドキュメントについて
RxSwiftのリポジトリにはDocumentationが用意されており、いろいろなドキュメントが用意されています。
HotAndColdObservables.mdなんかはHotとColdの違いについて表も用いて説明されているので一度見てみるといいと思います。
RxSwiftで実装されているオペレータについてはAPI.mdに詳しく書かれています。
API.mdに書かれていること
導入部分でさらっと説明が書かれていますが、まとめてみるとこんな感じです。
- いろいろなプラットフォームでいろいろな実装がされており、同じオペレーターでも異なる呼び方をしている
- これは歴史的な理由や言語の予約語が原因だったりする
- オペレーターは状態を持たない
- 目的に応じたAPIのリファレンスが書かれている
ここではオペレータについて記載されています。リンク先はReactiveXのページになっており、マーブルダイアグラムを見ることができます。リンク先のページ下部には言語ごとの実装について記載されていますが、RxSwiftは"TBD"と書かれており、サンプルなどは書かれていません。
API.mdのCreating ObservablesのasObservable
のリンク先を見てみると、asObservable
はFromと言うことが分かったりします。
また、ここに書かれている基本的な関数については、ある程度RxSwiftに含まれているPlaygroundで動作を確認することができます。
今回はempty
のソースを追って、Observable
、ObserverType
、AnonymousObserver
などに触れてみました。
empty
のソースを追ってみる
empty
はReactiveXのリンクを見ても"Empty"と紹介されています。
create an Observable that emits no items but terminates normally
何もアイテムを発行しないが、正常に終了するObservableを作成します。
Playgroundでは、こんな感じでサンプルが書かれていました。
let emptySequence = Observable<Int>.empty()
emptySequence
.subscribe { event in
print(event) // Completed
}
empty
で作成したObservableに対してsubscribe
を呼び出しています(subscribe
は後ほど)。
Event<Element>
クラス
empty
は何もアイテムを発行しないのでCompleted
と出力されるのですが、これの正体はEvent
というenumになります。
public enum Event<Element> : CustomDebugStringConvertible {
/**
Next element is produced
*/
case Next(Element)
/**
Sequence terminates with error
*/
case Error(ErrorType)
/**
Sequence completes sucessfully
*/
case Completed
}
Event
は3つのメンバーを持っており、Next(Element)
、Error(ErrorType)
、そしてCompleted
となっています。empty
は何もアイテムを発行せずに終了と書かれていたので、Completed
が完了を意味することが分かります。
Observable<Element>
クラス
empty
の話に戻ります。empty
の実装は以下のようになっています。empty
は引数を何も取らずに型E
のObservable
を返すことが分かります。
public static func empty() -> Observable<E> {
return Empty<E>()
}
empty
はObservable
のextensionです。Observable+Creation.swiftというファイルで定義されており、このファイルにはcreate
,just
といった関数も定義されており、ファイル名通りObservable
を作成するための便利メソッドが定義されています。
Observable
を見てみましょう(実際はコメントがありますが長くなるので削除しています)。
public class Observable<Element> : ObservableType {
public typealias E = Element
init() {
#if TRACE_RESOURCES
OSAtomicIncrement32(&resourceCount)
#endif
}
public func subscribe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
abstractMethod()
}
public func asObservable() -> Observable<E> {
return self
}
deinit {
#if TRACE_RESOURCES
AtomicDecrement(&resourceCount)
#endif
}
internal func composeMap<R>(selector: Element throws -> R) -> Observable<R> {
return Map(source: self, selector: selector)
}
}
Observable
はジェネリックなクラスになっており、外部公開メソッドとしてsubscribe
とasObservable
を実装しています。
subscribe
はabstractMethod()
を実行していますが、@noreturn
な関数で内部的にfatalError
を呼び出しています。継承して使うものということでしょうか。
asObservable
はself
を返しているだけです。
ObservableType
プロトコル
また、Observable
クラスはObservableType
プロトコルに適合していることが分かります。ObservableType
プロトコルは以下のようになっています(実際はコメントがありますが長くなるので削除しています)。
public protocol ObservableType : ObservableConvertibleType {
associatedtype E
@warn_unused_result(message="http://git.io/rxs.ud")
func subscribe<O: ObserverType where O.E == E>(observer: O) -> Disposable
}
関連型E
を持ち、subscribe
メソッドを実装することが、ObservableType
プロトコルには必要であることが分かります。
subscribe
できることがObservable
=観察可能である、という性質を表しているということだと思います。
Disposable
プロトコル
subscribe
はObserverType
を引数に取り、Disposable
を返します。この時ObserverType
の関連型とObservableType
の関連型は同じである必要があります。
Disposable
はプロトコルです。
/**
Respresents disposable resource.
*/
public protocol Disposable {
/**
Dispose resource.
*/
func dispose()
}
dispose
を実装することでリソースの処分を行うみたいです。
ObserverType
プロトコル
ObserverType
プロトコルも見てみます。ObserverType
も関連型E
を持ち、on
メソッドの実装が求められます。Event
が再登場しました。
public protocol ObserverType {
/**
The type of elements in sequence that observer can observe.
*/
associatedtype E
/**
Notify observer about sequence event.
- parameter event: Event that occured.
*/
func on(event: Event<E>)
}
on
メソッドのはオブザーバー=観察者に対してイベントを通知するためのメソッドのようです。
ObserverType
にはEvent
各種に対する便利メソッドのデフォルト実装が用意されていました。
public extension ObserverType {
/**
Convenience method equivalent to `on(.Next(element: E))`
- parameter element: Next element to send to observer(s)
*/
final func onNext(element: E) {
on(.Next(element))
}
/**
Convenience method equivalent to `on(.Completed)`
*/
final func onCompleted() {
on(.Completed)
}
/**
Convenience method equivalent to `on(.Error(error: ErrorType))`
- parameter error: ErrorType to send to observer(s)
*/
final func onError(error: ErrorType) {
on(.Error(error))
}
}
Empty
クラス
さて、Observable
がなんとなくわかったので、empty
を再度見てみましょう(Observable
を見るだけだったのですがだいぶ遠回りしました)。
empty
メソッドはObservable<E>
を戻り値として返しています。
public static func empty() -> Observable<E> {
return Empty<E>()
}
Empty
のインスタンスを返していますが、Empty
はObservable
のサブクラスなのでしょう。
このEmpty
は以下のように定義されていました。Empty
クラスはProducer
クラスを継承したもので、subscribe
メソッドをオーバーライドしています(Producer
はObservable
のサブクラスです)。
class Empty<Element> : Producer<Element> {
override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
observer.on(.Completed)
return NopDisposable.instance
}
}
先ほど出てきたObserverType
のon
にEvent
の.Completed
を渡していますね。Event
には.Next
も.Error
もありますが、どちらも渡していません。
Empty
はpublic
になっていないのでRxSwiftをモジュールとして使う際には使えません。
NopDisposable
クラス
NopDisposable
は何もオペレーションの必要がないDisposable
でシングルトンになっていました。何もしないDisposable
みたいです。
public struct NopDisposable : Disposable {
/**
Singleton instance of `NopDisposable`.
*/
public static let instance: Disposable = NopDisposable()
init() {
}
/**
Does nothing.
*/
public func dispose() {
}
}
ここまでのまとめ
ここまでをまとめると、観察可能なObservableType
に適合したObservable<E>
を継承したEmpty
のインスタンスをempty
メソッドの戻り値とすることでイベントの監視を可能にしています。
public static func empty() -> Observable<E> {
return Empty<E>()
}
subscribe(on: (event: Event<E>) -> Void) -> DisposableはObservable<Element>
メソッド
ここでようやくPlaygroundのサンプルに戻ります。
let emptySequence = Observable<Int>.empty()
emptySequence
.subscribe { event in
print(event) // Completed
}
Observable<Int>.empty()
の流れはここまで書いてきたとおりです。Observable<Int>
は観察可能なクラスで、empty
クラスメソッドによって.Completed
のEvent
のみのシーケンスが生成されます。それがemptySequence
です。
Empty
はsubscribe
を実装しているので、subscribe
を呼び出すことができます。
と思いきや、Playgroundで使用されているsubscribe
はEmpty
クラスに定義されているsubscribe
とは引数が異なります。
Empty
はこのように実装されていました。ObserverType
を引数として受け取ってDisposable
を戻り値としていました。
class Empty<Element> : Producer<Element> {
override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
observer.on(.Completed)
return NopDisposable.instance
}
}
しかしPlaygroundで使われていたのは以下のsubscribe
です。Event
を引数にVoid
を返す関数を第一引数とし、戻り値はDisposable
型です。
@warn_unused_result(message="http://git.io/rxs.ud")
public func subscribe(on: (event: Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(event: e)
}
return self.subscribeSafe(observer)
}
AnonymousObserver
クラス
引数として渡されたon
はAnonymousObserver
に渡されています。このクラスはObserverBase<ElementType>
のサブクラスとなっており、ObserverBase
はObserverType
とDisposable
に適合しています。
class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = Event<Element> -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: EventHandler) {
#if TRACE_RESOURCES
AtomicIncrement(&resourceCount)
#endif
_eventHandler = eventHandler
}
override func onCore(event: Event<Element>) {
return _eventHandler(event)
}
#if TRACE_RESOURCES
deinit {
AtomicDecrement(&resourceCount)
}
#endif
}
AnonymousObserver
はEvent<Element> -> Void
型の値をプロパティとして保持しており、オーバーライドしているonCore
が、スーパークラスであるObserverBase<ElementType>
のon
で呼び出されています。
ObserverBase<ElementType>
のonCore
はオーバーライドされることが前提でabstractMethod()
が呼ばれています(いまのところAnonymousObserver
以外にObserverBase<ElementType>
を継承しているクラスはありませんでした)。
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private var _isStopped: AtomicInt = 0
func on(event: Event<E>) {
switch event {
case .Next:
if _isStopped == 0 {
onCore(event)
}
case .Error, .Completed:
if !AtomicCompareAndSwap(0, 1, &_isStopped) {
return
}
onCore(event)
}
}
func onCore(event: Event<E>) {
abstractMethod()
}
func dispose() {
_isStopped = 1
}
}
イベントを受け取ってObserverType
のon
を実行するのがAnonymousObserver
の役割っぽいです。
@warn_unused_result(message="http://git.io/rxs.ud")
public func subscribe(on: (event: Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(event: e)
}
return self.subscribeSafe(observer)
}
subscribeSafe
メソッド
AnonymousObserver
のインスタンスはsubscribeSafe
メソッドの引数として渡されています。
public extension ObservableType {
/**
All internal subscribe calls go through this method
*/
@warn_unused_result(message="http://git.io/rxs.ud")
func subscribeSafe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
return self.asObservable().subscribe(observer)
}
}
All internal subscribe calls go through this method
インターナルなsubscribeはすべてこのメソッドを通して呼ばれるそうで、Empty
で定義していた以下のsubscribe
はここで呼び出されているようですね(この"All internal subscribe"がどこまでをカバーしているのかは確認していません)。
class Empty<Element> : Producer<Element> {
override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
observer.on(.Completed)
return NopDisposable.instance
}
}
コードリーディングまとめ
以下のコードを軽くまとめます。
let emptySequence = Observable<Int>.empty()
emptySequence
.subscribe { event in
print(event) // Completed
}
整理すると、
-
Observable<Int>.empty()
で空のシーケンスが作成される
-
Empty<E>
のインスタンスが生成されている -
Empty<E>
はObservable<Element>
のサブクラス -
Observable<Element>
=監察可能(subscribe
を実装しているので)
-
subscribe(on: (event: Event<E>) -> Void) -> Disposable
で流れてくるイベントを処理する
-
subscribe(on: (event: Event<E>) -> Void) -> Disposable
はObservable<Element>
のextensionで定義されている - 引数として渡しているクロージャは
AnonymousObserver
がプロパティとして保持 -
AnonymousObserver
インスタンスがsubscribeSafe
メソッドの引数に渡される -
subscribeSafe
メソッド内ではEmpty
のsubscribe
メソッドが実行される -
Empty
のsubscribe
は渡されたobserver
のon
を実行し、引数にCompleted
を渡している
以上のようになるかと思います。
まとめ
出てくる用語を一つずつ整理すると今回のようにたくさん寄り道をしてソースを読まなければならないので、API.mdやマーブルダイアグラムに目を通してRxの用語を理解しておくともっとスムーズになるかもしれません。
コード的にはclassによる継承が結構使われている印象を受けました。
実装を見るとどのように動くのか分かって良いですね。