35
31

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

何となくRxJavaを使ってるけど正直よく分かってない人が読むと良さそうな記事・やや発展編

Last updated at Posted at 2015-12-14

RxJava Advent Calendar 2015 14日目の記事です。

本記事は6日目・基礎編の続きです。
前回読んだ記事に現れたCold/Hotの話題から始め、SubjectとBackpressureを見ていきます。

Cold/Hot

【翻訳】あなたが求めていたリアクティブプログラミング入門の最後には、こんなことが書いてあります。

Rxでのプログラミングのコツを理解したければ、Cold vs Hot Observablesのコンセプトの理解が絶対に欠かせない。

Cold/Hotを説明する英語の文章がリンクされているので、少し長いですが冒頭を読んでみます。

Cold observables start running upon subscription, i.e., the observable sequence only starts pushing values to the observers when Subscribe is called. Values are also not shared among subscribers. This is different from hot observables such as mouse move events or stock tickers which are already producing values even before a subscription is active. When an observer subscribes to a hot observable sequence, it will get all values in the stream that are emitted after it subscribes. The hot observable sequence is shared among all subscribers, and each subscriber is pushed the next value in the sequence. For example, even if no one has subscribed to a particular stock ticker, the ticker will continue to update its value based on market movement. When a subscriber registers interest in this ticker, it will automatically receive the next tick.

ColdなObservableはSubscribeが呼ばれて初めて値をObserverに吐き出す。値はSubscribersの間で共有されない。
一方、HotなObservableとは、例えばマウスのmoveイベントや株価のティッカーのような、subscriptionが有効になる前に既に値が生成されているような物を指す。ObserverがHotなObservableをsubscribeすると、Observerはそれまでに生成された値を全て受け取るだろう。
値は全てのSubscribersの間で共有(複数のSubscribersに分岐・分配)され、各Subscriberは常に最新の値を受け取る。例えば、ある特定の株価のティッカーをsubscribeしているSubscribersが1つも無かったとしても、ティッカーは株価の動向に応じて値を吐き出し更新し続ける。したがって、Subscriberがこのティッカーをsubscribeすると、受け取る値はその次に来る最新の値である。

何となく分かったような分からないような感じなので、他の方の表現も見てみましょう。

  • Cold : ストリームの前後をつなぐだけのパイプ。単体では意味が無い。だいたいのオペレータはこっち。
  • Hot : ストリームから値を発行し続ける蛇口。常に垂れ流し。後ろにパイプがたくさん接続できる。

RxのHotとColdについて

Hot ・・・ subscribeされていなくても値をemitする.
Cold ・・・ subscribe()が呼ばれるまで値をemitしない.subscribe()毎に新しく値をemitする.

Observableは友達

つまり「Subscriberに要求されるまで値は出さない受動的なObservable(ストリーム)はCold」「勝手にどんどん値を吐き出す能動的なObservableはHot」ということですね。
こうしたHotなObservableは、これまでのColdなObservableとは別の扱いや注意が必要そうです。

今度は以下の記事を全体的に読んでCold/Hotをもう少し詳しく見ておくと共に、上の英文で出てきた「複数のSubsribers間での共有(分岐・分配)」の意味について知っておきましょう。

RxのHotとColdについて

Cold/Hotの説明を色々読んでいくと、Subjectに何度も触れていることに気付きます。
どうやらこれを良く知っておく必要があるようなので、次はSubjectについての話に移ります。

Subject

RxのSubject、特にRxJavaのSubjectについてはぴったりな記事が11日目にありました。

Rxで知っておくと便利なSubjectたち

「SubjectはSubscriberでもObservableでもあって、種類が色々あって、上手く使うと便利」ということです。

