Edited at

RxSwiftのソースコードをちょっと読んでみた


RxSwiftのソースコードを読んでみて

隙間時間を見つけてRxSwiftを勉強し始めて数か月が経つのですが、簡単な実装でも出てくる用語とかオペレータが多く短時間だとなかなか理解が深まりませんでした。

RxSwiftのPlaygroundを触ったり、Exampleを触ったり、マーブルダイアグラムを見たりして、ようやくある程度理解できるようになってきました。

オペレータについて追ってみたのでまとめてみました。もし間違いなどがあればご指摘いただきたいです。


RxSwiftのドキュメントについて

RxSwiftのリポジトリにはDocumentationが用意されており、いろいろなドキュメントが用意されています。

HotAndColdObservables.mdなんかはHotとColdの違いについて表も用いて説明されているので一度見てみるといいと思います。

RxSwiftで実装されているオペレータについてはAPI.mdに詳しく書かれています。


API.mdに書かれていること

導入部分でさらっと説明が書かれていますが、まとめてみるとこんな感じです。


  • いろいろなプラットフォームでいろいろな実装がされており、同じオペレーターでも異なる呼び方をしている

  • これは歴史的な理由や言語の予約語が原因だったりする

  • オペレーターは状態を持たない

  • 目的に応じたAPIのリファレンスが書かれている

ここではオペレータについて記載されています。リンク先はReactiveXのページになっており、マーブルダイアグラムを見ることができます。リンク先のページ下部には言語ごとの実装について記載されていますが、RxSwiftは"TBD"と書かれており、サンプルなどは書かれていません。

API.mdCreating ObservablesasObservableのリンク先を見てみると、asObservableFromと言うことが分かったりします。

また、ここに書かれている基本的な関数については、ある程度RxSwiftに含まれているPlaygroundで動作を確認することができます。

今回はemptyのソースを追って、ObservableObserverTypeAnonymousObserverなどに触れてみました。


emptyのソースを追ってみる

emptyはReactiveXのリンクを見ても"Empty"と紹介されています。


create an Observable that emits no items but terminates normally


何もアイテムを発行しないが、正常に終了するObservableを作成します。

Playgroundでは、こんな感じでサンプルが書かれていました。

let emptySequence = Observable<Int>.empty()

emptySequence
.subscribe { event in
print(event) // Completed
}

emptyで作成したObservableに対してsubscribeを呼び出しています(subscribeは後ほど)。


Event<Element>クラス

emptyは何もアイテムを発行しないのでCompletedと出力されるのですが、これの正体はEventというenumになります。


Event.swift

public enum Event<Element> : CustomDebugStringConvertible {

/**
Next element is produced
*/

case Next(Element)

/**
Sequence terminates with error
*/

case Error(ErrorType)

/**
Sequence completes sucessfully
*/

case Completed
}


Eventは3つのメンバーを持っており、Next(Element)Error(ErrorType)、そしてCompletedとなっています。emptyは何もアイテムを発行せずに終了と書かれていたので、Completedが完了を意味することが分かります。


Observable<Element>クラス

emptyの話に戻ります。emptyの実装は以下のようになっています。emptyは引数を何も取らずに型EObservableを返すことが分かります。


Observable+Creation.swift

public static func empty() -> Observable<E> {

return Empty<E>()
}

emptyObservableのextensionです。Observable+Creation.swiftというファイルで定義されており、このファイルにはcreate,justといった関数も定義されており、ファイル名通りObservableを作成するための便利メソッドが定義されています。

Observableを見てみましょう(実際はコメントがありますが長くなるので削除しています)。


Observable.swift

public class Observable<Element> : ObservableType {

public typealias E = Element

init() {
#if TRACE_RESOURCES
OSAtomicIncrement32(&resourceCount)
#endif
}

public func subscribe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
abstractMethod()
}

public func asObservable() -> Observable<E> {
return self
}

deinit {
#if TRACE_RESOURCES
AtomicDecrement(&resourceCount)
#endif
}

internal func composeMap<R>(selector: Element throws -> R) -> Observable<R> {
return Map(source: self, selector: selector)
}
}


Observableはジェネリックなクラスになっており、外部公開メソッドとしてsubscribeasObservableを実装しています。

subscribeabstractMethod()を実行していますが、@noreturnな関数で内部的にfatalErrorを呼び出しています。継承して使うものということでしょうか。

asObservableselfを返しているだけです。


ObservableTypeプロトコル

また、ObservableクラスはObservableTypeプロトコルに適合していることが分かります。ObservableTypeプロトコルは以下のようになっています(実際はコメントがありますが長くなるので削除しています)。


