iOS
Rx
Swift
RxSwift

RxSwiftの機能カタログ

More than 1 year has passed since last update.

はじめに

既に RxSwift が使えるようになった人のために、RxSwift が用意しているクラスやメソッドのカタログをつくってみました。

RxSwift の使い方を覚えたい人は、先に

  1. オブザーバーパターンから始めるRxSwift入門
  2. RxSwift入門(2) 非同期処理してみる
  3. RxSwiftを深く理解する

を読んでください。

また Rx を使った設計については「 Rxを使った設計をビジュアル化する」を参考にしてください。

ここの機能カタログは RxSwift 2.6.0 を元に作成しました1。対応するソースコードを追いやすいように、見出しをフォルダ階層やファイル名と一致させています。public として公開されているものだけが対象です。

また RxSwift 独自の機能でないものは、できるだけ本家 ReactiveX の解説へのリンクを載せています。英語ですがマーブルダイアグラム(RxMarblesと同じもの)が載っているので、英語が苦手な人にも参考になると思います。このマーブルダイアグラムはイベントをマウスでドラッグできます。

自分も使ったことないものが沢山あって、ソースコードを読んだだけで利用方法を書いているものが多数あります。嘘が混ざってるかもしれませんので、(特に ReactiveX に説明がないようなちょっとマイナーなオペレータは)安易に書かれていることを信じずに動作確認してください。もし実際に利用した時に説明と違うようならご指摘頂けると助かります。

Subjects

RxSwift に存在する Subject は既に紹介した PublishSubject, BehaviorSubject, ReplaySubject の3つだけです。それと内部で BehaviorSubject を使う Variable がこのグループに含まれています。

ReactiveX の Subject 解説 では AsyncSubject というのがあるのですが、RxSwift にはありません。一方 Variable は RxSwift 独自のものです。

クラス 説明
PublishSubject 一切キャッシュしないSubject
BehaviorSubject 直近の値を1つだけキャッシュするSubjectで、初期値を与えることができる。
ReplaySubject 指定した数または全てをキャッシュするSubjectで、初期値は与えられない。
Variable 値を持ち、Observableに変換して値の変化を監視できる。内部にBehaviorSubjectをラップして使いやすくしたもの。onErrorは発生させられない。

Subject, Variable の使い方は「オブザーバーパターンから始めるRxSwift入門」を参照してください。

ReplaySubject は「RxSwiftを深く理解する」の Cold-Hot 変換で登場しました。キャッシュする数を指定して生成するには create クラスメソッドを、全てをキャッシュするには createUnbounded クラスメソッドを使います。

// キャッシュ上限指定。
let replayThree = ReplaySubject.create(bufferSize: 3)
// キャッシュ上限なし。全てをキャッシュする。
let replayAll = ReplaySubject.createUnbounded()

Disposables

このグループでは各種の Disposable 派生クラスと、「オブザーバーパターンから始めるRxSwift入門」で紹介した DisposeBag が提供されています。

Disposable は ReactiveX には統一された説明がありません。元祖 .Net の ReactiveExtensions は Disposable ですが、RxJava では Subscription と呼ばれています。その辺が不統一なので説明がないのでしょうかね?

以下が Disposable 派生クラスの一覧です。Anonymous, Boolean, Nop 以外は内部に別の Disposable を保持するコンテナやラッパーの役割になっています。

クラス 説明
AnonymousDisposable dispose が呼ばれたときに実行するアクションを指定する
BinaryDisposable 2つの Disposable をまとめる
BooleanDisposable dispose されたかどうかを保持するだけ
CompositeDisposable 一緒に dispose される Disposable をグループ化
NopDisposable dispose されても何もしない
RefCountDisposable 参照カウントで管理する
ScheduledDisposable 指定したスケジューラで dispose 処理をする
SerialDisposable 新しいものが設定されると古いものを dispose して置き換える
SingleAssignmentDisposable 1度だけ内部の Disposable を設定できる
StableCompositeDisposable BinaryDisposable を返すだけ
DisposeBag 追加されたものを自身が解放されるときにまとめて dispose する

AnonymousDisposable

dispose が呼ばれたときに実行するアクションを指定する Disposable です。

let observable = Observable.create { observer in
  // ...
  return AnonymousDisposable {
    task.cancel()
  }
}

BinaryDisposable

2つの Diposable をまとめる Disposable です。dispose すると両方が dispose されます。コンストラクタが公開されておらず、StableCompositeDisposable 経由で生成します。

let binaryDisposable = StableCompositeDisposable.create(disposable1, disposable2)

BooleanDisposable

dispose されたかどうかを保持するだけの Disposable です。dispose すると disposed プロパティが true になります。生成時に初期値を与えます。

// 引数省略なら初期値はfalse
let booleanDisposable = BooleanDisposable()

// 最初からdispose済み扱いのDisposableを作る
let booleanDisposable = BooleanDisposable(disposed: true)

CompositeDisposable

一緒に dispose される Disposable をグループ化します。

// 4つまでなら初期化時の引数で渡せる
let compositDisposable2 = CompositeDisposable(disposable1, disposable2)
let compositDisposable3 = CompositeDisposable(disposable1, disposable2, disposable3)
let compositDisposable4 = CompositeDisposable(disposable1, disposable2, disposable3, disposable4)

// それ以上は配列として渡す
let disposables = [disposable1, disposable2, disposable3, disposable4, disposable5]
let compositDisposable = CompositeDisposable(disposables)

後から追加したり削除したりできます。

// 追加するとCompositeDisposable.DisposeKey?型のキーが返る
let key = compositDisposable.addDisposable(disposable6)
// キーを指定してdisposableを削除
if let key = key { compositeDisposable.removeDisposable(key) }

何個保持しているかは count プロパティで取得できます。

また既に dispose された CompositeDisposable に addDisposable した場合は、引数に渡したものが即座に dispose され、キーとして nil が返ります(つまり追加はされません)。

NopDisposable

dispose されても何もしない Disposable です。シングルトンインスタンスを取得して利用します。

let observable = Observable.create { observer in
  // (何か同期処理)
  return NopDisposable.instance // どうせキャンセルもできないし、やることないや。
}

RefCountDisposable

参照カウント管理する Disposable です。初期化時に管理したい disposable を渡します。

let refCountDisposable = RefCountDisposable(disposable)

生成時点では参照カウントはゼロです。参照カウントは retain を呼び出すことで増えます。参照カウントがゼロの状態で dispose すると中身が dispose されます。参照カウントがゼロでない時に dispose すると、参照カウントが減るだけで中身は dispose されません。

// retain呼び出しで新しい RefCountDisposable が返され、参照カウントが1増える。
let refCountDisposable2 = refCountDisposable.retain()
// 参照カウントがゼロでないのでdisposeしても中身はdisposeされない。参照カウントが1減る。
refCountDisposable.dispose()
// 参照カウントゼロでのdisposeなので、中身もdisposeされる
refCountDisposable2.dispose()

ScheduledDisposable

指定したスケジューラで dispose 処理をする Disposable です。初期化時にスケジューラと管理したい disposable を渡します。

let shceduledDisposable = ScheduledDisposable(
    scheduler: MainScheduler.instance, disposable: disposable)

SerialDisposable

内部に1つだけ Disposable を保持でき、新しいものが設定されると古いものを dispose して置き換えます。

// 初期化時は中身がカラ
let serialDisposable = SerialDisposable()

// 中身を設定
serialDisposable.disposable = disposable1

// 別の中身に置き換えると、前のdisposabe1はdisposeされる
serialDisposable.disposable = disposable2

// 本体をdisposeすると中身もdisposeされる
serialDisposable.dipose()

// dispose済みの状態で中身を設定しようとすると、
// 設定したものが即時disposeされる(保持されない)
serialDisposable.disposable = disposable3

SingleAssignmentDisposable

内部に1度だけ Disposable を設定できる Disposable です。2度目の設定では例外が発生します。

// 初期化時は中身がカラ
let singleAssignmentDisposable = SingleAssignmentDisposable()

// 中身を設定
serialDisposable.disposable = disposable1

// disposeすると中身もdisposeされる
singleAssignmentDisposable.dispose()

中身の設定より前に dispose された場合は、それを覚えていて、設定と同時に渡された Disposable を dispose します(内部に保持しません)。

// 初期化時は中身がカラ
let singleAssignmentDisposable = SingleAssignmentDisposable()

// disposeされるとそれを覚えている
singleAssignmentDisposable.dispose()

