35
26

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RxJava2.x Flowableでbackpressureを制御する

Last updated at Posted at 2017-03-22

はじめに

スクリーンショット 2017-03-22 9.46.18.png

短時間に大量にemitされるストリームに対して受け取った側で重い処理が実行される場合など、処理が追いつかずにあふれてしまうことがあります。そんな時にbackpressureを使います。
backpressureとは、ストリームの流れる量をコントロールする仕組みです。

RxJava2.xではObservableとFlowableが分離され、Observable = backpressureなし、Flowable = backpressureありとなっています。

この記事ではFlowableを使ってbackpressureを制御してみます。
サンプルはKotlinで実装しています。Kotlin経験がなくてもJavaでRxを扱ったことがあれば難しくないと思います :bulb:

サンプルアプリ

device-2017-03-22-102025.png

内容

  • シークバーを操作すると値の変化が大量にemitされる
  • 受け取った側では実行に1000ミリ秒必要な処理

実装

  • 画面上部:Observable
    • backpressure制御なし
    • シークバーを動かす度に処理が大量に実行されてしまう
  • 画面下部:Flowable
    • 処理が1回終わるごとに次の値を受け取るようにbackpressure制御している
    • シークバーを動かしても処理が大量に実行されない

以下ではこのサンプルのソースコードを見ていきます。
ソースコードはこちら

まずはFlowableを作る

シークバーの値の変化をPublishProcessorを使ってemitします。
PublishProcessorはFlowableを継承したクラスです。

io.reactivex.Flowable<T>
  io.reactivex.processors.FlowableProcessor<T>
    io.reactivex.processors.PublishProcessor<T>
val processor = PublishProcessor.create<Int>()
val seekBar = findViewById(R.id.seekBar) as SeekBar

seekBar.setOnSeekBarChangeListener(object : SeekBar.OnSeekBarChangeListener {
    override fun onProgressChanged(bar: SeekBar?, progress: Int, fromUser: Boolean) {
        // シークバーの値が変わった時にprocessorへonNextされる
        processor.onNext(progress)
    }

    override fun onStartTrackingTouch(p0: SeekBar?) {}
    override fun onStopTrackingTouch(p0: SeekBar?) {}
})

backpressureの制御

受け取り側の処理が完了するごとに1つずつ値を受け取れるよう、subscription.requestを使って制御します。onBackpressureLatest()オペレータにより最新の値を受け取るようにします。

val textView = findViewById(R.id.text) as TextView
processor
    .onBackpressureLatest()  // 最新の値を受け取る
    .subscribe(object : FlowableSubscriber<Int> {
        private var subscription: Subscription? = null
        override fun onNext(t: Int?) {
            // 実行に1000ミリ秒必要な処理を開始
            Observable.just(0)
                    .delay(1000, TimeUnit.MILLISECONDS)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe {
                        textView.text = additionalString() + textView.text
                        subscription?.request(1) // 処理が終わってから次の値を1つ受け取る
                    }
        }

        override fun onError(t: Throwable?) {
        }

        override fun onComplete() {
        }

        override fun onSubscribe(s: Subscription?) {
            this.subscription = s // subscriptionを保持
            s?.request(1) // 最初に受け取れる値は1つ
        }
    })

実際にこんなコード書くの?もっと楽に書きたい。。。

  • 実際にはthrottle系オペレータを使って簡単に作っちゃうことが多そうです。
processor
    .throttleLast(1000, TimeUnit.MILLISECONDS) // 1000ミリ秒間で最新の値を1つ流す
    .subscribe{ /*省略*/ }

まとめ

シークバーの例を使ってFlowableでbackpressure制御をしてみました。
実際にはthrottle系オペレータを使って実装することが多そうですが、RxJava2.x導入の機会にsubscription.request()を使った制御を知っておくのは良さそうです。

参考

35
26
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
35
26

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?