ObservableType.swift


public protocol ObservableType : ObservableConvertibleType {

associatedtype E

@warn_unused_result(message="http://git.io/rxs.ud")
func subscribe<O: ObserverType where O.E == E>(observer: O) -> Disposable

}


関連型Eを持ち、subscribeメソッドを実装することが、ObservableTypeプロトコルには必要であることが分かります。

subscribeできることがObservable=観察可能である、という性質を表しているということだと思います。


Disposableプロトコル

subscribeObserverTypeを引数に取り、Disposableを返します。この時ObserverTypeの関連型とObservableTypeの関連型は同じである必要があります。

Disposableはプロトコルです。


Disposable.swift

/**

Respresents disposable resource.
*/

public protocol Disposable {
/**
Dispose resource.
*/

func dispose()
}

disposeを実装することでリソースの処分を行うみたいです。


ObserverTypeプロトコル

ObserverTypeプロトコルも見てみます。ObserverTypeも関連型Eを持ち、onメソッドの実装が求められます。Eventが再登場しました。


ObserverType.swift

public protocol ObserverType {

/**
The type of elements in sequence that observer can observe.
*/

associatedtype E

/**
Notify observer about sequence event.

- parameter event: Event that occured.
*/
func on(event: Event<E>)
}


onメソッドのはオブザーバー=観察者に対してイベントを通知するためのメソッドのようです。

ObserverTypeにはEvent各種に対する便利メソッドのデフォルト実装が用意されていました。


ObserverType.swift

public extension ObserverType {

/**
Convenience method equivalent to `on(.Next(element: E))`

- parameter element: Next element to send to observer(s)
*/
final func onNext(element: E) {
on(.Next(element))
}

/**
Convenience method equivalent to `on(.Completed)`
*/

final func onCompleted() {
on(.Completed)
}

/**
Convenience method equivalent to `on(.Error(error: ErrorType))`
- parameter error: ErrorType to send to observer(s)
*/

final func onError(error: ErrorType) {
on(.Error(error))
}
}



Emptyクラス

さて、Observableがなんとなくわかったので、emptyを再度見てみましょう(Observableを見るだけだったのですがだいぶ遠回りしました)。

emptyメソッドはObservable<E>を戻り値として返しています。


Observable+Creation.swift

public static func empty() -> Observable<E> {

return Empty<E>()
}

Emptyのインスタンスを返していますが、EmptyObservableのサブクラスなのでしょう。

このEmptyは以下のように定義されていました。EmptyクラスはProducerクラスを継承したもので、subscribeメソッドをオーバーライドしています(ProducerObservableのサブクラスです)。


Empty.swift

class Empty<Element> : Producer<Element> {

override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
observer.on(.Completed)
return NopDisposable.instance
}
}

先ほど出てきたObserverTypeonEvent.Completedを渡していますね。Eventには.Next.Errorもありますが、どちらも渡していません。

EmptypublicになっていないのでRxSwiftをモジュールとして使う際には使えません。


NopDisposableクラス

NopDisposableは何もオペレーションの必要がないDisposableでシングルトンになっていました。何もしないDisposableみたいです。


NopDisposable.swift

public struct NopDisposable : Disposable {

/**
Singleton instance of `NopDisposable`.
*/

public static let instance: Disposable = NopDisposable()

init() {

}

/**
Does nothing.
*/

public func dispose() {
}
}



ここまでのまとめ

ここまでをまとめると、観察可能なObservableTypeに適合したObservable<E>を継承したEmptyのインスタンスをemptyメソッドの戻り値とすることでイベントの監視を可能にしています。


Observable+Creation.swift

public static func empty() -> Observable<E> {

return Empty<E>()
}


subscribe(on: (event: Event<E>) -> Void) -> DisposableはObservable<Element>メソッド

ここでようやくPlaygroundのサンプルに戻ります。

let emptySequence = Observable<Int>.empty()

emptySequence
.subscribe { event in
print(event) // Completed
}

Observable<Int>.empty()の流れはここまで書いてきたとおりです。Observable<Int>は観察可能なクラスで、emptyクラスメソッドによって.CompletedEventのみのシーケンスが生成されます。それがemptySequenceです。

Emptysubscribeを実装しているので、subscribeを呼び出すことができます。

と思いきや、Playgroundで使用されているsubscribeEmptyクラスに定義されているsubscribeとは引数が異なります。

Emptyはこのように実装されていました。ObserverTypeを引数として受け取ってDisposableを戻り値としていました。


Empty.swift

class Empty<Element> : Producer<Element> {

override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
observer.on(.Completed)
return NopDisposable.instance
}
}