// 中身を設定すると即座にdisposable1はdisposeされる
serialDisposable.disposable = disposable1

StableCompositeDisposable

今の所 BinaryDisposable を返すだけです。元祖 ReactiveExtensions にあるので用意したと思われますが、そちらは配列で3つ以上を渡すこともできます。BinaryDisposable でも入れ子にしていけば3つ以上を管理できますが、階層が深くなるほどパフォーマンスが落ちますね。Stable と付いているのは CompositeDisposable と違って addDisposable / removeDisposable がなく immutable だからでしょう。

let binaryDisposable = StableCompositeDisposable.create(disposable1, disposable2)

DisposeBag

使い方は「オブザーバーパターンから始めるRxSwift入門」を参照してください。

これも Disposable 派生クラスなので removeDisposable が必要ないなら CompositeDisposable の代わりに使うことも可能ではあります。しかしそういう用途のために用意されているわけではなく、

  • 自身が解放されるときにまとめて dispose してくれる
  • Disposable の拡張メソッド addDisposableTo の引数に指定できる

ことが特徴です。

Rx では Disposable を入れるコンテナ的な役割のものは、それ自身が dispose されるとそれを覚えていて、以降に追加されたものは即時 dispose されます。DisposeBag も同様です。つまり DisposeBag は一度 dispose すると再利用できません。生成し直す必要があります。

Schedulers

処理スレッドを表す各種スケジューラがこのグループで提供されています。以下がその一覧です。

クラス 説明
ConcurrentDispatchQueueScheduler dispath_queue_t や QOS で指定する並列実行
ConcurrentMainScheduler subscribeOn に最適化されたメインスレッド指定
CurrentThreadScheduler 現在のスレッドで処理をキューイングして順次実行
HistoricalScheduler NSDate, NSTimeInterval を使う VirtualTimeScheduler
ImmediateScheduler 現在のスレッドで処理を即時実行
MainScheduler observeOn に最適化されたメインスレッド指定
OperationQueueScheduler NSOperationQueue を使った非同期実行
SerialDispatchQueueScheduler dispatch_queue_t や QOS で指定する順次実行
VirtualTimeScheduler 時間の経過をプログラムで制御できる

ReactiveX の説明には種類はのってないですね。各言語で非同期の取り扱いが違うので、統一はできないかと思います。

RxSwift のドキュメントはこちらです。

ConcurrentDispatchQueueScheduler

処理をバックグラウンドで並列実行するのに使います。GCD の dispatch_queue_t で指定します。iOS 8 以上なら QOS による指定もできます。一応並列キューを指定する想定の名前になっていますが、シリアルキューを渡しても問題なく動作します2

// dispatch_queue_tで指定
let scheduler1 =  ConcurrentDispatchQueueScheduler(
  queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))

// QOSで指定
let scheduler2 = ConcurrentDispatchQueueScheduler(
  globalConcurrentQueueQOS: .UserInitiated))

ConcurrentMainScheduler

メインスレッドで実行するように指定します。スケジュールするメソッドがメインスレッドで呼び出された場合、スケジューリングされることなく即時実行されます。

subscribeOn に最適化されているので、observeOn には MainScheduler を利用してくださいとのこと。

let disposable = observable
  .subscribeOn(ConcurrentMainScheduler.instance)
  .subscribeNext { value in
    // ...
  }

CurrentThreadScheduler

実行スレッドを切り替えずに現在のスレッドで処理をキューイングして順次実行します。Observable.create などのイベントを生成するオペレータのデフォルトのスケジューラはこれ。

後述する ImmediateScheduler と違って、渡した処理を順次実行します。つまり前の処理が終わるまで次の処理が開始しません。

let scheduler = CurrentThreadScheduler.instance
let disposable = scheduler.schedule(()) { _ in
  print("Task1 start")

  let disposable = scheduler.schedule(()) { _ in
    print("Task2 start")
    print("Task2 end")
    return NopDisposable.instance
  }

  print("Task1 end")
  return disposable
}

上記では Task1 の中で Task2 を開始していますが、それはキューイングされて実行は Task1 が終了してからになります。

Task1 start
Task1 end
Task2 start
Task2 end

HistoricalScheduler

時間の経過をプログラムで制御することができるスケジューラです。テストするときに便利そうです。

後述する VirtualTimeSchduler の時間や時間間隔を表す型として、RxSwift 内の現実時間と同じ NSDate, NSTimeInterval を利用するバージョンです。

let startTime = NSDate()
let schduler = HistoricalScheduler(initialClock: startTime)
let compositeDisposable = CompositeDisposable() // まとめてdisposeできるように

// 10秒後に実行
_ = compositeDisposable.addDisposable(
  scheduler.scheduleRelative((), dueTime: 10.0) { _ in
    //...
  })

// 絶程時間指定(今から20秒後)
_ = compositeDisposable.addDisposable(
  scheduler.scheduleAbsoluteVirtual((), time: NSDate(timeIntervalSinceNow: 20)) { _ in
    /...
  })

// 即時実行
_ = compositeDisposable.addDisposable(
  scheduler.schedule(()) { _ in
    //...
  })

// スケジューラ内での時刻はinitialClockで与えた時間。
// この時間までに実行すべきものだけ実行される(つまり即時実行を指定したものだけ)。
scheduler.start()
// スケジューラ内の時間を15秒進める。10秒後に指定したものが実行される。
scheduler.advanceTo(NSDate(timeInterval: 15, sinceDate: startTime))
// スケジューラ内の時間を25秒進める。20秒後に指定したものが実行される。
scheduler.advanceTo(NSDate(timeInterval: 20, sinceDate: startTime))

ImmediateScheduler

実行スレッドを切り替えずに現在のスレッドで処理を即時実行します。前述した CurrentThreadScheduler と動作を比べてみましょう。

let scheduler = ImmediateScheduler.instance
let disposable = scheduler.schedule(()) { _ in
  print("Task1 start")

  let disposable = scheduler.schedule(()) { _ in
    print("Task2 start")
    print("Task2 end")
    return NopDisposable.instance
  }

  print("Task1 end")
  return disposable
}

上記では Task1 の中で Task2 を開始していますが、CurrentThreadScheduler と違ってキューイングされたりせずにその時点で処理が開始します。Task1 が終わるまで待ったりしません。

Task1 start
Task2 start
Task2 end
Task1 end

MainScheduler

メインスレッドで実行するように指定します。スケジュールするメソッドがメインスレッドで呼び出された場合、スケジューリングされることなく即時実行されます。

obserbeOn に最適化されているので、subscribeOn には ConcurrentMainScheduler を利用してくださいとのこと。

let disposable = observable
  .observeOn(MainScheduler.instance)
  .subscribeNext { value in
    // ...
  }

asyncInstance を使ってメインスレッドで非同期実行を指定することもできます。一旦ランループに戻ってからメインスレッドで実行されます。

let disposable = observable
  .observeOn(MainScheduler.asyncInstance)
  .subscribeNext { value in
    // ...
  }

実装をみると、asyncInstance の実体は dispatch_get_main_queue() を指定した SirialDispatchQueueScheduler になっています。

OperationQueueScheduler

NSOperationQueue を使った非同期実行を行います。同時に実行する数を maxConcurrentOperationCount で制御したい場合に便利です。

let operationQueue = NSOperationQueue()
operationQueue.qualityOfService = .UserInitiated
operationQueue.maxConcurrentOperationCount = 3
let scheduler = OperationQueueScheduler(operationQueue: operationQueue)

SerialDispatchQueueScheduler

処理をバックグラウンドで順番に実行する場合に使います。GCD の dispatch_queue_t で指定します。iOS 8 以上なら QOS による指定もできます。

dispatch_queue_t として並列キューを渡しても、内部でシリアルキューを生成しているので schedule に渡した処理はシリアル実行されます。

// dispatch_queue_tで指定
// 低優先度の並列キューを渡しているが、そのキューを使ってシリアル実行される。
let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))
let scheduler1 =  SerialDispatchQueueScheduler(
  queue: queue, internalSerialQueueName: "name")

// QOSで指定
let scheduler2 = SerialDispatchQueueScheduler(
  globalConcurrentQueueQOS: .Default))

// 内部のシリアルキューの設定をクロージャで行える。以下は一番最初の例と同じ動作。
let scheduler3 = SerialDispatchQueueScheduler(
  internalSerialQueueName: "name") { serialQueue in
    dispatch_set_target_queue(serialQueue, queue)
  }

