はじめに
「オブザーバーパターンから始めるRxSwift入門」の続きです。
前の記事では Rx の Hot な Observable のみを使った利用方法を説明しました。今回は Cold な Observable の生成方法や、非同期処理を実現するためのスレッド指定方法などを解説します。
HotなObservableって何?
前の記事で説明した Subject を使ったものが Hot な Observable です。Subject そのものでなくても、例えば Variable のように内部で Subject を使っているものもあります。
- どこからも subscribe されていなくても動作する
- 複数から subscribe されると同じ動作結果を通知する
という特徴があります。オブサーバーパターンを置き換えるなら、この動作は当たり前ですよね?
上図の PublishSubject はどこからも subscribe されていなくても動作していてイベントを発行しています。それを subscribe すると、その時点以降のイベントを通知してもらえるようになります。複数から subscribe しても、同じタイミングで同じイベントが通知されます。
ColdなObservableって何?
逆に Subject を使わないで作られる Observable は全て Cold です。Rx は Cold の方が基本で、むしろ Hot の方が特殊と言えます。
- subscribe されないと動作せず、subscribe されると動作を開始する
- 複数から subscribe されるとそれぞれ別々に動作する
という特徴があります。なんのこっちゃと思うかもしれませんが、例えば配列の forEach を呼び出す場合を考えてみてください。
[ 1, 2, 3, 4, 5].forEach { value in
// ...
}
forEach を呼び出すと配列の最初から値を渡してきます。しかも複数が forEach を呼び出すとそれぞれ別々に動作して最初から値を渡してきます。
RxSwift で同じ動作をさせるには以下のようにします。
_ = Observable.of(1, 2, 3, 4, 5).subscribeNext { value in
// ...
}
Observable.of(1, 2, 3, 4, 5) で作っているのが Cold な Observable になります。subscribe するまでは動作せず、複数から subsribe されるとそれぞれ別々に動作して、1, 2, 3, 4, 5 を順番に渡します。
HotなObservableを使って非同期通信結果を通知してみる
前の記事で紹介した Subject を使う方法で、サーバーからデータを取得するクラスを作ってみます。
class ServerDataLoader {
private let resultSubject = PublishSubject<NSData>()
private let request: NSURLRequest
private var task: NSURLSessionDataTask?
var result: Observable<NSData> { return resultSubject }
func start() {
let configuration = NSURLSessionConfiguration.defaultSessionConfiguration()
let session = NSURLSession(configuration: configuration)
let task = session.dataTaskWithRequest(request) { // 完了時の処理
[resultSubject] data, response, error in
dispatch_async(dispatch_get_main_queue()) {
if let data = data {
resultSubject.onNext(data)
resultSubject.onCompleted()
} else {
resultSubject.onError(error ?? MyError.FailedToFetchServerData)
}
}
}
task.resume() // 通信開始(非同期でバックグラウンド実行される)
self.task = task
}
func cancel() {
task?.cancel()
}
init(request: NSURLRequest) {
self.request = request
}
}
こんな感じで使います。
let loader = ServerDataLoader(request: request)
loader.result.subscribe {
onNext: { data in
// データ受信時の処理
},
onError: { error in
// エラー時の処理
}
}
loader.start()
NSURLSessionが通信をバックグラウンドスレッドで非同期で実行してくれます。そして dataTaskWithRequest に渡す完了時の処理もバックグラウンドスレッドで実行されます。ここではGCDを使って通知をメインスレッドで行うように変更しています。
注意点としては、このクラスは1回限りの使い捨てということです。__onError/onCompleted が発生したらその後はイベントが通知されない__ので、もう一度 start すると、通信は開始しますが結果が通知されません。
Observable.create
今度は同様の処理を Cold な Observable でやってみます。Observable.create は Cold な Observable を作る汎用的な方法です。「subscribe されるとサーバーからデータを取得する処理を開始する」Observable を作って返しています。
func fetchServerDataWithRequest(request: NSURLRequest) -> Observable<NSData> {
return Observable.create { observer in
let configuration = NSURLSessionConfiguration.defaultSessionConfiguration()
let session = NSURLSession(configuration: configuration)
let task = session.dataTaskWithRequest(request) { data, response, error in
if let data = data {
observer.onNext(data)
observer.onCompleted()
} else {
observer.onError(error ?? MyError.FailedToFetchServerData)
}
}
task.resume()
return AnonymousDisposable { task.cancel() }
}
}
Observer.create には subscribe されたときに実行する処理を渡します。クロージャの引数には ObserverType 型(上記ではobserverという名前)が渡されます。これに onNext/onError/onCompleted メソッドが用意されているので、それらを呼び出してイベントを通知します。クロージャの戻り値として、dispose されたときに購読解除処理を行う Disposable オブジェクトを返します。
Disposable を作る部分は、上の例なら以下のようにも書けますね。
return AnonymousDisposable(task.cancel)
fetchServerDataWithRequest を呼び出した時点では Observable が生成されるだけで、通信は開始しません。この Observable は Cold なので、subscribe して初めて処理を開始します。
また subscribe する度に新しく処理を開始するため、前の Hot を使ったクラスと違って、何度でも利用することができます。
subscribe すると渡したクロージャ(下図の赤色部分)が実行されます。
先ほどと同様に通信は非同期で実行されるので、完了時に実行されるクロージャ(下図の青色部分)はバックグラウンドスレッドで実行されます。
先ほどと違って GCD でメインスレッドに変換していないので、onNext/onError/onCompleted は通信を行ったバックグラウンドスレッドで通知されます。これをメインスレッドで通知させる Rx 流の方法を次に説明します。
補足:
RxCocoa が NSURLSession に rx_data って拡張プロパティを用意してるんで、それ使えば自分でこんな処理を実装する必要はありません。さらに JSON パースまでやってくれる rx_JSON なんてのもあります。
observeOn
先ほどのバックグラウンドスレッドで通知されるイベントを、メインスレッドで通知してもらうように変換するには observeOn を利用します。
_ = fetchServerDataWithRequest(request)
.observeOn(MainScheduler.instance)
.subscribe(
onNext: { data in
// データ受信時の処理
},
onError: { error in
// エラー時の処理
}
)
observeOn に渡すのは Scheduler オブジェクトで、メインスレッドを表す Scheduler は MainScheduler.instance で取得できます。observeOn を呼び出した部分から後は、指定したスレッドで通知されます。
例えば途中で NSData のパースを行うとします。
_ = fetchServerDataWithRequest(request)
.observeOn(MainScheduler.instance)
.map { parse($0) }
.subscribe(
onNext: { result in
// パース済みデータ受信時の処理
},
onError: { error in
// エラー時の処理
}
)
このようにすると、パース処理もメインスレッドで行われます。observeOn と map を入れ替えると、
.map { parse($0) }
.observeOn(MainScheduler.instance)
パース処理までがバックグラウンドスレッドで行われ、その結果の通知がメインスレッドで行われます。
subscribeOn
名前の通り subscibe 処理を実行するスレッドを指定するのですが、勘違いしてはいけません。subscribe に渡すクロージャがそのスレッドで実行されるのではありません。
subscribe 処理は、
- 購読登録処理を行う
- Cold な Observable ならそのときに動作を開始する
ということを行っています。subscribeOn でその実行スレッドを指定できるわけです。
_ = fetchServerDataWithRequest(request)
.subscribeOn(ConcurrentDispatchQueueScheduler(
globalConcurrentQueueQOS: .UserInitiated))
.map { parse($0) }
.observeOn(MainScheduler.instance)
.subscribe(
onNext: { result in
// パース済みデータ受信時の処理
},
onError: { error in
// エラー時の処理
}
)
subscribeOn で QOS_CLASS_USER_INITIATED での実行を指定しています1。これで fetchServerDataWithRequest が返す Observable の購読登録処理が指定スレッドで行われます。この Observable は Cold なので、subscribe 時点で create に渡したクロージャ(下記の赤色部分)が指定スレッドで実行されます。
このように subscribeOn を使うと Cold な Observable の動作スレッドを指定することができます。
しかしこの例ではNSURLSession が元々バックグラウンド動作しますので、通信処理は指定スレッドでは実行されません。
そして通信完了時の処理(青色の部分)も通信バックグラウンドスレッドで実行されます。通信バックグラウンドスレッドで通知されたイベントは、observeOn 以降はメインスレッドで通知されるようになり、subscribe に渡したクロージャはメインスレッドで呼び出されます。
このようにイベントの通知は subscribeOn で指定したスレッドで行われるとは限りません。以下のように内部でバックグランド実行しないようなものは、subscribeOn で指定したスレッドで通知されますが・・・。
_ = Observable.of(1, 2, 3, 4, 5)
.subscribeOn(ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .UserInitiated))
.subscribeNext { value in
// 1, 2, 3, 4, 5のループ処理が指定スレッドで行われ、
// このクロージャも指定スレッドで呼び出される
// ...
}
また subscribeOn が subscribe するときの動作スレッドを指定しているのだと理解していれば、subscribe する前から動作している __Hot な Observable に subscribeOn を指定してもその動作スレッドには何も影響がない__ことも理解できるはずです。
リトライ
その名も retry という operator が用意されています。onError が起こると、自動的に subscribe しなおしてくれます。
_ = fetchServerDataWithRequest(request)
.map { parse($0) }
.retry(3)
.observeOn(MainScheduler.instance)
.subscribe(
onNext: { result in
// パース済みデータ受信時の処理
},
onError: { error in
// エラー時の処理
}
)
上の例では3回リトライを指定しています。map の後に指定しているので、parse で例外が発生して失敗した場合もリトライします。実は __operator による変換で例外が発生した場合、RxSwift は onError で通知してくれる__んです。よくできてますね。
ちなみに retry に回数を指定しなかった場合、ずっとリトライし続けます。もっと細かい制御のできる retryWhen もありますが、複雑なのでまた別の機会に。
タイムアウトの指定
timeout という operator があります。第一引数が秒数、第二引数がタイマーを実行するスレッド(Scheduler)の指定です。タイムアウトすると TimeoutError が onError で通知されます。
let lowPriorityScheduler = ConcurrentDispatchQueueScheduler(
queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))
_ = fetchServerDataWithRequest(request)
.timeout(5, scheduler: lowPriorityScheduler)
.map { parse($0) }
.observeOn(MainScheduler.instance)
.subscribe(
onNext: { result in
// パース済みデータ受信時の処理
},
onError: { error in
// エラー時の処理
}
)
Scheduler の作成は、先ほど QOS による指定方法を使ったので、今回は dispatch_queue_t による指定にしてみました。
タイムアウトしたら代わりに実行する Observable を指定する other 引数を持つバージョンもあります。
.timeout(5, other: Observable.just(NSData()), scheduler: lowPriorityScheduler)
Observable.just は指定した値を onNext で1つだけ流して onCompleted になる Observable を生成します。ここではタイムアウトしたら空の NSData を渡して完了するようにしています。
エラーからの復帰
先ほどの other 引数付きの timeout のように、エラーが発生した場合に onError で伝えずに onNext で何かを渡したい場合があります。例えば検索して何かのリストを取得するとします。取得に失敗したら空のリストを渡すようにしたいといったケースです。そんなときは catchErrorJustReturn や catchError が使えます。
catchErrorJustReturn を使って onError が発生したら空の配列を渡すようにしています( parse の結果が配列で返る想定です)。サーバーからのデータ取得でもパース失敗でも、とくにかく onError になったら空の配列になります。
_ = fetchServerDataWithRequest(request)
.map { parse($0) }
.catchErrorJustReturn([])
.observeOn(MainScheduler.instance)
.subscribeNext { result in
// パース済みデータ受信時の処理
}
catchError の場合はエラー内容によって処理を選択できます。catchError も map と同様に例外が throw されると onError で通知してくれます。
_ = fetchServerDataWithRequest(request)
.map { parse($0) }
.catchError { error in
if error is MyError { throw error }
return Observable.just([])
}
.observeOn(MainScheduler.instance)
.subscribeNext { result in
// パース済みデータ受信時の処理
}
onError のみ発生させる Observable を生成する Observable.error を使って以下のようにも書けます。
.catchError { error in
if error is MyError { return Observable.error(error) }
return Observable.just([])
}
この先へ
前回はイベント通知に Hot な Observable を使う方法を解説したので、今回は非同期処理に Cold な Observable を使う方法を解説してみました。
Cold な Observable の生成関数はここで紹介した of, create, just, error 以外にも、empty, never, range, deferred などが用意されています。
このようにまだまだ紹介していない operator やクラスが用意されています。しかしここまでの RxSwift の使い方が理解できれば、あとは少しずつ使えるものを増やしていけると思います。
ここまでは RxSwift の使い方を解説してきました。次はさらに使いこなせるように、落とし穴にハマらないように、トラブルにあったときに原因を探れるように、RxSwift の動作をより深く理解することを目的にした
を読んでみてください。
-
iOS8で追加されたQOSによる指定です。 ↩