Help us understand the problem. What is going on with this article?

詳解RxJava2:Backpressureで流速制御

More than 1 year has passed since last update.

追記:記事のタイトルがRxJava 2.xを対象にしているように見えますが、正確には詳解RxJava 1.xの2本目です><

謝辞:チームでは本当に自由にRxJavaを使わせてもらっていて、今回の記事の内容も実際にアプリでReactiveな表示更新ロジックを書こうとして習得したものです。

この記事ではRxJavaのBackpressureの仕組みについて説明しつつ、Backpressureを使って問題をうまく解決できるパターンを紹介します。深夜テンションで一部お見苦しい点がございますがお許しください。

これまでのあらすじ

Backpressureに関する公式ドキュメント

https://github.com/ReactiveX/RxJava/wiki/Backpressure

Observableの動作ペースとHot Observable / Cold Observable

突然ですがObservableはどれくらいのスピードで動作する(onNext()を呼ぶ、emitする)のでしょうか。
実はストリームの下流側(Observer、子Observable)の都合に合わせてemitする回数を制限するObservable(Cold Observable)と、下流の都合に関係なく自分のペースでemitするObservable(Hot Observable)が存在します。

  • Cold Observableの一例
    • Observable.from(Iterable) (下流が要求した数ずつIterableをiterateします)
    • Observable.range() (countにInteger.MAX_VALUEを指定しても下流の要求した数ずつemitします)
    • Cold Observableに時間ベースでないOperator(filter()とかmap()とか)を適用して得られるObservable(一部を除く)
  • Hot Observableの一例
    • Observable.interval()window()など時間を与えて得られるObservable
    • Subject (外部からonNext()を自由に呼べるので下流の都合に合わせて制御できません)
    • publish()などを適用して得られるConnectableObservable (Hot変換とも呼ばれます)
    • Hot ObservableにOperatorを適用して得られるObservable(一部を除く)

http://qiita.com/toRisouP/items/f6088963037bfda658d3

Backpressure

Cold Observableが下流に合わせてemitするよう制御するための仕組みがBackpressureです。
Backpressureのニュアンスは水道の蛇口を絞ると水が流れにくくなるアレをイメージしてもらうのがわかりやすそうです。

仕組み

Observableがsubscribe()される時にはSubscriberが作られて上流(親)Observableに渡されます。
Subscriberにはprotectedなrequest(long)メソッドが生えていて、Subscriber自身がこれを呼び出して上流Observableに最大で何回onNext()を叩いても良いか伝えます。上流側で持っている最大個数はrequest()を呼び出すたびに加算され、onNext()が叩かれるたびに減っていきます。

上流Observable側ではこのrequest()を受け取るために、渡されたSubscriberに対しsetProducer(Producer)を呼び出してProducerをセットします。Producerrequest(long)メソッドだけのinterfaceで、Observable側はこのinterfaceを介して下流側からBackpressureを受け取ることができます。

下流側
  Observable.subscribe() oO(始めるで)
  ↓
  subscriber = new Subscriber() {...}
  subscriber.onStart()
  (onStart()内で) this.request(n) oO(n個処理できそう)
  ↓
  onSubscribe.call() oO(上流起動しよ)
↓
上流側
  MyObservable.subscriber.setProducer(new MyObservable.MyProducer() {...}) oO(Backpressure教えて〜)
  ↓
  (下流から)Subscriber.producer.request(n) oO(setされたし上流に伝えとこ)
  ↓
  MyObservable.MyProducer.request(n) oO(なんかn個ぐらいいけそう)
  ↓
  MyObservable.subscriber.onNext(item) × n回 oO(よっしゃ仕事するで!)
↓
下流側
  Subscriber.onNext(item) oO(なんかきたわ)
  ↓(必要に応じて)
  Subscriber.child.onNext(item) oO(下流に伝えとこ)
  Subscriber.request(1) oO(も1個いけるんちゃう?)

なお、Subscriberが動作開始時(onStart()メソッド内)にrequest()を呼ばなかった場合、デフォルトでLong.MAX_VALUEがrequest()されます。Long.MAX_VALUEがrequest()されるとBackpressureは無効になり、ObservableがColdの場合でも自分のペースでonNext()を呼び出します。すなわちデフォルトではBackpressureは無効です

上流Observableに対してBackpressureを使用しているOperatorの例

デフォルトでは無効と書きましたがではどんな時に使われるのでしょうか。
下記のOperatorを使うと、Backpressureでの流量制御が自分で指定したパラメータに応じて行われます。

  • flatMap(Func1<? super T,? extends Observable<? extends R>> func, int maxConcurrent)
    • 非同期処理でもmaxConcurrent分だけしか同時に走らないよう指定できます。最初にrequest(maxConcurrent)され、funcが返したObservableが完了した分だけrequest()されます。
  • merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent)
    • 上記flatMap()はmerge()を使って実装されています:merge(map(func), maxConcurrent);
  • take(int count)
    • あらかじめ最大で何回のonNext()を受け付けるか決まっているので、最大でcount個だけrequest()します。ただし、下流側からのrequst()の数がcountより少なければその数になります。