では、これがCold/Hotとどう関係するのか?以下の記事を読んでみてください。
(コードはC#ですが、例に漏れず同じRxを使っているので大体読めます。)

[Rx入門] Subject詳解 / Hotな、ColdなObservableのこと

曰く『Subjectの扱う値はRxの「外」から来ているため、Subjectから来る値はRxの世界で制御されない値であり、これをRxの言葉で表すと"Hot"であると言う。したがってSubjectはHotなObservableである』とのことです。
サンプルコードでもHotなObservableを返すTimerメソッドはreturn subject.AsObservable();していますね。

さて、SubjectがHotなObservableであることは分かったのですが、逆にHotなObservableはSubjectだけなのでしょうか?他にもHotなObservableは無いのでしょうか?

上記記事には「Cold->Hot変換」としてObservable.publish()メソッドの存在を挙げています。
つまり、publish()はHotなObservableを返すのです。
このメソッドの返り値の型はConnectableObservableとなっています1。Subjectではないですね。
ConnectableObservable, SubjectともにObservableのサブクラスであり23、兄弟ではありますが親子ではありません。つまりConnectableObservableはSubjectではないので、HotなObservableはSubject以外にも存在するということです。

  • Hot Observableの一例
  • Observable.interval()やwindow()など時間を与えて得られるObservable
  • Subject (外部からonNext()を自由に呼べるので下流の都合に合わせて制御できません)
  • publish()などを適用して得られるConnectableObservable (Hot変換とも呼ばれます)
  • Hot ObservableにOperatorを適用して得られるObservable(一部を除く)

詳解RxJava2:Backpressureで流速制御
(この記事の主題Backpressureについては次節で触れます)

Backpressure

Cold Observableは「Subscriberに値を要求されるまで値を出さない」と説明してきました。この仕組みを詳しく見ることでBackpressureという重要な概念が出てきます。

Subscriberはsubscribe()で値を要求する際、「自分がいくつ値を処理できるか」をCold Observableに教えることができます。これにより、Cold Observableの値の送信速度がSubscriberの処理速度を超えないように制御することができるのです。
この際にSubscriberはrequest()というメソッドで値の数をObservableに伝えます。このメソッドのコメントを読んでみましょう。

Request a certain maximum number of emitted items from the Observable this Subscriber is subscribed to.
This is a way of requesting backpressure. To disable backpressure, pass {@code Long.MAX_VALUE} to this method.

Observableから吐き出されSubscriberがsubscribeするitemの数の最大値をリクエストする。
この方法はBackpressureと呼ばれる。Backpressureを無効にするには、このメソッドにLong.MAX_VALUEを渡せば良い。

なお、request()にはデフォルトではLong.MAX_VALUEが渡されるため、デフォルトではBackpressureは無効です。

また、当然ながらHot ObservableはCold ObservableのようにBackpressureによる制御はできません。
ではHot Observableの送信速度がSubscriberの処理速度を超えた時にBackpressureを使うとどうなるかというと、「Subscriberは常に最新の値だけ利用して他は捨てる」「バッファを用意してある程度値を貯められるようにする」といった何かしらの対抗策を取れるようになります。

デフォルトでBackpressureが無効ならいつ使うの?Hot Observableでの挙動は他に何があるの?といった疑問を解くには、以下の記事を読むと良さそうです。幸いにも今年のAdvent Calendarに投稿されました。

詳解RxJava2:Backpressureで流速制御

ちなみに、Backpressureを利用して値の送受信速度を制御できるストリーム(Observable)をReactive Streamsと呼ぶそうです。

Reactive Streamsとは?

追記:より正確には、Reactive Streamsとは「”Backpressure付きの非同期ストリーム処理”の標準を定める仕様」のようです。
以下の記事の中で参照されている発表資料が非常に詳しく、Reactive StreamsはもちろんReactive ProgrammingやFRPとの関係までカバーされています!

JJUG ナイトセミナー「Reactive Streams特集」に行ってきた #JJUG

最後に

私がRxJavaをよく分かっていないのは公式をよく読んでないからだろうな・・・という考えから、この記事では、Cold/Hot以外はRxJava wikiの目次からトピックを選びました。
ここで挙げた話題をより詳しく知りたい方は、ぜひwikiをご覧ください。

また、Error Handlingなど、RxJava wikiの目次にあるもののこの記事で触れなかった話題もAdvent Calendarに投稿されていますので、是非とも読んでみてください。

RxJavaのエラーハンドリング

  1. http://reactivex.io/RxJava/javadoc/rx/Observable.html#publish()

  2. http://reactivex.io/RxJava/javadoc/rx/observables/ConnectableObservable.html

  3. http://reactivex.io/RxJava/javadoc/rx/subjects/Subject.html

35
31
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
35
31

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?