Help us understand the problem. What is going on with this article?

RxSwiftの動作を深く理解する

More than 3 years have passed since last update.

はじめに

オブザーバーパターンから始めるRxSwift入門」と「RxSwift入門(2) 非同期処理してみる」の続きです。

これまではとりあえず RxSwift を使って便利さを感じてもらおうという入門記事でした。ここではさらに使いこなせるように、落とし穴にハマらないように、トラブルにあったときに原因を探れるように、RxSwift の動作をより深く理解することを目的にします。

ここで紹介している RxSwift のコードはバージョンは 2.6.0 のものです。

on

subscribe に subscribeNext/subscribeError/subscribeCompleted という専用メソッドがあるように、onNext/onError/onCompleted は on メソッドの専用バージョンです。

on メソッドには Event<E> 型の enum を指定します。Event のコードは以下のようになっていて、Next なら通知する値を、Error なら ErrorType 型を引数に渡します。

public enum Event<Element> {
    case Next(Element)
    case Error(ErrorType)
    case Completed
}

onNext/onError/onCompleted のコードはそれぞれ on のラッパーになっています。1

public extension ObserverType {
    final func onNext(element: E) {
        on(.Next(element))
    }

    final func onCompleted() {
        on(.Completed)
    }

    final func onError(error: ErrorType) {
        on(.Error(error))
    }
}

Observableの数珠繋ぎ構造

subscribeOn や observeOn をどこに挿入すると、どこがどのスレッドで実行されるのかをしっかり理解するために、operator の動作を詳しくみていきましょう。

前の記事で紹介した以下のコードを使いますので、思い出してください。

_ = fetchServerDataWithRequest(request)
  .subscribeOn(ConcurrentDispatchQueueScheduler(
    globalConcurrentQueueQOS: .UserInitiated))
  .map { parse($0) }
  .observeOn(MainScheduler.instance)
  .subscribe(
    onNext: { result in
      // パース済みデータ受信時の処理
    },
    onError: { error in
      // エラー時の処理
    }
  )

operator を使って Observable を変換すると、新しい Observable が作られます。この新しい Observable は下図のように元の Observable を内部に持っていて、自分が subscribe されると 元の Observable を subscribe します。すると元の Observable からイベントが流れてくるので、それを加工したりしてから流すという動作をしているわけです。

ObservableInObservable.png

当然、合成の場合は以下のように複数の Observable を内部に持ちます。

combineObservable.png

「内包している」というのを表現しなければ、以下のように Observable が数珠繋ぎになっていると捉えられます。

Observables.png

上図で fetchServerData, subscribeOn ... は、それぞれの関数が返す Observable を表しています。subscribeOn は自分が内包する Observable の subscribe を指定スレッドで実行する Observable を返し、observeOn は内包する Observable からのイベントを指定スレッドで通知するよう変換する Observable を返すわけです。

青い矢印は subscribe を呼び出したスレッド、シルバーは通信を行ったバックグラウンドスレッドを表しています。

上の数珠繋ぎ構造がイメージできると、Rx のメソッドチェーンによる記述方法が直感的でわかりやすいと感じられるかと思います。

operatorを作ってみる

operator の動作をコードで理解するために、自分で実装してみましょう。

mapを作ってみる

map を自分で実装してみます。実際の実装は Map クラスというのが内部にあるんですが、Observable.create で作ってみます。

extension Observable {
  public func myMap<T> (converter: (E) throws -> T) -> Observable<T> {
    return Observable<T>.create { observer in
      let disposable = self.subscribe(
        onNext: {
          do { try observer.onNext(converter($0)) }
          catch let e { observer.onError(e) }
        },
        onError: { observer.onError($0) },
        onCompleted: { observer.onCompleted() }
      )
      return disposable
    }
  }
}

クロージャ部分が元の Observable である self を強参照でキャプチャして内部に保持しています。subscribe されると create に渡したクロージャが実行され、元の Observable を subscribe します。流れてきたイベントを converter で加工して通知します。

何気に大事なこととして、変換処理で例外が発生した場合onError で通知します

  .map { parse($0) }

この parse で例外が発生した場合、onError で通知されるわけですね。

