RxSwiftのソースコードを読んでみて
隙間時間を見つけてRxSwiftを勉強し始めて数か月が経つのですが、簡単な実装でも出てくる用語とかオペレータが多く短時間だとなかなか理解が深まりませんでした。
RxSwiftのPlaygroundを触ったり、Exampleを触ったり、マーブルダイアグラムを見たりして、ようやくある程度理解できるようになってきました。
オペレータについて追ってみたのでまとめてみました。もし間違いなどがあればご指摘いただきたいです。
RxSwiftのドキュメントについて
RxSwiftのリポジトリにはDocumentationが用意されており、いろいろなドキュメントが用意されています。
HotAndColdObservables.mdなんかはHotとColdの違いについて表も用いて説明されているので一度見てみるといいと思います。
RxSwiftで実装されているオペレータについてはAPI.mdに詳しく書かれています。
API.mdに書かれていること
導入部分でさらっと説明が書かれていますが、まとめてみるとこんな感じです。
- いろいろなプラットフォームでいろいろな実装がされており、同じオペレーターでも異なる呼び方をしている
 - これは歴史的な理由や言語の予約語が原因だったりする
 - オペレーターは状態を持たない
 - 目的に応じたAPIのリファレンスが書かれている
 
ここではオペレータについて記載されています。リンク先はReactiveXのページになっており、マーブルダイアグラムを見ることができます。リンク先のページ下部には言語ごとの実装について記載されていますが、RxSwiftは"TBD"と書かれており、サンプルなどは書かれていません。
API.mdのCreating ObservablesのasObservableのリンク先を見てみると、asObservableはFromと言うことが分かったりします。
また、ここに書かれている基本的な関数については、ある程度RxSwiftに含まれているPlaygroundで動作を確認することができます。
今回はemptyのソースを追って、Observable、ObserverType、AnonymousObserver などに触れてみました。
emptyのソースを追ってみる
emptyはReactiveXのリンクを見ても"Empty"と紹介されています。
create an Observable that emits no items but terminates normally
何もアイテムを発行しないが、正常に終了するObservableを作成します。
Playgroundでは、こんな感じでサンプルが書かれていました。
let emptySequence = Observable<Int>.empty()
emptySequence
    .subscribe { event in
        print(event) // Completed
}
emptyで作成したObservableに対してsubscribeを呼び出しています(subscribeは後ほど)。
Event<Element>クラス
emptyは何もアイテムを発行しないのでCompletedと出力されるのですが、これの正体はEventというenumになります。
public enum Event<Element> : CustomDebugStringConvertible {
    /**
    Next element is produced
    */
    case Next(Element)
    /**
    Sequence terminates with error
    */
    case Error(ErrorType)
    /**
    Sequence completes sucessfully
    */
    case Completed
}
Eventは3つのメンバーを持っており、Next(Element)、Error(ErrorType)、そしてCompletedとなっています。emptyは何もアイテムを発行せずに終了と書かれていたので、Completedが完了を意味することが分かります。
Observable<Element>クラス
emptyの話に戻ります。emptyの実装は以下のようになっています。emptyは引数を何も取らずに型EのObservableを返すことが分かります。
public static func empty() -> Observable<E> {
    return Empty<E>()
}
emptyはObservableのextensionです。Observable+Creation.swiftというファイルで定義されており、このファイルにはcreate,justといった関数も定義されており、ファイル名通りObservableを作成するための便利メソッドが定義されています。
Observableを見てみましょう(実際はコメントがありますが長くなるので削除しています)。
public class Observable<Element> : ObservableType {
    public typealias E = Element
    init() {
# if TRACE_RESOURCES
        OSAtomicIncrement32(&resourceCount)
# endif
    }
    public func subscribe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
        abstractMethod()
    }
    public func asObservable() -> Observable<E> {
        return self
    }
    deinit {
# if TRACE_RESOURCES
        AtomicDecrement(&resourceCount)
# endif
    }
    internal func composeMap<R>(selector: Element throws -> R) -> Observable<R> {
        return Map(source: self, selector: selector)
    }
}
Observableはジェネリックなクラスになっており、外部公開メソッドとしてsubscribeとasObservableを実装しています。
subscribeはabstractMethod()を実行していますが、@noreturnな関数で内部的にfatalErrorを呼び出しています。継承して使うものということでしょうか。
asObservableはselfを返しているだけです。
ObservableTypeプロトコル
また、ObservableクラスはObservableTypeプロトコルに適合していることが分かります。ObservableTypeプロトコルは以下のようになっています(実際はコメントがありますが長くなるので削除しています)。
public protocol ObservableType : ObservableConvertibleType {
    associatedtype E
    @warn_unused_result(message="http://git.io/rxs.ud")
    func subscribe<O: ObserverType where O.E == E>(observer: O) -> Disposable
}
関連型Eを持ち、subscribeメソッドを実装することが、ObservableTypeプロトコルには必要であることが分かります。
subscribeできることがObservable=観察可能である、という性質を表しているということだと思います。
Disposableプロトコル
subscribeはObserverTypeを引数に取り、Disposableを返します。この時ObserverTypeの関連型とObservableTypeの関連型は同じである必要があります。
Disposableはプロトコルです。
/**
Respresents disposable resource.
*/
public protocol Disposable {
    /**
    Dispose resource.
    */
    func dispose()
}
disposeを実装することでリソースの処分を行うみたいです。
ObserverTypeプロトコル
ObserverTypeプロトコルも見てみます。ObserverTypeも関連型Eを持ち、onメソッドの実装が求められます。Eventが再登場しました。
public protocol ObserverType {
    /**
    The type of elements in sequence that observer can observe.
    */
    associatedtype E
    /**
    Notify observer about sequence event.
    - parameter event: Event that occured.
    */
    func on(event: Event<E>)
}
onメソッドのはオブザーバー=観察者に対してイベントを通知するためのメソッドのようです。
ObserverTypeにはEvent各種に対する便利メソッドのデフォルト実装が用意されていました。
public extension ObserverType {
    /**
    Convenience method equivalent to `on(.Next(element: E))`
    - parameter element: Next element to send to observer(s)
    */
    final func onNext(element: E) {
        on(.Next(element))
    }
    /**
    Convenience method equivalent to `on(.Completed)`
    */
    final func onCompleted() {
        on(.Completed)
    }
    /**
    Convenience method equivalent to `on(.Error(error: ErrorType))`
    - parameter error: ErrorType to send to observer(s)
    */
    final func onError(error: ErrorType) {
        on(.Error(error))
    }
}
Emptyクラス
さて、Observableがなんとなくわかったので、emptyを再度見てみましょう(Observableを見るだけだったのですがだいぶ遠回りしました)。
emptyメソッドはObservable<E>を戻り値として返しています。
public static func empty() -> Observable<E> {
    return Empty<E>()
}
Emptyのインスタンスを返していますが、EmptyはObservableのサブクラスなのでしょう。
このEmptyは以下のように定義されていました。EmptyクラスはProducerクラスを継承したもので、subscribeメソッドをオーバーライドしています(ProducerはObservableのサブクラスです)。
class Empty<Element> : Producer<Element> {
    override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
        observer.on(.Completed)
        return NopDisposable.instance
    }
}
先ほど出てきたObserverTypeのonにEventの.Completedを渡していますね。Eventには.Nextも.Errorもありますが、どちらも渡していません。
EmptyはpublicになっていないのでRxSwiftをモジュールとして使う際には使えません。
NopDisposableクラス
NopDisposableは何もオペレーションの必要がないDisposableでシングルトンになっていました。何もしないDisposableみたいです。
public struct NopDisposable : Disposable {
 