// 内部でシリアルキュー作ってるんだから単にそれ使えばいいんじゃない?
let scheduler3 = SerialDispatchQueueScheduler(internalSerialQueueName: "name")

VirtualTimeScheduler

時間の経過をプログラムで制御できるスケジューラです。現在時刻と、指定する時間をどう扱うか決めるコンバータを初期化時に指定します。

RxSwift 内での現実時間を表す型は RxTime で、時間間隔は RxTimeInterval です。これはそれぞれ NSDate と NSTimeInteval の typealias です。コンバータは自分が利用したい時間と、RxSwift 内での時間との相互変換を行います。

コンバータとして利用時間に NSDate, NSTimeInterval を利用する HistoricalScheduler が用意されているので、普通はそちらを使えば十分かと思います。

以下の例では時間を UNIX 時間で管理するコンバータを作っています。中途半端は嫌いな男前で、整数の秒単位でしか表せません。

/// Int64のUNIX時間で時間を管理するコンバータ
class UnixTimeConverter : VirtualTimeConverterType {
  /// 時間を表す型
  typealias VirtualTimeUnit = Int64

  /// 時間間隔を表す型
  typealias VirtualTimeIntervalUnit = Int64

  /// 仮想時間から現実時間への変換
  func convertFromVirtualTime(virtualTime: VirtualTimeUnit) -> RxTime {
    return NSDate(timeIntervalSince1970: NSTimeInterval(virtualTime))
  }

  /// 現実時間から仮想時間への変換
  func convertToVirtualTime(time: RxTime) -> VirtualTimeUnit {
    return VirtualTimeUnit(time.timeIntervalSince1970)
  }

  /// 仮想時間での時間間隔から現実時間での時間間隔へ変換
  func convertFromVirtualTimeInterval(virtualTimeInterval: VirtualTimeIntervalUnit) -> RxTimeInterval {
    return RxTimeInterval(virtualTimeInterval)
  }

  /// 現実時間での時間間隔から仮想時間での時間間隔へ変換
  func convertToVirtualTimeInterval(timeInterval: RxTimeInterval) -> VirtualTimeIntervalUnit {
    return VirtualTimeIntervalUnit(timeInterval)
  }

  /// 仮想時間に仮想時間での時間間隔オフセットした仮想時間を返す
  func offsetVirtualTime(time time: VirtualTimeUnit, offset: VirtualTimeIntervalUnit) -> VirtualTimeUnit {
    return time + offset
  }

  /// 2つの仮想時間を比較する
  func compareVirtualTime(lhs: VirtualTimeUnit, _ rhs: VirtualTimeUnit) -> VirtualTimeComparison {
    if lhs < rhs { return .LessThan }
    if lhs > rhs { return .GreaterThan }
    return .Equal
  }
}

このコンバータを使って VirtualTimeScheduler を作成します。intialClock をゼロにしているので、こいつにとって現在は 1970/01/01 00:00:00 にタイムスリップしています。

// 現在時刻を 1970/01/01 00:00:00 にして初期化
let scheduler = VirtualTimeScheduler(initialClock: 0, converter: UnixTimeConverter())

// 処理のスケジュールを絶対時間で指定(2020/07/26 12:00:00に実行)
let disposable =  scheduler.scheduleAbsoluteVirtual((), time: 1595732400) { _ in
  // ...
  return NopDisposable.instance
}

// 今は1970年なので2020年の処理が実行されるわけない
scheduler.start()
// 2020/07/26 12:00:01 に時間を進める。登録した処理が実行されるはず。
scheduler.advanceTo(1595732401)

// 相対時間で10秒後に指定(2020/07/26 12:00:11に実行)
let disposable2 = scheduler.scheduleRelativeVirtual((), dueTime: 10) { _
  // ...
  return NopDisposable.instance
}

現実時間での相対時間指定の scheduleRelative メソッドもあります。もちろん即時実行の schedule メソッドも。

独自スケジューラ

全てのスケジューラは ImmediateSchedulerType プロトコルに準拠しています。

public protocol ImmediateSchedulerType {
    func schedule<StateType>(state: StateType, action: (StateType) -> Disposable) -> Disposable
}

即時実行だけでなく、実行する時間を指定できるスケジューラは SchedulerType プロトコルに準拠しています。

public protocol SchedulerType: ImmediateSchedulerType {

    var now : RxTime {
        get
    }

    func scheduleRelative<StateType>(state: StateType, dueTime: RxTimeInterval, action: (StateType) -> Disposable) -> Disposable

    func schedulePeriodic<StateType>(state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: (StateType) -> StateType) -> Disposable
}

これらに準拠するように実装することで、独自のスケジューラを作成することができます。observeOn, subscribeOn に指定するだけなら ImmediateSchdulerType に準拠するだけで十分ですが、Obserbable+Time に所属している時間指定可能なオペレータに指定するには、SchedulerType に準拠させる必要が有ります。

Observables

RxSwfit のソースコードでは、Observable の各オペレータは拡張として提供され、種類ごとにファイルが分けられています。

オペレータの多くは ObservableType の拡張インスタンスメソッドとして提供されていますが、中には拡張クラスメソッドとして提供されているものもあります。また合成系のオペレータは SequenceType や CollectionType の拡張インスタンスメソッドとして提供されているものもあります。

Observable+Aggregate

集計に関するものがここに所属しているようです。今の所2つだけですが。

オペレータ名 説明
reduce 集計を行う。配列の reduce と同じ。
toArray 完了までの全てのイベントを配列として渡す Observable に変換

reduce

配列の reduce と同様です。ReactiveX の説明はこちらです。

_ = Observable.of(1, 2, 3, 4, 5)
  .reduce(10) { total, value in total + value }
  .subscribeNext { total in
    // ...
  }

結果に map をかけるバージョンもあるようです。

_ = Observable.of(1, 2, 3, 4, 5)
  .reduce(10, accumulator: { total, value in total + value }) { "total = \(total)" }
  .subscribeNext { text in
    // ...
  }

toArray

onCompleted が発生するまでに流れてきたイベントを配列にして流す Observable に変換します。ReactiveX では to という名前で、Observable を他のデータ構造へ変換する役割になっています。

_ = Observable.of(1, 2, 3, 4, 5)
  .toArray()
  .subscribeNext { array in
    // arrayは[1, 2, 3, 4, 5]
    // ...
  }

Observable+Binding

Cold-Hot 変換に関係するものがここに所属しています。

オペレータ 説明
multicast 指定した Subject を使った ConnectableObservable を返す
publish PublishSubject を使った ConnectableObservable を返す
replay 指定数をキャッシュする ReplaySubject を使った ConenctableObservable を返す
replayAll 全てのイベントをキャッシュする ReplaySubject を使った ConnectableObservable を返す
refCount ConnectableObservable の connect を subscribe の数で参照カウント管理
share キャッシュしない Cold-Hot 変換 = publish + refCount
shareReplay キャッシュ付きの Cold-Hot 変換 = replay + refCount
shareReplayLatestWhileConnected shareReplay(1) の最適化バージョン

multicast

指定した Subject を使った ConnectableObservable を返します。publish や replay がこれを使って実装されています。直接利用することはないでしょう。

let connectableObservable = observable.multicast(BehaviorSubject())

何に使うのかよくわからない別バージョンが用意されています。

// 戻り値はconnect済みのConenctableObservableをObservable型で返す
let observable = observable.multicast({
  // 第一引数にどのSubjectを使うのかを返すクロージャを渡す
  return BehaviorSubject()
}) { observable in
  // 第二引数に生成されたConnectableObservableをObservableとして受け取って
  // Observableを返すクロージャを渡す
  return observable.map { "\($0)" }
}

publish

PublishSubject を使った ConnectableObservable を返します。ReactiveX の説明はこちらです。

let connectableObservable = observable.publish()

replay

指定数をキャッシュする ReplaySubject を使った ConenctableObservable を返します。ReactiveX の replay の説明にマーブルダイアグラムがあります。bufferSize に3を指定するとこの動作です。

let connectableObservable = observable.replay(3)

replayAll

全てのイベントをキャッシュする ReplaySubject を使った ConnectableObservable を返します。ReactiveX の replay の説明にあるマーブルダイアグラムを参考にしてください。

let connectableObservable = observable.replayAll

refCount

ConnectableObservalbe の connect と内部の Observable の dispose を参照カウントで管理します。

ReactiveX の RefCount の説明にマーブルダイアグラムがありますが、ちょっとRxSwift の実際の動作と違います。実際の動作は以下のようになっています。

