iPhone
iOS
Swift
ReactiveX
RxSwift

RxSwiftを使ってアプリを作ってみて、よく使った書き方

※追記(16/10/25)
RxSwiftが3.0.0になりfunction名が一部変更されました。この記事はRxSwift2.xのfunction名で書かれているのでご注意ください。(コピペしても使えない点以外は、問題ないと思います。)
https://github.com/ReactiveX/RxSwift/blob/master/CHANGELOG.md


複数の非同期処理を直列に実行したい

concatを使うことで簡潔に書けます。

Sample.swift
[rx_hoge, rx_fuga].concat()
.subscribe(
    onNext: nil,
    onError: { (error) -> Void in

        // いずれかのタスクでonError()された場合に呼ばれる(その後のタスクは実行されなくなる)

    }, onCompleted: { () -> Void in

        // 全てのタスクがonComplete()された場合に呼ばれる

    }, onDisposed: nil)
    .addDisposableTo(disposeBag)

ただし、concatでは前のタスクで得られた値を次のタスクで使うといったことはできません。
(rx_hogeで取得したトークンをrx_fugaで使うなど。)
その場合は、Observable.createを使ってrx_hogeのonNextコールバックから、得られた値を使ってrx_fugaを呼ぶ処理を書いたObservableを作成する必要があります。(簡潔ではないので他に良い方法があったら教えてください。)

複数の非同期処理を並列に実行し, 全て完了した時に次の処理を行いたい(16/7/4追記)

mergeを使います。

Sample.swift
[rx_hoge, rx_fuga].merge()
.subscribe(
    onNext: { (response) in

        // エラーをcatchした場合もここに入ってくる(.catchError)

    }, onError: { (error) in

        // catchしない場合はここに入る

    }, onCompleted: { [weak self] in

        // 全ての処理の実行完了。次の処理を書く

    }, onDisposed: nil)
    .addDisposableTo(disposeBag)

そのまま使うとエラーになった場合には onCompleted は呼ばれませんが, .catchError を挟むことで onError に入る代わりに onNext に任意の値を渡し, 最終的に onCompleted が呼ばれるようにすることができます。

値の監視時に変更前の値とセットで処理したい

変更前の値と比較などしたい時に。scanを使います。

例)モデルが変化した時に前の値と一緒に受け取れるObservableを作成する。

ViewModel.swift
class ViewModel {
    var rx_model: Variable<Hoge>
    var rx_withOldValue: Observable<(new: Hoge, old: Hoge)>

    init(hoge: Hoge) {
        rx_model = Variable(hoge)
        rx_withOldValue = rx_model
        .scan((nil, nil)) { (before: (Hoge?, Hoge?), newValue) -> (Hoge?, Hoge?) in

            (new: current, old: before.0)

       }
    }

}

※16/04/20追記
この方法だと前の値まで取得できますが、単純に値が「変化したかどうか」だけを見てフィルタしたい場合は、distinctUntilChanged()を挟むだけで良いので、より簡潔に書けます。
(値が変わった時だけUILabelのtextを更新したい場合などに。)

複数の値の変更をまとめて監視して同じ処理をしたい

combineLatestを使うことで、複数の変数のいずれかが変更された場合にそれぞれの最新の値をまとめて受け取れるようになります。
変数をバラバラに監視していた場合だと処理を複数箇所に書く必要がありますが、1つにまとめることでわかりやすくなります。

例)ユーザーの名前に関する情報が変更された場合に、ViewModelに持たせている表示用の文字列も変更したい。

User.swift
struct User {
    var rx_firstname: Variable<String>
    var rx_middlename: Variable<String>
    var rx_lastname: Variable<String>
}
ViewModel.swift
class ViewModel {
    var user = User()
    var name = "" // Viewにbindする場合にはVariable<String>になると思います
    let disposeBag = DisposeBag()

    init() {
        // 名前に関する情報が変更されたらnameの値を更新する
        Observable
            .combineLatest(
                user.rx_firstname.asObservable(),
                user.rx_middlename.asObservable(),
                user.rx_lastname.asObservable()) { c in c }
            .subscribeNext { [weak self]  (firstname, middlename, lastname) -> Void in
            }
                    // 更新処理はこの1箇所だけで済む
                    if let s = self {
                        s.name = firstname + middlename + lastname // 表示用に結合
                    }
            }
            .addDisposableTo(disposeBag)
    }
}

一定間隔で処理を行いたい

intervalもしくは timerを使います。(timerの場合はタイマーを開始するまでの時間も指定できます。)
また、タイマーに処理を追加したい場合も簡単に実現できます。