    /**
    Singleton instance of `NopDisposable`.
    */
    public static let instance: Disposable = NopDisposable()
    
    init() {
        
    }
    
    /**
    Does nothing.
    */
    public func dispose() {
    }
}
ここまでのまとめ
ここまでをまとめると、観察可能なObservableTypeに適合したObservable<E>を継承したEmptyのインスタンスをemptyメソッドの戻り値とすることでイベントの監視を可能にしています。
public static func empty() -> Observable<E> {
    return Empty<E>()
}
subscribe(on: (event: Event<E>) -> Void) -> DisposableはObservable<Element>メソッド
ここでようやくPlaygroundのサンプルに戻ります。
let emptySequence = Observable<Int>.empty()
emptySequence
    .subscribe { event in
        print(event) // Completed
}
Observable<Int>.empty()の流れはここまで書いてきたとおりです。Observable<Int>は観察可能なクラスで、emptyクラスメソッドによって.CompletedのEventのみのシーケンスが生成されます。それがemptySequenceです。
Emptyはsubscribeを実装しているので、subscribeを呼び出すことができます。
と思いきや、Playgroundで使用されているsubscribeはEmptyクラスに定義されているsubscribeとは引数が異なります。
Emptyはこのように実装されていました。ObserverTypeを引数として受け取ってDisposableを戻り値としていました。
class Empty<Element> : Producer<Element> {
    override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
        observer.on(.Completed)
        return NopDisposable.instance
    }
}
しかしPlaygroundで使われていたのは以下のsubscribeです。Eventを引数にVoidを返す関数を第一引数とし、戻り値はDisposable型です。
@warn_unused_result(message="http://git.io/rxs.ud")
public func subscribe(on: (event: Event<E>) -> Void)
    -> Disposable {
    let observer = AnonymousObserver { e in
        on(event: e)
    }
    return self.subscribeSafe(observer)
}
AnonymousObserverクラス
引数として渡されたonはAnonymousObserverに渡されています。このクラスはObserverBase<ElementType>のサブクラスとなっており、ObserverBaseはObserverTypeとDisposableに適合しています。
class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
    typealias Element = ElementType
    
    typealias EventHandler = Event<Element> -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: EventHandler) {
# if TRACE_RESOURCES
        AtomicIncrement(&resourceCount)
# endif
        _eventHandler = eventHandler
    }
    override func onCore(event: Event<Element>) {
        return _eventHandler(event)
    }
    