observeOnもどきを作ってみる

Scheduler でなく dispatch_queue_t を指定する observeOn もどきを作ってみましょう。

extension Observable {
  public func myObserveOn (dispatchQueue: dispatch_queue_t) -> Observable<E> {
    return Observable<E>.create { observer in
      let disposable = self.subscribe { event in
        dispatch_async(dispatchQueue) {
          observer.on(event)
        }
      }
      return disposable
    }
  }
}

ここでは onNext/onError/onCompleted を1つのクロージャでまとめて受け取れるバージョンの subscribe を利用しています。その方がスレッドの指定をまとめて行えるためです。クロージャにはイベント種別を表す Event<E> 型の enum が渡されます。発生したイベントによって処理を分けたい場合は switch で分岐できます。

let disposable = event.subscribe { event in
  dispatch_async(dispatchQueue) {
    switch event {
    case .Next(let value): observer.onNext(value)
    case .Error(let error): observer.onError(error)
    case .Completed: observer.onCompleted()
    }
  }
}

subscribeOnもどきを作ってみる

observeOn と同じ要領で作れますが、問題は返却する Disposable の生成タイミングです。subscribe を別スレッドで行うと、create の戻り値を返すタイミングではまだ返却したい Disposable が取得できません。

extension Observable {
  public func mySubscribeOn (dispatchQueue: dispatch_queue_t) -> Observable<E> {
    return Observable<E>.create { observer in
      let disposable = SingleAssignmentDisposable()
      dispatch_async(dispatchQueue) {
        // 非同期実行されるのでsubscribeで返されるDisposableはcreateのreturnに使えない
        disposable.disposable = self.subscribe { event in
            observer.on(event)
        }
      }
      return disposable
    }
  }
}

ここでは SingleAssignmentDisposable を使っています。こいつは dispose されたことを覚えておいてくれるコンテナ的な Disposable になっています。中身をセットする前に dispose された場合、中身をセットすると即座に dispose してくれます。内部に保持できる Disposable は名前の通り1つだけです。

SingleAssignmentDisposable を dispose すると、

  • もし既に disposable プロパティがセットされていたら、それを dispose
  • disposed プロパティを true に変更

します。disposable プロパティが設定されるときには、

  • 既に disposable プロパティがセットされていたら例外発生
  • disposed が既に true なら渡された Disposable を即時 dispose
  • disposed が false なら内部に保持

という動作になっています。

subscribeOn指定時のDisposable

subscribeOn を使うと Cold な Observable の実行スレッドを変更できます。つまり同期処理を行う Observable を簡単に非同期に変更とかできちゃうわけです。

// drawImageObservableは画像を生成するObservable
let disposable = drawImageObservable.subscribeNext { image in
  // ...
}

上のコードを「思ったより画像生成に時間かかるから非同期にしよう」って思って、

let disposable = drawImageObservable
  .subscribeOn(ConcurrentDispatchQueueScheduler(
    globalConcurrentQueueQOS: .Default))
  .observeOn(MainScheduler.instance)
  .subscribeNext { image in
    // ...
  }

こんな風にするだけで、非同期実行してメインスレッドで結果を受信できるんです。すごい便利!!

本当でしょうか?

時間がかかる処理ならキャンセルしたい場合が出てきます。戻される Disposable を dispose したらキャンセルするようにしたいわけですが、実装方法を考えてみるとこれがうまくいきません。

func drawImage(data: NSData) -> Observable<UIImage> {
  return Observable.create { observer in
    // 描画処理
    return disposable
  }
}

これが subscribeOn を指定しなければ同期実行される・・・ということは、「描画処理」と書いてあるところが終了しないと return まで辿り着かないんです。

前に独自実装した mySubscribeOn を見ると分かる通り、subscribeOn 自体は Disposable を返してくれます。でもこれを dispose しても dispose されたことを覚えてくれるだけで、描画処理をキャンセルしてくれるはずの Disposable は描画処理が終わるまでセットされず dispose されない・・・ので、結局描画処理はキャンセルできません。

このように同期処理の Observable が返す Disposable はキャンセル処理には使えないわけです。キャンセルが必要になる処理は最初から非同期実行するように作っておく必要があります。キャンセルが必要ないなら2、何の問題もなく subscribeOn で非同期処理に変更できます。

