はじめに
短時間に大量にemitされるストリームに対して受け取った側で重い処理が実行される場合など、処理が追いつかずにあふれてしまうことがあります。そんな時にbackpressureを使います。
backpressureとは、ストリームの流れる量をコントロールする仕組みです。
RxJava2.xではObservableとFlowableが分離され、Observable = backpressureなし、Flowable = backpressureありとなっています。
この記事ではFlowableを使ってbackpressureを制御してみます。
サンプルはKotlinで実装しています。Kotlin経験がなくてもJavaでRxを扱ったことがあれば難しくないと思います
サンプルアプリ
内容
- シークバーを操作すると値の変化が大量にemitされる
- 受け取った側では実行に1000ミリ秒必要な処理
実装
- 画面上部:Observable
- backpressure制御なし
- シークバーを動かす度に処理が大量に実行されてしまう
- 画面下部:Flowable
- 処理が1回終わるごとに次の値を受け取るようにbackpressure制御している
- シークバーを動かしても処理が大量に実行されない
以下ではこのサンプルのソースコードを見ていきます。
ソースコードはこちら
まずはFlowableを作る
シークバーの値の変化をPublishProcessorを使ってemitします。
PublishProcessorはFlowableを継承したクラスです。
io.reactivex.Flowable<T>
io.reactivex.processors.FlowableProcessor<T>
io.reactivex.processors.PublishProcessor<T>
val processor = PublishProcessor.create<Int>()
val seekBar = findViewById(R.id.seekBar) as SeekBar
seekBar.setOnSeekBarChangeListener(object : SeekBar.OnSeekBarChangeListener {
override fun onProgressChanged(bar: SeekBar?, progress: Int, fromUser: Boolean) {
// シークバーの値が変わった時にprocessorへonNextされる
processor.onNext(progress)
}
override fun onStartTrackingTouch(p0: SeekBar?) {}
override fun onStopTrackingTouch(p0: SeekBar?) {}
})
backpressureの制御
受け取り側の処理が完了するごとに1つずつ値を受け取れるよう、subscription.request
を使って制御します。onBackpressureLatest()
オペレータにより最新の値を受け取るようにします。
val textView = findViewById(R.id.text) as TextView
processor
.onBackpressureLatest() // 最新の値を受け取る
.subscribe(object : FlowableSubscriber<Int> {
private var subscription: Subscription? = null
override fun onNext(t: Int?) {
// 実行に1000ミリ秒必要な処理を開始
Observable.just(0)
.delay(1000, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
textView.text = additionalString() + textView.text
subscription?.request(1) // 処理が終わってから次の値を1つ受け取る
}
}
override fun onError(t: Throwable?) {
}
override fun onComplete() {
}
override fun onSubscribe(s: Subscription?) {
this.subscription = s // subscriptionを保持
s?.request(1) // 最初に受け取れる値は1つ
}
})
実際にこんなコード書くの?もっと楽に書きたい。。。
- 実際にはthrottle系オペレータを使って簡単に作っちゃうことが多そうです。
processor
.throttleLast(1000, TimeUnit.MILLISECONDS) // 1000ミリ秒間で最新の値を1つ流す
.subscribe{ /*省略*/ }
まとめ
シークバーの例を使ってFlowableでbackpressure制御をしてみました。
実際にはthrottle系オペレータを使って実装することが多そうですが、RxJava2.x導入の機会にsubscription.request()
を使った制御を知っておくのは良さそうです。