これ以外にもBackpressureが有効になるケースがあり、例えばonNext()時にサイズが固定のRingBufferに一旦キューイングしているobserveOn()はRingBufferのサイズだけrequest()します。

※余談ですがfilter()の実装は評価関数がfalseを返すたびに追加でrequest(1)が呼ばれるという面白い実装になっています。

Hot Observableの場合

Hot ObservableはBackpressureが伝えられてもガン無視してonNext()し続けます。前述のRingBufferのキャパシティを超過した場合など、場合によってはMissingBackpressureExceptionを投げる可能性もあります。

上流のHot Observableに対してBackpressureによる流量制御を行うためのOperatorを適用すると、下流のBackpressureを尊重することができます。これらのOperatorは、下流にrequest()されたよりも多く上流からonNext()された場合にonNext()された要素を捨てたりバッファに積んだりして制御します。

  • onBackpressureDrop
    • request()されるまでの間に届いたonNext()を全部捨てます。捨てた時に指定された関数を叩く機能もあります。
  • onBackpressureLatest()
    • 最新の1つだけ残して他は捨てます。次にrequest()された時に残しておいた最新の1つをonNext()します。ただし、後から2以上request()されても1回しかonNext()されないので注意が必要です。
  • onBackpressureBuffer
    • バッファに積みます。バッファが無限に伸びていったり、limit指定すると溢れる可能性があるので、用途は限られるかもしれません。

Hot Observableに対してBackpressureの制御を行う例、Backpressure使わずに失敗した例

ボタンが押されるたびに非同期リクエストを行う(失敗例)

例えば、ユーザがボタンを押すたびにサーバに問い合わせて画面を更新する場合を想像してください(例なのでボタンの無効化はしません、実際にはPush通知とかだったりするかも)。

PublishSubject<Void> onClickObservable = PublishSubject.create();
view.findViewById(R.id.button).setOnClickListener((view) -> {
    subject.onNext(next);
}); // ※RxBindingライブラリのRxView.clicks()と同じ

onClickObservable
    .flatMap(aVoid -> mApiClient.fetch()) // 非同期リクエスト、例:Retrofitライブラリ
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(response -> render(response));

サーバへの問い合わせが終わらない間にボタンが連打されたらどうなるのでしょうか。
正解は・・押された回数だけリクエストが並列に開始されちゃいます。
flatMap(Func1)はすぐに新しいObservableをsubscribeします。
これだと不必要にリクエストや再描画が実行されてしまいます。

無理やり同期的にやるとどうなるか(失敗例)

では、無理やり同期的にリクエストを呼び出した場合はどうなるのでしょうか。
※注意:下記はダメな例です。非同期処理をmap()の中でブロッキングして叩くのはやめましょう。

onClickObservable
    .observeOn(Schedulers.io()) // Main ThreadからI/O Threadに切り替え
    .map(aVoid -> mApiClient.fetchSync()) // 同期リクエスト
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(response -> render(response));

正解は、リクエストが終わるまで次のリクエストは開始されませんが、押された回数だけ順番にリクエストが走ってしまいます。
map()はBackpressureを特に制御していない(下流側に任せている)ためです。これはRxJavaは遅い操作を同期的に実行することを想定していないためだと思います。この場合は下流側にはobserveOn()しかないので、RingBufferの容量の分だけrequest()されています。
自分は、このmap()と後ほど紹介するonBackpressureLatest()を組み合わせて対処しようとしてしまいましたが、これは全く効果がありません。

flatMap()でBackpressureを使う

flatMap()のmaxConcurrentに1を指定し、onBackpressureDrop()でHot Observableを制御します。

onClickObservable
    .onBackpressureDrop() // Backpressureがかかった場合はemitせずに捨てる
    .flatMap(aVoid -> mApiClient.fetch(), 1) // 最大で1並列しか実行しない
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(response -> render(response));

これなら何回ボタンを押されても大丈夫です!

実際に作っているアプリでは、サーバの情報が更新されるたびにPush通知が来るようになっていて、その時に最新の情報で再描画するために、onBackpressureLatest()を使用して同じようなロジックを実装しています。

window()やthrottleXXX()などで時間ベースで制御する

単にemitが速すぎる場合、例えばユーザが入力するたびにバリデーションしている場合などは、時間ベースで制御した方が良い可能性もあります。RxJava公式ではBackpressureを避けるのに便利なOperator一覧をwikiに載せていたりするので、それも確認してみてください。

次の記事

詳解RxJava3は「自分でObservableを作る、またはリークしないObservableの作り方」となる予定です。
ネクストコ○ンズヒンツッ!! サブスクライバーのリファレンス。

(書く書く詐欺している間にまともで簡単なやり方が生み出されつつあるので代わりにこちらの記事を御覧ください。↓
http://qiita.com/kazy/items/9c8a97975023d0661a6c

yuya_presto
[a.k.a. ypresto] Android StudioとXcodeを同時起動しながらAtomでRubyとかES6とか書きつつgitでmergeしてpushする日々。
http://ypresto.strikingly.com
codetakt
学習管理システム「schoolTakt」を運営するEdTechのスタートアップ企業
https://codetakt.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away