refCountMarbleDiagram.png

share

publish().refCount() と同じ動作です。キャッシュしない Cold-Hot 変換に利用します。

let observable = publishSubject.map{ "\($0)" }.share()

shareReplay

replay(bufferSize).refCount() と同じ動作です。キャッシュする Cold-Hot 変換に利用します。

let observable = behaviorSubject.map{ "\($0)" }.shareReplay(1)

shareReplayLatestWhileConnected

shareReplay(1) の最適化バージョンです。引数が1なら shareReplay の代わりに使えます。中継するものが減る分、パフォーマンスが上がります。でもメソッド名長いぃぃぃぃ。

let observable = behaviorSubject.map{ "\($0)" }.shareReplayLatestWhileConnected()

Observable+Concurrency

非同期処理に関係する observeOn, subscribeOn が提供されています。

オペレータ 説明
observeOn イベントを受け取るスケジューラを指定
subscribeOn subscribe を実行するスケジューラを指定

observeOn

イベントを受け取るスケジューラを指定します。

let disposable = observable
  .observeOn(MainScheduler.instance)
  .subscribeNext { element in
    // ...
  }

ReactiveX の説明はこちらです。興味深いことが書いてあります。

例えば observeOn でメインスレッドを指定していて、メインスレッドで通知可能になる前に2つのイベントと onError が発生したとします。すると onError の前の2つのイベントは通知されずいきなり onError が通知されます。

subscribeOn

subscribe を実行するスケジューラを指定します。ReactiveX の subscribeOn の説明にマーブルダイアグラムがありますが、特に理解の役には立ちませんね。動作を理解するには RxSwift入門(2) 非同期処理してみるRxSwiftを深く理解する を参照してください。

let disposable = observable
  .subscribeOn(ConcurrentDispatchQueueScheduler(
    globalConcurrentQueueQOS: .UserInitiated))
  .observeOn(MainScheduler.instance)
  .subscribeNext { element in
    // ...
  }

Observable+Creation

Observable の生成に関係するものが所属しています。

オペレータ 説明
create 任意の処理を行う Observable を生成
empty onCompleted を発行するだけの Observable を生成
never onError, onCompleted も含めて一切のイベントを発生させない Observable を生成
just 指定した要素を1回通知して完了する Observable を生成
error onError を発行するだけの Observable を生成
of 引数で渡した要素が順に通知される Observable を生成
deferred subscribe されるたびに Observable を新しく生成
generate for文っぽい初期値と停止条件、生成処理を指定して Observable を生成
repeatElement 指定された要素を永遠に通知し続ける Observable を生成
using Observable と、それと同じ生存期間を持つリソースとを紐付け
range 指定範囲の値を順に流す Observable を生成
toObservable 配列などのシーケンス型を Observable に変換

create

任意の処理を行う Observable を生成します。ReactiveX の説明はこちらです。

let observable = Observable.create { observer in
  observer.onNext(1)
  observer.onNext(2)
  observer.onCompleted()
  return NopDisposable.instance
}

empty

onCompleted を発行するだけの Observable を返します。ReactiveX の説明はこちらです。

let observable = Observable.empty()

never

onError, onCompleted も含めて一切のイベントを発生させない Observable を返します。ReactiveX の説明はこちらです。

let observable = Observable.never()

just

指定した要素を onNext で1回通知して onCompleted する Observable を返します。ReactiveX の説明はこちらです。

let observable = Observable.just("Only one")

実行するスケジューラを指定できるバージョンもあります。

let observable = Observable.just("Only one", MainScheduler.instance)

// 以下と同じ動作だが経由するものが1つ減る
let observable = Observable.just("Only one").subscribeOn(MainScheduler.instance)

error

onError を発行するだけの Observable を返します。ReactiveX では throw です。Swift では throw は予約語なので使えません。

let observable = Observable.error(MyError.Unkown)

of

可変引数になっていて、引数で渡した要素が順に通知される Observable を生成します。ReavtiveX の from が近いです。

_ =  Observable.of(1, 2, 3, 4, 5).subscribeNext { value in
  // ...
}

// 上記は以下と同じ動作
[1, 2, 3, 4, 5].forEach { value in
  // ...
}

deferred

subscribe されるたびに Observable を新しく生成して渡します。ReactiveX では defer です。Swift では defer は予約語なので使えません。

let observable = Observable.deferred {
  // subscribeされるたびに呼び出されるので、Observableを生成して返す。
  return Observable.create { observer in
    // ...
  }
}

generate

for 文っぽい感じで初期値と停止条件を指定してイベントシーケンスを生成します。停止条件またはシーケンス生成のクロージャで例外を投げると onError が発生します。

// 1, 2, 4, 8, 16, 32, 64 が順に渡されるObservableを生成する
let observable = Observable.generate(initialState: 1, condition: { $0 < 100 }) { $0 * 2 }

// 生成する際のスケジューラも指定できます
let scheduler =  ConcurrentDispatchQueueScheduler(
  queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))
let observable = Observable.generate(initialState: 1, condition: { $0 < 100 }, scheduler: schduler) { $0 * 2 }

repeatElement

指定された要素を永遠に通知し続ける Observable を生成します。ReactiveX では repeat です。Swift では repeat は予約語なので使えません。

let observable = Observable.repeatElement(1)

// 生成するスケジューラも指定できます。
let scheduler =  ConcurrentDispatchQueueScheduler(
  queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))
let observable = Observable.repeatElement(1, scheduler: scheduler)

using

Observable と、それと同じ生存期間を持つリソースとを紐付けます。ReactiveX の説明はこちらです。

例えば処理中にインジケーターをグルグル回したい場合を考えてみます。onCompleted や onError が発生したらインジケーターの表示をやめます。あとは途中で dispose されてキャンセルされた場合もインジケーターの表示をやめる必要があります。

let indicator= Indicator()
inidicator.hidden = false

let disposable = search(params).subscribe(
  onNext: { result in
    // ...
  },
  onError: { error in
    indicator.hidden = true
    // ...
  },
  onCompleted: {
    indicator.hidden = true
  },
  onDisposed: {
    indicator.hidden = true
  } )

何気に subscribe には dispose されたときの処理が渡せます。

こういうのは using を使うと Observable にすることができます。第一引数はリソースを確保するクロージャで、第二引数が Observable を生成するクロージャです。Observable が生成されるタイミングであるリソースを確保し、Observable がいらなくなるタイミングでリソースを解放するわけです。

// Xcode7.2では第一引数のクロージャの戻り値の型を明示しないとエラーになった
let observable = Observable.using({ () -> AnonymousDisposable in
        // リソース確保を行う。リソースを解放するDisposableを戻り値で返す。
        let indicator = Indicator()
        indicator.hidden = false
        return AnonymousDisposable { indicator.hidden = true }
      }, observableFactory: { _ in
        // リソースと同じ生存期間のObservableを生成する。
        // リソース確保の戻り値で渡したDisposableがクロージャの引数に渡される。
        return search(params)
      })

これを subscribe すると、

  1. まずリソース確保が行われ
  2. 次に Observable が生成され
  3. その Observable を using が subscribe してイベントを転送する

が行われます。戻り値の Disposable を dispose すると、

  • 生成した Observable を using が内部で subscribe したときの Disposable
  • リソース解放のための Disposable

がともに dispose されます。

range

指定範囲の値を順に流す Observable を作成します。ReactiveX の説明はこちらです。

let observable = Observable.range(start: 1, count: 5)

// 生成する際のスケジューラも指定できます
let scheduler =  ConcurrentDispatchQueueScheduler(
  queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))
let observable = Observable.range(start: 1, count: 5, scheduler: scheduler)

SequenceType.toObservable

SequenceType を Observable に変換する toObservable メソッドが拡張されています。ReavtiveX の from に相当します。

内部で SequenceType を Array に変換しています。Array には SequenceType を引数に取るコンストラクタがあります。当たり前ですが Dictionary や Set などの順序が関係ないデータ型の場合は、通知される順番は保証されません。

_ = [ "name": "Hanako", "age": "20", "sex": "female" ].toObservable()
  .subscribeNext { print("\($0)") }

以下のように表示されます。

("age", "20")
("sex", "female")
("name", "Hanako")

Array.toObservable

SequenceType.toObservable は内部で Array に変換しますが、 Array の場合は変換が必要ないので特殊化されています。

let observable = [ 1, 2, 3, 4, 5 ].toObservable()