Sample.swift
// 生成
// 1秒間隔で実行。UI変更する場合はMainSchedulerを指定。shareReplayは必要に応じて。
let rx_timer = Observable<Int>
     .interval(1.0, scheduler: MainScheduler.instance)
     .shareReplay(1)

// countには回数が入ってくる
rx_timer
     .subscribeNext { (count) -> Void in
         print("hoge")
    }
    .addDisposableTo(disposeBag)

// ---

// 他の箇所からこのタイマーに処理を追加したい場合, subscribeすることで追加できる。
// この時, shareReplay(1)を設定していれば同じストリームが使われる。(新しくストリームを生成しない)
rx_timer
     .subscribeNext { (count) -> Void in
         print("huga")
    }
    .addDisposableTo(disposeBag)

intervalで1回目の実行をすぐに行いたい場合は .startWith() を指定します。(ただし何もしないとcount値はズレるのでcount値を使う場合は注意)
タイマー自体のpause/resumeについてはdisposeしてsubscribeし直すしかなさそうなので、使い勝手が悪いかもしれないです。

値が入ってから一定の期間の間に入ってきた値をまとめて流したい (16/10/07追記) (16/11/9修正)

bufferを使います。intervalは一定間隔で常に値を流しますが、bufferは値が入ってから一定の期間に入ってきた値をまとめて流します。
intervalはObservableの生成に使うもので、bufferはObservableの変換に使うものなのでこの書き方は正しくありませんでした。

頻繁に値が流れすぎてパフォーマンスに影響が出る場合などに、例えば値が入ってから0.5秒の間に来た値をまとめて流すといったことができます。(後述のthrottleを使ったほうが良い場合もあります。)

Sample.swift
// NotificationCenterからの通知をまとめたい場合
NSNotificationCenter.defaultCenter()
    .rx_notification("Hoge")
    // 個数でも絞りたい場合は count: に その値を入れる。timeSpan内でも3個値が溜まったら流す場合は count: 3。
    .buffer(timeSpan: 0.5, count: Int.max, scheduler: MainScheduler.instance)
    .subscribeNext { (notifications) -> Void in
       // notifications には0.5秒の間に入ってきた複数の NSNotification インスタンスが入ってくる。
    }

値が入ってから一定間隔の間に入ってきた値のうち、最後の値のみを流したい (16/11/9追加)

throttleを使います。(debounceオペレータと同義)
bufferは来た値を削らずにまとめて流しますが、throttleは一定間隔の間に入ってきた値のうち、最後の値のみを流します(他の値は削られます)。

Sample.swift
// NotificationCenterからの通知を削りたい場合
NSNotificationCenter.defaultCenter()
    .rx_notification("Hoge")
    .throttle(0.5, scheduler: MainScheduler.instance)
    .subscribeNext { (notification) -> Void in
       // notification には0.5秒の間に入ってきた値のうち、最後に入った NSNotification インスタンスが入る。
    }

複数のイベントのうち最初に来たイベントのみ処理を行いたい (16/04/09追記)

例えば、送信ボタンをタップするとサーバーに情報を送信する機能があったとして、追加でこんな要件があるときに使えます。

  • タップ後10秒経過したら送信
  • 10秒の間は、キャンセルボタンでキャンセル可能
  • アプリがバックグラウンドに戻った場合は10秒経っていなくても送信する

普通にやると面倒な感じですが、RxSwiftを使って書くと楽になります。

Sample.swift
let cancelId  = 1
let executeId = 2

// ※一例なのでObservableは必要に応じてインスタンスに持たせてください

// 一定秒数のキャンセル可能期間
let rx_timer = Observable<Int>.interval(10.0, scheduler: MainScheduler.instance)
    .map({ _ in executeId })

// キャンセルボタン
let rx_cancelTapWithId = actionCancelButton.rx_tap.asObservable()
    .map() { _ in cancelId }

// 強制実行
let rx_notification = NSNotificationCenter.defaultCenter()
    .rx_notification(UIApplicationDidEnterBackgroundNotification)
    .map() { _ in executeId }

[rx_timer, rx_cancelTapWithId, rx_notification]
    .toObservable()
    .merge()
    .take(1) // 最初に来た1つのみを処理
    .subscribeNext { (id) -> Void in

        if id == executeId {

            // 処理を実行

        } else if id == cancelId {

            // キャンセル

        }

    }
    .addDisposableTo(disposeBag)

他にもあったら追記します。もっと良い書き方や間違いがありましたら、ご指摘いただけると助かります!