11月からAndroidエンジニアとして働いている@katsutomuです。子供へのプレゼントはぬいぐるみが喋れるようになるボタンにしようかと思っています!

さて、最近はAndorid開発でもRxJavaの利用はスタンダードになってきていますね!
その中であまり触れられることのないBackpressureの用途について書かせて頂きます。

Backpressureとは何か

Reactive Streamsに含まれる機能の一つでデータを通知する量を受信側が抑制する機能のことです。RxJava2ではFlowableやProcessorとして実装がされています。

必要になる場面としては下記の状況です。

  • 通知側と受信側が異なるスレッド上で処理を行う
  • 通知側より受信側の処理速度が遅い(場合がある

この様な場合、スレッド間の同期処理やエラーハンドリングに加えてAndroidではUI処理をメインスレッドで行う必要があるなど制限が多く、コードが煩雑になりがちです。この悩みをBackpressureは解決してくれます。今回は主題と外れるので詳細な内容は省かせて頂きますが、より詳しく知りたい方は、RxJavaリアクティブプログラミングを読んでみてください。

サンプル

では、サンプルを元に説明させて頂きます。
今回はリストの最下部に到達したタイミングで、APIから追加読み込みするという題材を選ばせて頂きました。これは先述したBackpressure必要になる場面の状況を満たしています。

追加読み込みの実装をする場合、下記の様なコードを書くことがあるかと思います。

abstract class BottomLoadListener : AbsListView.OnScrollListener {
    private var isLoading = false
    private val preloadThreshold = 1
    private var previousTotalItemCount = 0

    override fun onScroll(view: AbsListView?, firstVisibleItem: Int, visibleItemCount: Int, totalItemCount: Int) {
        val displayedItemCount = firstVisibleItem + visibleItemCount
        if (isLoading) {
            if (totalItemCount > previousTotalItemCount) {
                isLoading = false;
                previousTotalItemCount = totalItemCount;
            }
        }
        if (!isLoading && displayedItemCount + preloadThreshold >= totalItemCount) {
            onLoadMore()
            previousTotalItemCount = totalItemCount
            isLoading = true
        }
    }
    abstract fun onLoadMore()
}

筆者もこの書き方を好んで利用していたのですが、いくつか課題もあると感じていました。その課題というのは下記の様なものです。

  • ScrollListenerにonLoadMoreというデータ取得の関心事が含まれている
  • スクロールのタイミングとリストの作りによっては多重通信が発生してしまう
  • データを全て読み込んだ場合の考慮も考えなければいけない

これらの考慮は存外に大変で、不具合を産みやすい要因になり、特にスクロールなどのユーザー操作と非同期処理が絡むと、予想外のことが起きやすいため複合的な要因の特定が難しい不具合を産みやすく、よく頭を悩ませておりました。

Backpressureで解決してみよう!

これらの課題を解決するBackpressureを利用したアプローチを例示していきます。まずScrollListnerは下記の様に変更しています。

val scrolledProcessor = PublishProcessor.create<Triple<Int,Int,Int>>()
listView.setOnScrollListener(object : AbsListView.OnScrollListener{
    override fun onScrollStateChanged(p0: AbsListView?, p1: Int) {}

    override fun onScroll(view: AbsListView?, firstVisibleItem: Int, visibleItemCount: Int, totalItemCount: Int) {
        if (totalItemCount == firstVisibleItem + visibleItemCount) {
            scrolledProcessor.onNext(Triple(totalItemCount, firstVisibleItem, visibleItemCount))
        }
    }
})

ScrollListnerからは、リスト最下部に到達した事を通知するのみを責務としており、通知には、PublishProcessorを利用しています。この変更により追加読み込みに関する関心事を分離すると共に、読み込み中であるか否かの判定も取り除いています。ぐっとシンプルに書く事が出来ました。

逆にデータを取得する側には下記の責務をもたせています。

  • データ取得を開始できるかを制御
  • データ取得中は多重通信が発生しないように抑制
  • 全て読み込んだあとはデータ取得リクエストは行わない

コードで書くとこんな感じになります。

val paginator = Paginater(CatsRepository())
// 読み込み中に何度スクロールしても無視する
scrolledProcessor.onBackpressureDrop().subscribe(paginator)

class Paginater(private val catsRepository: CatsRepository) : FlowableSubscriber<Triple<Int,Int,Int>> {
    private var subscription: Subscription? = null
    private var count = 0
    var fetched: PublishSubject<Pair<List<String>, Boolean>> = PublishSubject.create()

    override fun onSubscribe(s: Subscription) {
        subscription = s
        // 受信を開始したら、1つ通知を受け取れる状態にする
        subscription?.request(1)
    }

    override fun onNext(t: Triple<Int,Int,Int>?) {
        launch (UI, CoroutineStart.UNDISPATCHED) {
            delay(2000)
            // 読み込みが完了したら、1つ通知を受け取れる状態にする
            subscription?.request(1)
            val nextPageList = catsRepository.dummyData.skip(count * 5L).take(5).toList().blockingGet()
            var finished = false
            count++
            if (count == 2) {
                                 // すべて読み込んだら購読を解除する
                subscription?.cancel()
                finished = true
            }
            fetched.onNext(Pair<List<String>, Boolean>(nextPageList, finished))
        }
    }

    override fun onComplete() {}
    override fun onError(t: Throwable?) {}
}

これだけだとイメージがしずらいかと思うので、一覧の流れを明記すると下記のようになります。

  1. onBackpressureDrop()を設定し購読を開始。通知を1つ受け取れる状態にする
  2. 最下部到達の通知を受け付けると、設定したonBackpressureDropがそれ以降届く通知を破棄する
  3. データ取得終了後に再度、通知を1つ受け取れる状態にする
  4. データが存在する間、2-3を繰り返し、最後まで読み込んだら購読を解除する
  5. それ以降リクエストは受け取らないので通信は発生しない

このようにScollListerとデータ取得側をBackpressureの機能を用いて繋ぐ事により、シンプルで安全にリストの最下部に到達したタイミングで、APIから追加読み込みを達成出来るようになりました!

まとめるとこんな感じです。

  • ScrollListenerからデータ取得の関心事をなくし、責務の分離が出来た
  • onBackpressureDropでスクロールのタイミングの考慮を減らせた
  • リストを全て読み込んだ場合に購読を解除し、最下部に到達しても通信が発生しないようにできた

さいごに

いかがでしたでしょうか?
Backpressueは使い道がイメージし出来ず、使いこなせていなかったのですが、今回利用したサンプルを思いついた時に、ブレイクスルーを感じました!

サンプルコードはこちらのリポジトリにありますので、気になる方は見てみてくださいね。
https://github.com/katsutomu/KotlinRxJava2Sample
明日はtommykwさんによる記事です!お楽しみに~~~~~!

Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.