Observable+Debug

デバッグ用のオペレータがここに所属しています。今の所 debug だけです。

オペレータ 説明
debug 流れてくるイベントを標準出力に print

debug

流れてくるイベントを標準出力に print します。幾つか並んだオペレータの途中経過が知りたいときに便利です。Cold-Hot 変換されているかも簡単に確認できます。doOn 使って自分でログ出力もできますが、これ使う方が楽チンです。

_ = Observable.of(1, 2, 3, 4, 5)
  .map { $0 * 2 }
  .debug()
  .filter { $0 % 4 == 0 }
  .subscribeNext {  print("value = \($0)") }
2016-07-28 19:29:19.891: MainViewController.swift:81 (viewDidAppear) -> subscribed
2016-07-28 19:29:19.896: MainViewController.swift:81 (viewDidAppear) -> Event Next(2)
2016-07-28 19:29:19.897: MainViewController.swift:81 (viewDidAppear) -> Event Next(4)
value = 4
2016-07-28 19:29:19.897: MainViewController.swift:81 (viewDidAppear) -> Event Next(6)
2016-07-28 19:29:19.898: MainViewController.swift:81 (viewDidAppear) -> Event Next(8)
value = 8
2016-07-28 19:29:19.898: MainViewController.swift:81 (viewDidAppear) -> Event Next(10)
2016-07-28 19:29:19.898: MainViewController.swift:81 (viewDidAppear) -> Event Completed
2016-07-28 19:29:19.898: MainViewController.swift:81 (viewDidAppear) -> disposed

第一引数の identifier を指定すると、ファイル名+行数+関数名だったところが identifier になります。

_ = Observable.of(1, 2, 3, 4, 5)
  .map { $0 * 2 }
  .debug("map後の値")
  .filter { $0 % 4 == 0 }
  .subscribeNext {  print("value = \($0)") }
2016-07-28 19:46:04.171: map後の値 -> subscribed
2016-07-28 19:46:04.183: map後の値 -> Event Next(2)
2016-07-28 19:46:04.183: map後の値 -> Event Next(4)
value = 4
2016-07-28 19:46:04.184: map後の値 -> Event Next(6)
2016-07-28 19:46:04.184: map後の値 -> Event Next(8)
value = 8
2016-07-28 19:46:04.184: map後の値 -> Event Next(10)
2016-07-28 19:46:04.184: map後の値 -> Event Completed
2016-07-28 19:46:04.184: map後の値 -> disposed

Observable+Multiple

複数の Observable を結合するタイプがここに所属しています。

オペレータ 説明
combineLatest 複数の Observable の最新値同士を組み合わせる
zip 複数の Observable の内容を順番に組み合わせる
switchLatest 次の Observable が来たらそちらにスイッチする
concat 複数の Observable を完了するまで待って順次 subscribe する
merge 複数の Observable を subscribe して、全てのイベントをマージして通知
catchError エラーを補足して復旧
catchErrorJustReturn エラーを捕捉して復旧し、引数に与えた値を代わりに通知
takeUntil 指定した Observable が何かイベントを発行すると終了
skipUntil 指定した Observable が onNext を発行するまで、元の Observable が発行したイベントを無視
amb 複数の Observable のうち、最初にイベントを発行した Observable を採用
withLatestFrom 元の Observable に指定した Observable の最新値を合成

combineLatest

複数の Observable の最新値同士を組み合わせます。ReavtiveX の説明はこちらです。

CollectionType の拡張メソッドとして提供されています。

let newObservable = [observable1, observable2].combineLatest { ($0[0], $0[1]) }

また合成するものが8個までなら、Observable.combineLatest(...) も利用できます。こちらは Observables/Implementations に格納されていますが、public として公開されています。

let newObservable = Observable.combineLatest(observable1, observable2) { ($0, $1) }

zip

複数の Observable を順番に組み合わせます。ReavtiveX の説明はこちらです。

CollectionType の拡張メソッドとして提供されています。

let observable1 = Observable.of(1, 2, 3, 4, 5)
let observable2 = Observalbe.of("A", "B", "C", "D")
let newObservable = [observable1, observable2].zip { "\($0[0])\($0[1])" }
// "1A", "2B", "3C", "4D" を通知する

また合成するものが8個までなら、Observable.zip(...) も利用できます。こちらは Observables/Implementations に格納されていますが、public として公開されています。

let observable1 = Observable.of(1, 2, 3, 4, 5)
let observable2 = Observalbe.of("A", "B", "C", "D")
let newObservable = Observable.zip(observable1, observable2) { "\($0)\($1)" }
// "1A", "2B", "3C", "4D" を通知する

switchLatest

ReavtiveX では switch です。Swift では switch は予約語なので使えません。

ちょっとややこしいですが、Observable の通知するイベントの型は自由なので、イベントとして Observable を通知する Observable というのも作れます。

switchLatest は Observable が来ると前のを dispose して次のを subscribe します。新しい Observable が来るたびにそちらにスイッチしていくのでこの名前になっています。

何に使うかというと、例えば何か入力(クリックとか文字入力とか)されるたびに非同期処理が走るような場合に、次の処理が開始したら前の処理はキャンセルしたいってことがあります。

// テキストボックスの内容が変更されるたびにインクリメンタルサーチ
let observable = text.rx_text.asObservable()
  .map { text in search(text) } // searchは非同期で検索処理を行うObservableを返す
  .switchLatest()
  .observeOn(MainScheduler.instance)
  .subscribeNext { searchResult in
    // ...
  }

上の例ではインクリメンタルサーチを行っています。UITextView の text が変更されるたびに検索処理を実行し、次の検索処理が開始すると前の処理は dispose されてキャンセルされます。そうして検索が終了したものだけ受け取ります。

map + switchLatest は flatMapLatest 1つで書けますのでそちらも参照してください。

concat

前の Observable が onCompleted してから次の Observable を subscribe します。ReactiveX の説明はこちらです。

Cold な Observable の場合は名前の通り前後に結合する動作をしますが、Hot な Observable の場合は単に1つ目が完了したら2つ目に切り替わるだけです。

let observable3 = observable1.concat(observable2)

// 上は以下と同じ。この方法だと3つ以上をconcatすることもできる。
let observable4 = [observable1, observable2].concat()

// こう書いてもいい。この方法でも3つ以上をconcatできる。
let observable5 = Observable.of(observable1, observable2).concat()

of は Observable を順に通知する Observable を作っています。つまり Observable を通知する Observable に対しても適用できるわけです。非同期の Observable を通知する Observable に対して用いると、順次実行させることができます。

let disposable = event
  .map { doSomething($0) }  // doSomething は非同期で何かを行うObservableを返す
  .concat()  // 1つずつ完了するまで待って次の処理を実行する
  .subscribeNext { result in
    // ...
  }

concat.png

キューイングせずに「処理中にきた要求は無視する」動作にしたい場合は、flatMapFirst を利用できますのでそちらを参照してください。

merge

複数の Observable を subscribe して、全てのイベントを通知します。ReactiveX の説明はこちらです。

以下は2つのエラーを通知する Observable をまとめて1つにしています。

let errorEvent = Observable.of(hoge.errorEvent, fuga.errorEvent).merge()

of で Observable を順に通知する Observable を作っています。merge は Observable がやってくるとそれを subscribe して、全てのイベントをまとめて通知します。それが分かっていると以下の使い方も分かってきます(この場合は map + merge を flatMap 1つで書けますのでそちらも参照してください)。

let disposable = event
  .map { doSomething($0) }  // doSomething は非同期で何かを行うObservableを返す
  .merge()  // 次々並列に実行する
  .subscribeNext { result in
    // ...
  }

flatMap.png

concat ではシリアル実行でしたが、merge を使うと並列実行することができます。そして merge は並列に実行する最大数を引数に指定することができます。

let disposable = event
  .map { doSomething($0) }  // doSomething は非同期で何かを行うObservableを返す
  .merge(maxConcurrent: 3)  // 最大同時実行数3で並列実行する
  .subscribeNext { result in
    // ...
  }

引数の maxConcurrent に1を指定すると concat と同じ動作になります。

catchError

ReactiveX では catch です。Swift では catch は予約語なので使えません。

onError を捕捉して復旧するのに利用します。復旧後には指定した Observable の内容が通知されます。

let disposable = search(params)
  .catchError { error in
    if error is MyError { return Observable.error(error) }
    return Observable.just([])
  } 
  .obserbeOn(MainScheduler.instance)
  .subscribeNext { result in
    // ...
  }

SequenceType にも catchError メソッドが拡張されています。どうも onError が発生しても無視して、どれかが成功するまで処理を続けるようです。

let disposable = [observable1, observable2, observable3].catchError()
  .subscribeNext { result in
    // ...
  }

catchErrorJustReturn

catchError と同様にエラーを捕捉して復旧します。引数に与えた値が代わりに通知されます。

let disposable = search(params)
  .catchErrorJustReturn([]) // エラーなら空配列に置き換える
  .obserbeOn(MainScheduler.instance)
  .subscribeNext { result in
    // ...
  }

takeUntil

指定した Observable が何かイベントを発行すると終了します。指定した Observable がイベントを発行するまでを採用するので takeUntil という名前になっています。ReactiveX の説明はこちらです。

指定した Observable が onError を発行した場合は onError で終了します。また指定した Observable がイベントを発行する前に元の Observable が(onError / onComplete で)終了した場合も終了します。

let newObservable = observable.takeUntil(stopperObservable)

skipUntil

指定した Observable が onNext を発行するまで、元の Observable が発行したイベントを無視します。ReactiveX の説明はこちらです。

指定した Observable が onError を発行した場合は onError を発行し、onCompleted を発行した場合は何も発行しません。指定した Observable が onNext を発行する前に元の Observable で onError が発生した場合は onError が通知されます。

let newObservable = observable.skipUntil(starterObservable)

amb

複数の Observable のうち、最初にイベントを発行した Observable を採用します。よーいどんでスタートして、最初に onNext を発行した奴が一人勝ちです。ReactiveX の説明はこちらです。

let newObservable = observable1.amb(observable2)

3つ以上の場合は SequenceType の拡張メソッドを使います。

let newObservable = [observable1, observable2, observable3].amb()

採用されなかったものは dispose されるので、処理をキャンセルすることができます。

withLatestFrom

元の Observable に指定した Observable の最新値を合成します。ReactiveX には該当するものがありません。マーブルダイアグラムは RxMarbles の withLatestFrom を参照してください。

let newObservable = mainObservable
  .withLatestFrom(subObservable) { main, sub in (main, sub) }

第二引数のクロージャを省略すると、値として引数に渡した Observable のものを採用します。

let newObservable = triggerObservable.withLatestFrom(valueObservable)

// 上記は以下と同じ
let newObservable = triggerObservable.withLatestFrom(valueObservable) { $1 }

元の Observable をトリガーにして指定した Observable の最新値を取得できます。sample と似ていますが、sample は値に変化がないと通知をスキップします。トリガー発生時に必ず値を通知して欲しい場合はこちらを使ってください。

Observable+Single

どういう基準でここに所属しているのか不明です。

オペレータ 説明
distinctUntilChanged 変化がない間はイベントをスキップ
doOn パイプラインの途中に処理を挟み、イベントはそのまま通過させる
startWith 先頭に指定した値を発行するイベントを付け加える
retry エラーが発生したら再試行
retryWhen retry のもっと細かい制御ができる汎用バージョン
scan reduce の途中経過もイベント発行するバージョン

distinctUntilChanged

ReactiveX に distinct というのがありますが、それとは違っています。マーブルダイアグラムは RxMarbles の distinctUntilChanged を参照してください。

一番単純なのは、直近の値と同じものをスキップするバージョンです。つまり変化があるまでイベントを発行しません。名前の通りですね。

let newObservable = observable.distinctUntilChanged()

distinctUntilChanged は2つの引数をとるカスタマイズ可能な作りになっています。第一引数の keySelector は元の Observable が発行するイベントの値を比較するための値に変換する処理で、第二引数の comparer は比較処理です。

let newObservable = observable
  .distinctUntilChanged({ Int(round($0)) }) { currentKey, key,  in currentKey -  key  < 10 }

上の例では浮動小数点数で通知される値を四捨五入して整数にし、その差が10未満の間はイベントをスキップするようにしています。

keySelector のデフォルト動作は「変換しない」です。なので comparer だけを渡すバージョンが用意されています。

let newObservable = observable
  .distinctUntilChanged { currentKey, key,  in currentKey -  key  < 10.0 }

comparer のデフォルト動作は「等しいかどうか」です。なので keySelector だけを指定するバージョンもあります。

let newObservable = observable
  .distinctUntilChanged { Int(round($0)) }

doOn

オペレータの数珠繋ぎパイプラインの途中に処理を挟みますが、後続へはそのままイベントを通過させます。ただし例外が発生すると onError を通知します。ReactiveX では Do です。

基本的にオペレータの処理で本来の役割ではない以下の

  1. 渡されるイベント以外に外部変数の値を利用する
  2. 外部の値を変更する

のはやるべきではありません。後者のいわゆる副作用を記述するには doOn を使いましょう。(ちなみに前者をやりたければそれを Observable 化して合成すればいいんです)

let newObservable = observable
  .map { $0 * 2 }
  .doOn { event in
    switch event {
    case .Next(value):
      // ...
    case .Error(error):
      // ...
    case .Completed:
      // ...
    }
  }
  .map { "\($0)" }

次のようにそれぞれにクロージャを渡す形でも書けます。また必要ない引数は省略できます。

let newObservable = observable
  .map { $0 * 2 }
  .doOn(
    onNext: { value in
      // ...
    },
    onError: { error in
      // ...
    },
    onCompleted: {
      // ...
    })
  }
  .map { "\($0)" }

onNext, onError, onCompleted のどれかだけで処理をするなら、専用の doOnNext, doOnError, doOnCompleted が用意されています。

また単にデバッグのために値を標準出力に表示したいだけなら、debug オペレータが利用できます。

startWith

先頭に指定した値を発行するイベントを付け加えます。ReactiveX の説明はこちらです。

let newObserfable = observable.startWith(1)

// 可変個引数になっているので複数指定可能
let newObservable = observable.startWith(1, 4, 8)

retry

エラーが発生した場合に再試行します。ReactiveX の説明はこちらです。

onError が発生すると subscribe し直します。

// 成功するまでリトライし続ける
let disposable = observable
  .retry()
  .subscribeNext { result in
    // ...
  }

// 3回までリトライする
let disposable = observable
  .retry(3)
  .subscribe(
    onNext: { result in
      // ...
    },
    onError: { error in
    })

retryWhen

retry よりもっと細かい制御ができます。例えば3秒後にリトライしたい場合は以下のように書きます。

let scheduler =  ConcurrentDispatchQueueScheduler(
  queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))
let disposable = observable
  .retryWhen { (errors: Observable<ErrorType>) -> Observable<Int> in
    errors.flatMap { _ in Observable.timer(3, scheduler: scheduler) }
  }
  .subscribeNext { result in
    // ...
  }

この retryWhen の使い方はちょっと複雑です。引数に渡しているクロージャには発生したエラーを通知する Observable が渡されます。エラーが渡されるんじゃないんです。Observable です。ここが分かりにくい。

クロージャの戻り値として返すのも Observable でこれが onNext を発行するとリトライします。この例では timer を使って3秒待ってから onNext を発行しています。

flatMap を map にするとコンパイルエラーになります。

  .retryWhen { (errors: Observable<ErrorType>) -> Observable<Int> in
    // map ではダメ
    errors.map { _ in Observable.timer(3, scheduler: scheduler) }
  }

erros.map だと返す型が Observable<Observable<Int>> です。欲しいのは Observable<Int> なので3、flatMap で Observable を1段展開する必要があります。

次はエラーの種類によってリトライするかどうか判断してみましょう。

let disposable = observable
   .retryWhen { (errors: Observable<ErrorType>) -> Observable<Int> in
     errors.flatMap { (error: ErrorType) -> Observable<Int>  in
       if error is ApplicationError { return Observable.just(0) } // リトライ
       return Observable.error(error) // エラー
     }
   }
   .subscribeNext { result in
     NSLog("\(result)")
   }

使い方が分かれば、リトライ条件やリトライまでの時間を制御できるようになります。

scan

reduce に似ていますが、途中経過もイベントとして発行します。ReactiveX の説明はこちらです。

以下はそれまでの総和をイベントとして通知します。

let observable = Observable.generate(initialState: 0, condition: { $0 < 10 }) { $0 + 1 }
  .scan(0) { $0 + $1 }
  .subscribeNext { print("\($0)") }
0
1
3
6
10
15
21
28
36
45

scan の初期値と通知する型は、元の Observable と同じである必要はありません。以下は元の Observable は Int を通知しますが、scan は String を通知します。