スケージューラの使い方

subscribeOn や observeOn などのスケージューラを引数に取るものは、渡されたスケジューラを内部で使っています。ここでは自分でスケージューラを使ってみましょう。

スケジューラには schedule というメソッドがあり、これに実行するクロージャを渡すと、そのスケジューラの動作に合わせて実行してくれます。

let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Default)
let disposable = scheduler.schedule(param) { param in
  // 処理
  return NopDisposable.instance
}

上の例では非同期実行するスケジューラを作っています。schedule メソッドの第一引数はクロージャの引数として渡されます(クロージャにキャプチャ機能があるのであんまり意味ないような・・・)。クロージャの戻り値として Disposable を返しますが、これが shcedule メソッドの戻り値として返されるわけではありません。前に subscribeOn もどきを自作したときと同様に、このクロージャは別スレッドで実行されるため、ConcurrentDispatchQueueScheduler の schedule の戻り値は SingleAssignmentDisposable です。クロージャの戻り値はその SingleAssignmentDisposable にセットされます。

NopDisposable というのは dispose されても何もすることがない場合に使います。シングルトンです。

じゃあこれを使って描画処理を非同期実行してみましょう。キャンセルも可能にします。

func drawImages(data: NSData) -> Observable<UIImage> {
  return Observable.create { observer in
    let cancelDisposable = BooleanDisposable()
    let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Default)
    let disposable = scheduler.schedule(()) { _ in
      // 描画処理(cancelDisposable.disposed == true なら処理をキャンセルする)
      return NopDisposable.instance
    }
    return CompositeDisposable(disposable, cancelDisposable)
  }
}

初登場の Disposable があるので紹介します。まずは BooleanDisposable ですが、これは dipose されたかどうかを覚えるだけのものです。CompositeDisposable は渡された複数の Disposable をまとめて dispose する Disposable を返します。

描画処理は CuncurrentDispatchQueueScheduler により非同期でバックグラウンド動作します。create が返す CompositeDisposable を dispose すると、スケジューラが返す Disposable と BooleanDisposable が一緒に dispose されます。

スケジューラが返す Disposable が dispose されると、処理開始前なら処理を開始しません。処理開始後なら最終的に schdule に渡したクロージャの戻り値が dispose されますが、今回は NopDisposable なので何も起こりません。あとは描画処理のループ内で BooleanDisposable の disposed プロパティを監視して、dispose されたらキャンセルすればOKです。

subscribeするとRxに所有される

以下のコードで、fetchServerDataWithRequest が返す Observable は非同期実行されるわけです。その Observable を特にどこかで所有したりしていません。スコープを抜けたら消滅してしまわないんでしょうか?

func loadData(param: Param) {
  let request = createRequest(param)
  _ = fetchServerDataWithRequest(request).subscribe(
    onNext: { data in
      // データ受信時の処理
    },
    onError: { error in
      // エラー時の処理
    }
  )
}

subscribe された Observable と渡したクロージャは Rx に保持されるとイメージしてください。実際 RxJava では内部のシングルトンオブジェクトに保持されます。RxSwift の場合はガーベージコレクタがないので、相互参照をうまく利用して dispose されるまで解放されない仕組みにしているようです。onError/onCompleted が発生した場合は RxSwift が dispose してくれます。

何が言いたいかというと、これを理解せずに dispose を忘れるとメモリリークする場合があるので気をつけて下さいってことです。

上記の fetchServerDataWithRequest が返す Observable の実装で、下記のようにデータ受信成功時に onCompleted 発行を忘れたとします。すると明示的に dispose していないし onCompleted も発生しないしで、作られた Observable も渡したクロージャも解放されません。loadData を呼び出すたびにメモリリークしていきます。

//...
if let data = data {
  observer.onNext(data)
  observer.onCompleted()  // これ忘れるとメモリリーク
} else {
//...

Cold - Hot 変換のイメージ

先ほど紹介した map と同じ動作をする独自実装コードを見れば分かりますが、これは Cold な Observable です。 subscribe して初めて動作を開始します。そして subscribe される度に別々に動作を開始します。

map を適用する Observable が Cold なら別に当たり前じゃんで済むんです。ところが map を適用する Observable が Hot だったら?

targetName.map { !$0.isEmpty }

上の targetName が Variable から作られた Hot な Observable だとします。Hotの部分は subscribe する前から動作していて、複数から subscribe されても同じ処理の結果が通知されます。以下にこれを map で変換した場合の処理の流れを図示します。

hot_cold.png

map の部分は Cold なので subscribe するまで動作せず、subscribe されると全く同じ処理が複数実行されてしまうわけです。これは無駄ですね。そこで share() または shareReplay(1) を付けて Cold を Hot に変換します。

targetName
  .map { !$0.isEmpty }
  .shareReplay(1)

これで以下のように map 部分の処理の後が Hot になって、同じ処理結果が複数の subscribe に通知されるようになります。

coldToHot.png

shareReplay(1)の動作

自分で書こうかと思っていたのですが・・・「[RxSwift] shareReplayをちゃんと書いてお行儀良くストリームを購読しよう」を読めば十分です。以上。

では何なので、図と実際の RxSwift のコードを交えながらもっと突っ込んでみます。

以前の記事で、「Subject を使った Observable が Hot であり、そうでないものは全て Cold だ」と説明しました。つまり Cold を Hot に変換するには Subject を経由すればいいわけです。

で、その役割をするのが ConnectableObservable です。以下に動作を図示します。

ConnectableObservable.png

connect を呼ぶと内部にある Observable を subscribe して、内部にある Subject に転送するようになります。で、これを subscribe すると Subject を経由したイベントを受け取れるようになるわけです。

shareReplay(1) が何をやっているのか、ちょっとコードを追ってみましょう。

以下は RxSwift/Observables/Observable+Binding.swift にあるコードです。

extension ObservableType {
    public func multicast<S: SubjectType where S.SubjectObserverType.E == E>(subject: S)
        -> ConnectableObservable<S.E> {
        return ConnectableObservableAdapter(source: self.asObservable(), subject: subject)
    }

    public func replay(bufferSize: Int) -> ConnectableObservable<E> {
        return self.multicast(ReplaySubject.create(bufferSize: bufferSize))
    }

    public func refCount() -> Observable<E> {
        return RefCount(source: self)
    }

    public func shareReplay(bufferSize: Int) -> Observable<E> {
        if bufferSize == 1 {
            return ShareReplay1(source: self.asObservable())
        } else {
            return self.replay(bufferSize).refCount()
        }
    }

shareReplay(1) は replay(1).refCount() と同じ動作なのですが、最もよく使う bufferSize = 1 のときだけは最適化されています。

multicast は指定した Subject を使った ConnectableObservable を返します。RxSwift の実装では ConnectableObservable は抽象クラスで、それを継承した ConnectableObservableAdapter という非公開クラスが処理を実装しています。

replay は ReplaySubject を使った ConnectableObservable を返すようになっています。ReplaySubject に渡す bufferSize はキャッシュする直近のイベントの数で、1を指定すると初期値のない BehaviorSubject と同じになります。

なんでキャッシュする必要があるんでしょうかね? もう一度先ほどの ConnectableObservable の図を見てください。源流の BehaviorSubject が1つキャッシュしてるんだから、中継する Subject は別にキャッシュしない PublishSubject 使えばよくないですか?

ConnectableObservable.png

subscribe してから connect するならそれでいいんです。でも先に connect してから subscribe するなら、Observable の源流が BehaviorSubject だった場合はすぐ値が流れてくるので、1つキャッシュしておかないと subscribe したときには直近の値は失われてしまいます。

RefCount のコードを追ってみましょう。RefCount は subscribe されると run が呼ばれて、毎回 RefCountSink を生成して返します。で、それの run が呼ばれます。

class RefCountSink<CO: ConnectableObservableType, O: ObserverType where CO.E == O.E>
// ...
    func run() -> Disposable {
        // subscribe
        let subscription = _parent._source.subscribeSafe(self)

        _parent._lock.lock(); defer { _parent._lock.unlock() } // {
            if _parent._count == 0 {
                // 誰もsubscribeしていなかったらカウントアップしてconnect
                _parent._count = 1
                _parent._connectableSubscription = _parent._source.connect()
            } else {
                // 既にsubscribeされていたらconnect済みなのでカウントアップするだけ
                _parent._count = _parent._count + 1
            }
//...
}

class RefCount<CO: ConnectableObservableType>: Producer<CO.E> {
    // ...
    override func run<O: ObserverType where O.E == CO.E>(observer: O) -> Disposable {
        let sink = RefCountSink(parent: self, observer: observer)
        sink.disposable = sink.run()
        return sink
    }
}

こいつは1つ目は先に subscribe してから connect します。でも2つ目以降は既に connect された状態のものを subscribe するんです。

BehaviorSubject を加工したなら、2つ目以降も subscribe したら直近の値を最初に流して欲しいですよね?なので shareReplay(1) (または引数1に最適化された shareReplayLatestWhileConnected )を使います。

shareの動作

先ほどと同様に RxSwift/Observables/Observable+Binding.swift に以下のコードがあります。

extension ObservableType {
    public func publish() -> ConnectableObservable<E> {
        return self.multicast(PublishSubject())
    }

    public func share() -> Observable<E> {
        return self.publish().refCount()
    }

replay でなく publish を使うと PublishSubject を使ってくれます。で、publish().refCount() と同等なのが share() です。share を使うと一切バッファしない Cold - Hot 変換ができるわけです。

PublishSubject を加工した場合は直近の値をキャッシュしてもらっては困ります。なので share() を使います。

shareReplay(1)とshare()を間違えると?

refCount() の動作を追っていくと、1つ目の subscribe では connect の方が後なので、間違えても動作は変わりません。ReplaySubject は最初に subscribe した時にはまだキャッシュがないので、動作は PublishSubject と同じです。

shareReplay(1) と間違えて share() にすると、2つ目の subscribe からは直近の値が流れてこなくなります。

逆に share() と間違えて shareReplay(1) を使うと、2つ目の subscribe から直近の値が流れてきてしまいます。エラー通知などに利用してると、1度エラーが発生するとその後は画面遷移するたびにアラート表示されるといった現象に繋がります3

1つ目の subscribe では何の問題もなく動作してしまうので、気付きにくいバグにつながりやすいです。注意してください。

この先へ

今回は RxSwift (あるいは Rx そのもの)の動作を深く理解して、問題を事前に回避したり、問題が出たときに調査できるレベルになることを目指しました。今後も何かハマるポイントが見つかったら書き足します。

もうここまでくれば、ネットで見つかる Rx 関係の記事を読んでも理解に困ることはほとんどないかと思います。

偉そうに色々書いてきましたが、自分自身がまだ今回のプロジェクトで RxSwift どころか Swift を初めて使ってまだ3ヶ月です。だからこそ使い始めの記憶がまだ残っていて、こんな風に教えてもらえたら分かりやすかったのにと思って今までの記事を書きました。

あとは(個人的に) RxSwift の機能一覧みたいなカタログがあったら嬉しいなぁ。そんなわけで RxSwiftの機能カタログ を書きました。

他に以下のものが存在します。


  1. finalが付いているのは、メソッド呼び出しのオーバーヘッドがかからないように、最適化によるインライン展開を期待しているのだと思います。 

  2. disposeしても単に結果を無視するだけでバックグラウンドで処理し続けます。 

  3. はい、まさにやらかしました。それでああshare()じゃないとダメなときがあるんだとようやく理解してこの記事に反映されました。リリース前に発覚&原因にすぐ気付いて良かった。 

k5n
好きなことはプログラミングと楽器演奏。そのうち金になる方を仕事に、ならないほうを趣味にしています。 若かりし頃はベンチャーでバリバリやってましたが、一度体を壊してからは忙し過ぎない仕事を探してマイペースにやってます。 C/C++, Swift, Objective-C, Java, Kotlin, Ruby, Python, PHP, TypeScript, Rust
creato
「言われたものを作るだけ」ではない共創型システムパートナーを掲げる名古屋の少数精鋭開発会社。
http://www.creato-c.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした