Help us understand the problem. What is going on with this article?

RxSwift入門(2) 非同期処理してみる

More than 1 year has passed since last update.

はじめに

オブザーバーパターンから始めるRxSwift入門」の続きです。

前の記事では Rx の Hot な Observable のみを使った利用方法を説明しました。今回は Cold な Observable の生成方法や、非同期処理を実現するためのスレッド指定方法などを解説します。

HotなObservableって何?

前の記事で説明した Subject を使ったものが Hot な Observable です。Subject そのものでなくても、例えば Variable のように内部で Subject を使っているものもあります。

  • どこからも subscribe されていなくても動作する
  • 複数から subscribe されると同じ動作結果を通知する

という特徴があります。オブサーバーパターンを置き換えるなら、この動作は当たり前ですよね?

PublishSubject.png

上図の PublishSubject はどこからも subscribe されていなくても動作していてイベントを発行しています。それを subscribe すると、その時点以降のイベントを通知してもらえるようになります。複数から subscribe しても、同じタイミングで同じイベントが通知されます。

ColdなObservableって何?

逆に Subject を使わないで作られる Observable は全て Cold です。Rx は Cold の方が基本で、むしろ Hot の方が特殊と言えます。

  • subscribe されないと動作せず、subscribe されると動作を開始する
  • 複数から subscribe されるとそれぞれ別々に動作する

という特徴があります。なんのこっちゃと思うかもしれませんが、例えば配列の forEach を呼び出す場合を考えてみてください。

[ 1, 2, 3, 4, 5].forEach { value in
  // ...
}

forEach を呼び出すと配列の最初から値を渡してきます。しかも複数が forEach を呼び出すとそれぞれ別々に動作して最初から値を渡してきます。

RxSwift で同じ動作をさせるには以下のようにします。

_ = Observable.of(1, 2, 3, 4, 5).subscribeNext { value in
  // ...
}

Observable.of(1, 2, 3, 4, 5) で作っているのが Cold な Observable になります。subscribe するまでは動作せず、複数から subsribe されるとそれぞれ別々に動作して、1, 2, 3, 4, 5 を順番に渡します。

HotなObservableを使って非同期通信結果を通知してみる

前の記事で紹介した Subject を使う方法で、サーバーからデータを取得するクラスを作ってみます。

class ServerDataLoader {
  private let resultSubject = PublishSubject<NSData>()
  private let request: NSURLRequest
  private var task: NSURLSessionDataTask?

  var result: Observable<NSData> { return resultSubject }

  func start() {
    let configuration = NSURLSessionConfiguration.defaultSessionConfiguration()
    let session = NSURLSession(configuration: configuration)
    let task = session.dataTaskWithRequest(request) { // 完了時の処理
      [resultSubject] data, response, error in
      dispatch_async(dispatch_get_main_queue()) {
        if let data = data {
          resultSubject.onNext(data)
          resultSubject.onCompleted()
        } else {
          resultSubject.onError(error ?? MyError.FailedToFetchServerData)
        }
      }
    }
    task.resume() // 通信開始(非同期でバックグラウンド実行される)
    self.task = task
  }

  func cancel() {
    task?.cancel()
  }

  init(request: NSURLRequest) {
    self.request = request
  }
}

こんな感じで使います。

let loader = ServerDataLoader(request: request)
loader.result.subscribe {
  onNext: { data in
    // データ受信時の処理
  },
  onError: { error in
    // エラー時の処理
  }
}
loader.start()

NSURLSessionが通信をバックグラウンドスレッドで非同期で実行してくれます。そして dataTaskWithRequest に渡す完了時の処理もバックグラウンドスレッドで実行されます。ここではGCDを使って通知をメインスレッドで行うように変更しています。

注意点としては、このクラスは1回限りの使い捨てということです。onError/onCompleted が発生したらその後はイベントが通知されないので、もう一度 start すると、通信は開始しますが結果が通知されません。

Observable.create

今度は同様の処理を Cold な Observable でやってみます。Observable.create は Cold な Observable を作る汎用的な方法です。「subscribe されるとサーバーからデータを取得する処理を開始する」Observable を作って返しています。

func fetchServerDataWithRequest(request: NSURLRequest) -> Observable<NSData> {
  return Observable.create { observer in
    let configuration = NSURLSessionConfiguration.defaultSessionConfiguration()
    let session = NSURLSession(configuration: configuration)
    let task = session.dataTaskWithRequest(request) { data, response, error in
      if let data = data {
        observer.onNext(data)
        observer.onCompleted()
      } else {
        observer.onError(error ?? MyError.FailedToFetchServerData)
      }
    }
    task.resume()
    return AnonymousDisposable { task.cancel() }
  }
}