_ = Observable.generate(initialState: 0, condition: { $0 < 10 }) { $0 + 1 }
  .scan("sequence =") { $0 + " \($1)" }
  .subscribeNext { print("\($0)") }
sequence = 0
sequence = 0 1
sequence = 0 1 2
sequence = 0 1 2 3
sequence = 0 1 2 3 4
sequence = 0 1 2 3 4 5
sequence = 0 1 2 3 4 5 6
sequence = 0 1 2 3 4 5 6 7
sequence = 0 1 2 3 4 5 6 7 8
sequence = 0 1 2 3 4 5 6 7 8 9

Observable+StandardSequenceOperators

シーケンスに対して変換したり一部を取り出したりする標準的なオペレータがここに所属しています。

オペレータ 説明
filter 指定条件に合致するものだけ通過
takeWhile 先頭から指定条件に合致している間だけイベントを通知
takeWhileWithIndex takeWhile のインデックス付きバージョン
take 先頭から指定個数だけ通知して完了
takeLast 最後から指定した数を通知して完了
skip 先頭から指定した数だけイベントをスキップ
skipWhile 先頭から指定条件に合致している間だけイベントをスキップ
skipWhileWithIndex skipWhile のインデックス付きバージョン
map 各要素の内容を指定した処理で別の値や型に変換します。
mapWithIndex map のインデックス付きバージョン
flatMap イベントを Observable に変換した上で、 それを並列実行して発行するイベントをマージ( map + merge )。
flatMapWithIndex flatMap のインデックス付きバージョン
flatMapFirst イベントを Observable に変換して順次実行するが、実行中のものが完了するまでに来た Observable は無視
flatMapLatest イベントを Observable に変換し、常に最新の Observable を利用( map + switchLatest )。
elementAt 指定インデックス番号のイベントを通知して完了
single 1つだけしかイベントを発行しないことを保証

filter

指定条件に合致するものだけを通過させます。ReactiveX の説明はこちらです。

// 偶数だけを通す
let newObservable = Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).filter { $0 % 2 == 0 }
// newObservable は 2, 4, 6, 8, 10 を通知する

takeWhile

先頭から指定条件に合致している間だけイベントを通知します。指定条件に合致しないイベントが来たら、それは通知せずに onCompleted に置き換えられます。ReactiveX の説明はこちらです。

let newObservable = Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).takeWhile { $0 < 5 }
// newObservable は 1, 2, 3, 4 を通知して onCompleted になる

takeWhileWithIndex

takeWhile のインデックス付きバージョンです。

let newObservable = Observable.of(1, 3, 5, 8, 9, 6, 7, 2, 4, 10)
  .takeWhileWithIndex { event, index in index < 5 && event < 10 }
// 5番目以下は全て10未満の数なので 1, 3, 5, 8, 9 を通知して onCompleted になる

take

先頭から指定個数だけ通知して完了します。ReactiveX の説明はこちらです。

let observable = Observable.of(1, 2, 3, 4).take(2)
// 1, 2 を通知して onCompleted になる

時間で指定するバージョンもあります。Observable+Time の章の take を参照してください。

takeLast

最後から指定した数を通知して完了します。最後が確定するために元の Observable が完了する必要があります。元の Observable が指定数未満しかイベントを発行しなかった場合、その全てのイベントが通知されます。ReactiveX の説明はこちらです。

let observable = Observable.of(1, 2, 3, 4).takeLast(1)
// 4 を通知して onCompleted になる

skip

先頭から指定した数だけイベントを無視してスキップします。ReactiveX の説明はこちらです。

let observable = Observable.of(1, 2, 3, 4).skip(2)
// 3, 4 を通知する

時間で指定するバージョンもあります。Observable+Time の章の skip を参照してください。

skipWhile

先頭から指定条件に合致している間のイベントを無視してスキップします。ReactiveX の説明はこちらです。

let newObservable = Observable.of(1, 2, 3, 4, 5, 1, 2, 3, 4, 5).skipWhile { $0 < 5 }
// 5, 1, 2, 3, 4, 5 を通知する

skipWhileWithIndex

skipWhile のインデックス付きバージョンです。

let newObservable = Observable.of(1, 3, 5, 8, 9, 6, 7, 2, 4, 10)
  .skipWhileWithIndex { event, index in index < 4 && event < 10 }
// 5番目以下は全て10未満の数なのでスキップして 9, 6, 7, 2, 4, 10 を通知する

map

各要素の内容を指定した処理で別の値や型に変換します。ReactiveX の説明はこちらです。

let newObservable = Observable.of(1, 2, 3)
  .map { $0 * 10 }
// 10, 20 ,30 が通知されます。

mapWithIndex

map のインデックス付きバージョンです。

let newObservable = Observable.of(1, 2, 3)
  .mapWithIndex { value, index in index + value }
// 1, 3, 5 を通知する

flatMap

元の Observable のイベントを Observable に変換した上で、その発行するイベントをマージします。 ReactiveX の説明はこちらです。

慣れてないとわかりにくいので、まずは意味のない単純なものからやってみましょう。

let newObservable = Observable.of(1, 2, 3)
  .flatMap { value in Observable.just(value) }
// 1, 2, 3 が通知される

これは

  1. 3つの Int 型 1, 2, 3 を通知する Observable<Int> がある
  2. Observable.just(1), Observable.just(2), Observable.just(3) という3つの Observable<Int> を通知する Observable<Observable<Int>> に変換
  3. それらの Obsevable の通知内容をマージした Observable<Int> に変換

ということをやっています。結局元に戻ってるので何の意味もありませんが。つまるところ flatMap は map + merge の処理をしているわけです。merge のところでも紹介した以下のマーブルダイアログを参考にしてください。

flatMap.png

merge のところで非同期処理を並列実行できると説明しました。以下のコードです。

let disposable = event
  .map { doSomething($0) }  // doSomething は非同期で何かを行うObservableを返す
  .merge()  // 次々並列に実行する
  .subscribeNext { result in
    // ...
  }

これは flatMap を使った方がスマートに書けます。

let disposable = event
  .flatMap { doSomething($0) }  // doSomething は非同期で何かを行うObservableを返す
  .subscribeNext { result in
    // ...
  }

また retryWhen の例でも flatMap を使っているので参考にしてください。

flatMapWithIndex

flatMap のインデックス付きバージョンです。以下の例では初回だけ別の処理を実行するようにしています。

let disposable = event
  .flatMapWithIndex { value, index in
    if index == 0 {
      return doFirst($0) // doFirst は非同期で何かを行うObservableを返す
    }
    return doSomething($0) // doSomething は非同期で何かを行うObservableを返す
  }
  .subscribeNext { result in
    // ...
  }

mapWithIndex + merge と同じ動作ですね。

flatMapFirst

これはちょっと名前から動作がイメージしにくいです。flatMap と同様にイベントを Observable に変換しますが、通知に利用する Observable は常に1つだけです。そして subscribe 中の Observable が完了するまでの間に通知された Observable は無視されます。完了すると完了後に通知された Observable を subscribe します。

以下は concat で説明した非同期処理を順次実行するコードです。

let disposable = event
  .map { doSomething($0) }  // doSomething は非同期で何かを行うObservableを返す
  .concat()  // 1つずつ完了するまで待って次の処理を実行する
  .subscribeNext { result in
    // ...
  }

concat.png

これを flatMapFirst を使って以下のように書くと、map + concat と同様に1つずつしか実行しませんが、実行中の処理がある場合は新しく来た Observable を無視します。そのため処理待ちのものがキューに溜まり続けることはありません。

let disposable = event
  .flatMapFirst { doSomething($0) }  // doSomething は非同期で何かを行うObservableを返す
  .subscribeNext { result in
    // ...
  }

flatMapFirst.png

よく「処理中フラグを設けて、処理中だったら処理は開始しない」みたいなことしませんか?それをフラグを設けずに実装できます。

flatMapLatest

これも flatMapFirst と同じくイベントを Observable に変換し、常に1つの Observable のイベントしか通知しないのですが、次の Observable が来るとそちらにスイッチします。

  1. Observable に変換
  2. 常に一番新しい Observable を実行

という動作です。つまり flatMapLatest は map + switchLatest と同じ動作です。

以下は switchLatest で紹介したコードです。map で Observable の Observable に変換してから switchLatest で常に最新の Observable に切り替えます。

// テキストボックスの内容が変更されるたびにインクリメンタルサーチ
let observable = text.rx_text.asObservable()
  .map { text in search(text) } // searchは非同期で検索処理を行うObservableを返す
  .switchLatest()
  .observeOn(MainScheduler.instance)
  .subscribeNext { searchResult in
    // ...
  }

これは flatMapLatest を使って以下のように書けます。

// テキストボックスの内容が変更されるたびにインクリメンタルサーチ
let observable = text.rx_text.asObservable()
  .flatMapLatest { text in search(text) } // searchは非同期で検索処理を行うObservableを返す
  .observeOn(MainScheduler.instance)
  .subscribeNext { searchResult in
    // ...
  }

elementAt

指定インデックス番号のイベントを通知して完了します。ReactiveX の説明はこちらです。

let newObservable = Observable.of(1, 2, 3, 4)
  .elementAt(2)
// 3 が通知されてonCompletedになる

single

元の Observable が1つだけしかイベントを発行しないことを保証させます。何もイベントを発行せずに完了したり、2つ以上のイベント発行した場合はエラーになります。ReactiveX の first に似ていますが、これは元の Observable の発行するイベントが1つでなくてもエラーにはなりません。

// これはエラーにならずに1が通知される
let observable1 = Observable.just(1).single()

// これはイベントが発行されずに 完了するのでエラーになる
let observable2 = Observable.empty().single()

// これもイベントが2つなのでエラーになる
let observable3 = Observable.of(1, 2).single()

対象のイベントを指定するクロージャを渡すこともできます。この場合 filter + single と思っていいです。

// これはエラーにならずに2が通知される
let observable1 = Observable.of(1, 2, 3).single { $0 % 2 == 0 }

// これはイベントが2つなのでエラーになる
let observable2 = Observable.of(1, 2, 3, 4).single { $0 % 2 == 0 }

Observable+Time

時間に関係するオペレータがここに所属しています。内部のタイマー動作を行うスケジューラを指定して利用するものが多いです。

オペレータ 説明
debounce 指定期間新たなイベントが発行されなくなってから最後に発行されたイベントを発行
throttle 指定期間の計測が開始された最初のイベントと、指定期間新たなイベントが発行されなくなってから最後に発行されたイベントを発行
sample 引数に渡した Observable の発行タイミングで元の Observable の最新値を通知
interval 指定期間ごとにイベントを発生
timer 指定時間後にイベントを発生させて完了
take 先頭から指定時間の間のイベントだけ通知して完了
skip 先頭から指定時間の間のイベントだけ無視してスキップ
ignoreElements 全てのイベントを無視して完了やエラーのみ通知
delaySubscription subscribe するまでの時間を遅らせる
buffer 最大指定時間または最大指定個数ごとにイベントを配列にまとめて通知
window 最大指定時間または最大指定個数ごとにイベントを Observable にまとめて通知
timeout 指定時間イベントが通知されなければエラー

debounce

指定期間新たなイベントが発行されなくなってから最後に発行されたイベントを発行します。その途中のイベントは無視されます。
ReactiveX では debounce です。

例えばユーザーが操作中はイベント処理をせず、一定期間ユーザー操作がなかったら最後の値を採用したい・・・みたいなときに使います。

let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)

let observable: Observable<Int> = Observable.create { observer in
    observer.onNext(0)
    observer.onNext(1)
    observer.onNext(2)
    sleep(2)
    observer.onNext(3)
    observer.onNext(4)
    sleep(2)
    observer.onCompleted()
    return Disposables.create()
  }
_ = observable
  .debounce(1, scheduler: scheduler) // 2, 4
  .subscribe(onNext: { print("value = \($0)") })
value = 2
value = 4

throttle

指定期間の計測が開始された最初のイベントと、指定期間新たなイベントが発行されなくなってから最後に発行されたイベントを発行します。その途中のイベントは無視されます。
debounceとの違いは、指定期間の計測が開始された最初のイベントも発行されるところです。

let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)

let observable: Observable<Int> = Observable.create { observer in
    observer.onNext(0)
    observer.onNext(1)
    observer.onNext(2)
    sleep(2)
    observer.onNext(3)
    observer.onNext(4)
    sleep(2)
    observer.onCompleted()
    return Disposables.create()
  }
_ = observable
  .throttle(1, scheduler: scheduler)
  .subscribe(onNext: { print("value = \($0)") })
value = 0
value = 2
value = 3
value = 4

sample

引数に渡した Observable の発行タイミングで元の Observable の最新値を通知します。変化がない場合は通知されません。ReactiveX の説明はこちらです。

以下は通知を1秒に一回(同じ値なら通知しない)に制限しています。

let scheduler =  ConcurrentDispatchQueueScheduler(
  queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))
let newObservable = observable.sample(Observable.interval(1, scheduler: scheduler))

interval

指定期間ごとにイベントを発生させます。ReactiveX の説明はこちらです。

通知される値はインデックス番号ですが使うことはないでしょう。無限に発生させ続けるのでそのうちオーバーフローしてマイナスからカウントします。

以下は1秒に1回イベントを発生させます。

let scheduler =  ConcurrentDispatchQueueScheduler(
  queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))
let observable = Observable<Int>.interval(1, scheduler: scheduler)

timer

指定時間後にイベントを発生させて完了します。ReactiveX の説明はこちらです。

以下は1秒後にイベントを発生させて完了する Observable を作成しています。retryWhen の説明で利用しているのでそちらも参照してください。

let scheduler =  ConcurrentDispatchQueueScheduler(
  queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))
let observable = Observable<Int>.timer(1, scheduler: scheduler)

第二引数に period を指定することもできます。nil を指定すると上と同じ動作になります。

// 最初のイベントまで1秒、そのあとは2秒に一回イベントが発生
let observable = Observable<Int>.timer(1, period: 2, scheduler: scheduler)

第一引数と第二引数に同じ値を指定すると interval と同じ動作になります。

take

指定時間の間のイベントだけ通知して完了します。数を指定するバージョンと同じ名前ですが、こちらは時間指定です。

let scheduler =  ConcurrentDispatchQueueScheduler(
  queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))
// 3秒間だけイベントを採用する
let newObservable = observable.take(3, scheduler: schduler)

skip

指定時間の間のイベントだけ無視してスキップします。数を指定するバージョンと同じ名前ですが、こちらは時間指定です。

let scheduler =  ConcurrentDispatchQueueScheduler(
  queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))
// 3秒間だけイベントを無視する
let newObservable = observable.skip(3, scheduler: schduler)

ignoreElements

全てのイベントを無視して完了やエラーのみ通知します。つまり onNext は呼ばれず onError か onCompleted しか呼ばれないことを保証します。ReactiveX の説明はこちらです。

let observable = Observable.of(1, 2, 3, 4).ignoreElements()
// onCompletedだけが通知される

delaySubscription

subscribe するまでの時間を遅らせます。

let scheduler =  ConcurrentDispatchQueueScheduler(
  queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))
// 3秒後から購読開始
let disposable = observable
  .delaySubscription(3, scheduler: scheduler)
  .subscribeNext { element in
    // ...
  }

buffer

最大指定時間または最大指定個数ごとにイベントを配列にまとめて通知します。ReactiveX の説明はこちらです。

第一引数が最大時間、第二引数が最大個数です。どちらかに達すると、次のバッファに移ります。

// ボタンのダブルタップを検知する。0.5秒以内に2回タップされたらダブルタップと判定。
let disposable = button.rx_tap.asObservable()
  .buffer(timeSpan: 0.5, count: 2, scheduler: scheduler)
  .filter { $0.count == 2 }
  .subscribeNext { _ in
    // ...
  }

window

buffer は配列にまとめますが、window は Observable にまとめます。ReactiveX の説明はこちらです。

timeout

指定時間イベントが通知されなければエラーにします。タイムアウト時には onError で RxError.Timeout が通知されます。ReactiveX の説明はこちらです。

let disposable = observable
  .timeout(30, scheduler: scheduler)
  .subscribe(
    onNext: { result in
      // ...
    },
    onError: { error in
      // ...
    },
  )

  1. 執筆時の最新バージョンです。でも利用しているのは2.3.0ですので、動作確認したものはそちら+Xcode7.2(Swift2.2)での確認になってます。 

  2. ソースコードのコメントにそう記載されています。 

  3. 実際にはtimerが返すのはObservable<SingedIntegerType>です。Int以外にInt8,Int16,Int32,Int64などのビット数指定のものもSingedIntergerTypeです。値としては0が通知されるだけですが。