# if TRACE_RESOURCES
    deinit {
        AtomicDecrement(&resourceCount)
    }
# endif
}
AnonymousObserverはEvent<Element> -> Void型の値をプロパティとして保持しており、オーバーライドしているonCoreが、スーパークラスであるObserverBase<ElementType>のonで呼び出されています。
ObserverBase<ElementType>のonCoreはオーバーライドされることが前提でabstractMethod()が呼ばれています(いまのところAnonymousObserver以外にObserverBase<ElementType>を継承しているクラスはありませんでした)。
class ObserverBase<ElementType> : Disposable, ObserverType {
    typealias E = ElementType
    private var _isStopped: AtomicInt = 0
    func on(event: Event<E>) {
        switch event {
        case .Next:
            if _isStopped == 0 {
                onCore(event)
            }
        case .Error, .Completed:
            if !AtomicCompareAndSwap(0, 1, &_isStopped) {
                return
            }
            onCore(event)
        }
    }
    func onCore(event: Event<E>) {
        abstractMethod()
    }
    func dispose() {
        _isStopped = 1
    }
}
イベントを受け取ってObserverTypeのonを実行するのがAnonymousObserverの役割っぽいです。
@warn_unused_result(message="http://git.io/rxs.ud")
public func subscribe(on: (event: Event<E>) -> Void)
    -> Disposable {
    let observer = AnonymousObserver { e in
        on(event: e)
    }
    return self.subscribeSafe(observer)
}
subscribeSafeメソッド
AnonymousObserverのインスタンスはsubscribeSafeメソッドの引数として渡されています。
public extension ObservableType {
    /**
    All internal subscribe calls go through this method
    */
    @warn_unused_result(message="http://git.io/rxs.ud")
    func subscribeSafe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
        return self.asObservable().subscribe(observer)
    }
}
All internal subscribe calls go through this method
インターナルなsubscribeはすべてこのメソッドを通して呼ばれるそうで、Emptyで定義していた以下のsubscribeはここで呼び出されているようですね(この"All internal subscribe"がどこまでをカバーしているのかは確認していません)。
class Empty<Element> : Producer<Element> {
    override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
        observer.on(.Completed)
        return NopDisposable.instance
    }
}
コードリーディングまとめ
以下のコードを軽くまとめます。
let emptySequence = Observable<Int>.empty()
emptySequence
    .subscribe { event in
        print(event) // Completed
}
整理すると、
- 
Observable<Int>.empty()で空のシーケンスが作成される 
- 
Empty<E>のインスタンスが生成されている - 
Empty<E>はObservable<Element>のサブクラス - 
Observable<Element>=監察可能(subscribeを実装しているので) 
- 
subscribe(on: (event: Event<E>) -> Void) -> Disposableで流れてくるイベントを処理する 
- 
subscribe(on: (event: Event<E>) -> Void) -> DisposableはObservable<Element>のextensionで定義されている - 引数として渡しているクロージャは
AnonymousObserverがプロパティとして保持 - 
AnonymousObserverインスタンスがsubscribeSafeメソッドの引数に渡される - 
subscribeSafeメソッド内ではEmptyのsubscribeメソッドが実行される - 
Emptyのsubscribeは渡されたobserverのonを実行し、引数にCompletedを渡している 
以上のようになるかと思います。
まとめ
出てくる用語を一つずつ整理すると今回のようにたくさん寄り道をしてソースを読まなければならないので、API.mdやマーブルダイアグラムに目を通してRxの用語を理解しておくともっとスムーズになるかもしれません。
コード的にはclassによる継承が結構使われている印象を受けました。
実装を見るとどのように動くのか分かって良いですね。