しかしPlaygroundで使われていたのは以下のsubscribeです。Eventを引数にVoidを返す関数を第一引数とし、戻り値はDisposable型です。


Observable+Extensions.swift

@warn_unused_result(message="http://git.io/rxs.ud")

public func subscribe(on: (event: Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(event: e)
}
return self.subscribeSafe(observer)
}


AnonymousObserverクラス

引数として渡されたonAnonymousObserverに渡されています。このクラスはObserverBase<ElementType>のサブクラスとなっており、ObserverBaseObserverTypeDisposableに適合しています。


AnonymousObserver.swift

class AnonymousObserver<ElementType> : ObserverBase<ElementType> {

typealias Element = ElementType

typealias EventHandler = Event<Element> -> Void

private let _eventHandler : EventHandler

init(_ eventHandler: EventHandler) {
#if TRACE_RESOURCES
AtomicIncrement(&resourceCount)
#endif
_eventHandler = eventHandler
}

override func onCore(event: Event<Element>) {
return _eventHandler(event)
}

#if TRACE_RESOURCES
deinit {
AtomicDecrement(&resourceCount)
}
#endif
}


AnonymousObserverEvent<Element> -> Void型の値をプロパティとして保持しており、オーバーライドしているonCoreが、スーパークラスであるObserverBase<ElementType>onで呼び出されています。

ObserverBase<ElementType>onCoreはオーバーライドされることが前提でabstractMethod()が呼ばれています(いまのところAnonymousObserver以外にObserverBase<ElementType>を継承しているクラスはありませんでした)。


ObserverBase.swift

class ObserverBase<ElementType> : Disposable, ObserverType {

typealias E = ElementType

private var _isStopped: AtomicInt = 0

func on(event: Event<E>) {
switch event {
case .Next:
if _isStopped == 0 {
onCore(event)
}
case .Error, .Completed:

if !AtomicCompareAndSwap(0, 1, &_isStopped) {
return
}

onCore(event)
}
}

func onCore(event: Event<E>) {
abstractMethod()
}

func dispose() {
_isStopped = 1
}
}


イベントを受け取ってObserverTypeonを実行するのがAnonymousObserverの役割っぽいです。


Observable+Extensions.swift

@warn_unused_result(message="http://git.io/rxs.ud")

public func subscribe(on: (event: Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(event: e)
}
return self.subscribeSafe(observer)
}


subscribeSafeメソッド

AnonymousObserverのインスタンスはsubscribeSafeメソッドの引数として渡されています。


Observable+Extensions.swift

public extension ObservableType {

/**
All internal subscribe calls go through this method
*/

@warn_unused_result(message="http://git.io/rxs.ud")
func subscribeSafe<O: ObserverType where O.E == E>(observer: O) -> Disposable {
return self.asObservable().subscribe(observer)
}
}


All internal subscribe calls go through this method


インターナルなsubscribeはすべてこのメソッドを通して呼ばれるそうで、Emptyで定義していた以下のsubscribeはここで呼び出されているようですね(この"All internal subscribe"がどこまでをカバーしているのかは確認していません)。


Empty.swift

class Empty<Element> : Producer<Element> {

override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
observer.on(.Completed)
return NopDisposable.instance
}
}


コードリーディングまとめ

以下のコードを軽くまとめます。

let emptySequence = Observable<Int>.empty()

emptySequence
.subscribe { event in
print(event) // Completed
}

整理すると、



  1. Observable<Int>.empty()で空のシーケンスが作成される



    • Empty<E>のインスタンスが生成されている


    • Empty<E>Observable<Element>のサブクラス


    • Observable<Element>=監察可能(subscribeを実装しているので)




  2. subscribe(on: (event: Event<E>) -> Void)
    -> Disposable
    で流れてくるイベントを処理する



    • subscribe(on: (event: Event<E>) -> Void)
      -> Disposable
      Observable<Element>のextensionで定義されている

    • 引数として渡しているクロージャはAnonymousObserverがプロパティとして保持


    • AnonymousObserverインスタンスがsubscribeSafeメソッドの引数に渡される


    • subscribeSafeメソッド内ではEmptysubscribeメソッドが実行される


    • Emptysubscribeは渡されたobserveronを実行し、引数にCompletedを渡している



以上のようになるかと思います。


まとめ

出てくる用語を一つずつ整理すると今回のようにたくさん寄り道をしてソースを読まなければならないので、API.mdマーブルダイアグラムに目を通してRxの用語を理解しておくともっとスムーズになるかもしれません。

コード的にはclassによる継承が結構使われている印象を受けました。

実装を見るとどのように動くのか分かって良いですね。