Observer.create には subscribe されたときに実行する処理を渡します。クロージャの引数には ObserverType 型(上記ではobserverという名前)が渡されます。これに onNext/onError/onCompleted メソッドが用意されているので、それらを呼び出してイベントを通知します。クロージャの戻り値として、dispose されたときに購読解除処理を行う Disposable オブジェクトを返します。

Disposable を作る部分は、上の例なら以下のようにも書けますね。

return AnonymousDisposable(task.cancel)

fetchServerDataWithRequest を呼び出した時点では Observable が生成されるだけで、通信は開始しません。この Observable は Cold なので、subscribe して初めて処理を開始します。

また subscribe する度に新しく処理を開始するため、前の Hot を使ったクラスと違って、何度でも利用することができます。

subscribe すると渡したクロージャ(下図の赤色部分)が実行されます。
先ほどと同様に通信は非同期で実行されるので、完了時に実行されるクロージャ(下図の青色部分)はバックグラウンドスレッドで実行されます。

fetchServerData.png

先ほどと違って GCD でメインスレッドに変換していないので、onNext/onError/onCompleted は通信を行ったバックグラウンドスレッドで通知されます。これをメインスレッドで通知させる Rx 流の方法を次に説明します。

補足:
RxCocoa が NSURLSession に rx_data って拡張プロパティを用意してるんで、それ使えば自分でこんな処理を実装する必要はありません。さらに JSON パースまでやってくれる rx_JSON なんてのもあります。

observeOn

先ほどのバックグラウンドスレッドで通知されるイベントを、メインスレッドで通知してもらうように変換するには observeOn を利用します。

_ = fetchServerDataWithRequest(request)
  .observeOn(MainScheduler.instance)
  .subscribe(
    onNext: { data in
      // データ受信時の処理
    },
    onError: { error in
      // エラー時の処理
    }
  )

observeOn に渡すのは Scheduler オブジェクトで、メインスレッドを表す Scheduler は MainScheduler.instance で取得できます。observeOn を呼び出した部分から後は、指定したスレッドで通知されます。

例えば途中で NSData のパースを行うとします。

_ = fetchServerDataWithRequest(request)
  .observeOn(MainScheduler.instance)
  .map { parse($0) }
  .subscribe(
    onNext: { result in
      // パース済みデータ受信時の処理
    },
    onError: { error in
      // エラー時の処理
    }
  )

このようにすると、パース処理もメインスレッドで行われます。observeOn と map を入れ替えると、

  .map { parse($0) }
  .observeOn(MainScheduler.instance)

パース処理までがバックグラウンドスレッドで行われ、その結果の通知がメインスレッドで行われます。

subscribeOn

名前の通り subscibe 処理を実行するスレッドを指定するのですが、勘違いしてはいけません。subscribe に渡すクロージャがそのスレッドで実行されるのではありません

subscribe 処理は、

  • 購読登録処理を行う
  • Cold な Observable ならそのときに動作を開始する

ということを行っています。subscribeOn でその実行スレッドを指定できるわけです。

_ = fetchServerDataWithRequest(request)
  .subscribeOn(ConcurrentDispatchQueueScheduler(
    globalConcurrentQueueQOS: .UserInitiated))
  .map { parse($0) }
  .observeOn(MainScheduler.instance)
  .subscribe(
    onNext: { result in
      // パース済みデータ受信時の処理
    },
    onError: { error in
      // エラー時の処理
    }
  )

subscribeOn で QOS_CLASS_USER_INITIATED での実行を指定しています1。これで fetchServerDataWithRequest が返す Observable の購読登録処理が指定スレッドで行われます。この Observable は Cold なので、subscribe 時点で create に渡したクロージャ(下記の赤色部分)が指定スレッドで実行されます。

fetchServerData.png

このように subscribeOn を使うと Cold な Observable の動作スレッドを指定することができます

しかしこの例ではNSURLSession が元々バックグラウンド動作しますので、通信処理は指定スレッドでは実行されません。

そして通信完了時の処理(青色の部分)も通信バックグラウンドスレッドで実行されます。通信バックグラウンドスレッドで通知されたイベントは、observeOn 以降はメインスレッドで通知されるようになり、subscribe に渡したクロージャはメインスレッドで呼び出されます。

このようにイベントの通知は subscribeOn で指定したスレッドで行われるとは限りません。以下のように内部でバックグランド実行しないようなものは、subscribeOn で指定したスレッドで通知されますが・・・。

_ = Observable.of(1, 2, 3, 4, 5)
  .subscribeOn(ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .UserInitiated))
  .subscribeNext { value in
    // 1, 2, 3, 4, 5のループ処理が指定スレッドで行われ、
    // このクロージャも指定スレッドで呼び出される
    // ...
  }

また subscribeOn が subscribe するときの動作スレッドを指定しているのだと理解していれば、subscribe する前から動作している Hot な Observable に subscribeOn を指定してもその動作スレッドには何も影響がないことも理解できるはずです。

リトライ

その名も retry という operator が用意されています。onError が起こると、自動的に subscribe しなおしてくれます。

_ = fetchServerDataWithRequest(request)
  .map { parse($0) }
  .retry(3)
  .observeOn(MainScheduler.instance)
  .subscribe(
    onNext: { result in
      // パース済みデータ受信時の処理
    },
    onError: { error in
      // エラー時の処理
    }
  )

上の例では3回リトライを指定しています。map の後に指定しているので、parse で例外が発生して失敗した場合もリトライします。実は operator による変換で例外が発生した場合、RxSwift は onError で通知してくれるんです。よくできてますね。

ちなみに retry に回数を指定しなかった場合、ずっとリトライし続けます。もっと細かい制御のできる retryWhen もありますが、複雑なのでまた別の機会に。

タイムアウトの指定

timeout という operator があります。第一引数が秒数、第二引数がタイマーを実行するスレッド(Scheduler)の指定です。タイムアウトすると TimeoutError が onError で通知されます。

let lowPriorityScheduler  = ConcurrentDispatchQueueScheduler(
  queue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0))

_ = fetchServerDataWithRequest(request)
  .timeout(5, scheduler: lowPriorityScheduler)
  .map { parse($0) }
  .observeOn(MainScheduler.instance)
  .subscribe(
    onNext: { result in
      // パース済みデータ受信時の処理
    },
    onError: { error in
      // エラー時の処理
    }
  )

Scheduler の作成は、先ほど QOS による指定方法を使ったので、今回は dispatch_queue_t による指定にしてみました。

タイムアウトしたら代わりに実行する Observable を指定する other 引数を持つバージョンもあります。

  .timeout(5, other: Observable.just(NSData()), scheduler: lowPriorityScheduler)

Observable.just は指定した値を onNext で1つだけ流して onCompleted になる Observable を生成します。ここではタイムアウトしたら空の NSData を渡して完了するようにしています。

エラーからの復帰

先ほどの other 引数付きの timeout のように、エラーが発生した場合に onError で伝えずに onNext で何かを渡したい場合があります。例えば検索して何かのリストを取得するとします。取得に失敗したら空のリストを渡すようにしたいといったケースです。そんなときは catchErrorJustReturn や catchError が使えます。

catchErrorJustReturn を使って onError が発生したら空の配列を渡すようにしています( parse の結果が配列で返る想定です)。サーバーからのデータ取得でもパース失敗でも、とくにかく onError になったら空の配列になります。

_ = fetchServerDataWithRequest(request)
  .map { parse($0) }
  .catchErrorJustReturn([])
  .observeOn(MainScheduler.instance)
  .subscribeNext { result in
      // パース済みデータ受信時の処理
  }

catchError の場合はエラー内容によって処理を選択できます。catchError も map と同様に例外が throw されると onError で通知してくれます。

_ = fetchServerDataWithRequest(request)
  .map { parse($0) }
  .catchError { error in
    if error is MyError { throw error }
    return Observable.just([])
  }
  .observeOn(MainScheduler.instance)
  .subscribeNext { result in
      // パース済みデータ受信時の処理
  }

onError のみ発生させる Observable を生成する Observable.error を使って以下のようにも書けます。

  .catchError { error in
    if error is MyError { return Observable.error(error) }
    return Observable.just([])
  }

この先へ

前回はイベント通知に Hot な Observable を使う方法を解説したので、今回は非同期処理に Cold な Observable を使う方法を解説してみました。

Cold な Observable の生成関数はここで紹介した of, create, just, error 以外にも、empty, never, range, deferred などが用意されています。

このようにまだまだ紹介していない operator やクラスが用意されています。しかしここまでの RxSwift の使い方が理解できれば、あとは少しずつ使えるものを増やしていけると思います。

ここまでは RxSwift の使い方を解説してきました。次はさらに使いこなせるように、落とし穴にハマらないように、トラブルにあったときに原因を探れるように、RxSwift の動作をより深く理解することを目的にした

RxSwiftを深く理解する

を読んでみてください。


  1. iOS8で追加されたQOSによる指定です。 

k5n
好きなことはプログラミングと楽器演奏。そのうち金になる方を仕事に、ならないほうを趣味にしています。 若かりし頃はベンチャーでバリバリやってましたが、一度体を壊してからは忙し過ぎない仕事を探してマイペースにやってます。 C/C++, Swift, Objective-C, Java, Kotlin, Ruby, Python, PHP, TypeScript, Rust
creato
「言われたものを作るだけ」ではない共創型システムパートナーを掲げる名古屋の少数精鋭開発会社。
http://www.creato-c.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした