search
LoginSignup
1
Help us understand the problem. What are the problem?

posted at

updated at

コルーチンとチャネル入門#3 コールバックの使用

ソース記事はこちら
前回のソリューションは動作するが、スレッドをブロックし、そのためUIはフリーズする。これを避ける伝統的なアプローチはコールバックを使用することである。操作が完了した後に呼び出す必要があるコードを一直線に呼ぶ代わりに、それを別のコールバック、多くの場合はラムダであるが、に抜き出し、あとで呼び出すために、呼び出し側にラムダを渡す。
UIを機敏にするため、すべての計算処理をスレッドに分けるか、Retrofit APIに切り替えることができ、ブロッキング呼び出しの代わりにコールバックを使用し始める。

バックグラウンドスレッドでのloadContributers呼び出し

まず初めに、すべての計算処理を異なるスレッドに移動してみよう。新しいスレッドを開始するには、thread関数を使う。

thread {
    loadContributorsBlocking(service, req)
}

そうすることにより、すべてのロード処理は、異なるスレッドに移動し、メインスレッドは自由になり、別のタスクで埋めることができる。
png
loadContributers関数のシグネチャを変更し、ロード処理が完了した後で呼び出すために最後の引数としてupdateResultsコールバックを取る。

fun loadContributorsBackground(service: GitHubService, req: RequestData, 
                               updateResults: (List<User>) -> Unit)

今では、loadContributorsBackgroundが呼び出されるとき、updateResults呼び出しはコールバックの中で動き、それまでのように、直後すぐではない。

loadContributorsBackground(req) { users ->
    SwingUtilities.invokeLater {
        updateResults(users, startTime)
    }
}

SwingUtilities.invokeLaterを呼ぶことで、updateResults呼び出しが、確実にメインUIスレッド(AWTイベントディスパッチャースレッド)で結果を更新するようになる。
しかし、BACKGROUNDオプション経由でコントリビューターをロードしようとすると、一覧は更新されるが、何も変化がないことがわかる。

課題

src/tasks/Request2Background.ktloadContributorsBackground()を修正し、結果の一覧がUIに表示されるようにすること。

解法

コールバックを呼び出すことを忘れていた!コントリビューターはロードされ、ログで見ることができるが、結果は表示されなかった。これを修正するには、ユーザーの結果一覧について、updateResultsを呼び出す必要がある。

thread {
    updateResults(loadContributorsBlocking(service, req))
}

コールバックに渡されたロジックを明示的に呼び出すことを確認すべきである。さもないと何も起こらない。

RetrofitコールバックAPIの使用

全体のロードのロジックをバックグラウンドスレッドに移動したが、まだ最善のリソース使用ではない。すべてのロード要求は、次から次へと逐次的に行われ、ロード結果を待つ間、スレッドはブロックされるが、何か他のタスクで占められることは可能である。具体的には、その他のロード処理を開始することができ、そうすれば、全体の結果はより早く受け取れる!
それぞれのリポジトリのデータのハンドリングは、二つに分割すべきである。一つ目はロード処理とその後の結果応答の処理である。二つ目の「処理」部分は、コールバックという形式で取り出されるべきである。それぞれのリポジトリのロード処理は、直前のリポジトリの結果を受信する(そして対応するコールバックが呼び出される)前に、開始することができる。
png2
RetrofitのコールバックAPIは、これを達成するのに役に立つ。HTTP要求を開始し、引数としてコールバックを取る、Call.enqueue関数を使うことにする。このコールバック内で、それぞれの要求の後でしなければならないことを指定する必要がある。
src/tasks/Request3Callbacks.kt内のloadContributorsCallbacks()でこのAPIを使用する。便宜上、同じファイル内に宣言されたonResponse拡張関数を使う。それはオブジェクト表現ではなくて、ラムダを引数に取る。

fun loadContributorsCallbacks(service: GitHubService, req: RequestData, 
                              updateResults: (List<User>) -> Unit) {
    service.getOrgReposCall(req.org).onResponse { responseRepos ->  // #1
        logRepos(req, responseRepos)
        val repos = responseRepos.bodyList()
        
        val allUsers = mutableListOf<User>()
        for (repo in repos) {
            service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->   // #2
                logUsers(repo, responseUsers)
                val users = responseUsers.bodyList()
                allUsers += users
            }
        }
        // TODO: Why this code doesn't work? How to fix that?
        updateResults(allUsers.aggregate())
    }
}

コールバックの形式で応答のハンドリングロジックを取り出す。つまり、対応するラムダは#1#2の行から始まる。
しかし、提供された解答は動作しない。プログラムを起動し、CALLBACKSオプションを選択してコントリビューターをロードすると、何も表示されないことに気づく。しかしテストは結果おwすぐに返却し、パスする。なぜか?
なぜ与えられたコードが期待通り動作しないのか、しばらく考えることに時間を費やし、そのあとで読み進めること。

課題(オプション)

ロードされたコントリビューターのリストが表示されるようにコードを書き直すこと。

解法(初回の試み)

全体のロード時間を削減するために並列で多くの要求を開始している。しかし、ロード結果を待っていない。updateResultsコールバックはすべてのロード要求を開始した直後に呼び出されており、目下allUsers一覧はまだデータで満たされていない。
コードに対し、次のような変更をすることでこの問題を修正を試みることができる。

val allUsers = mutableListOf<User>()
for ((index, repo) in repos.withIndex()) {   // #1
    service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
        logUsers(repo, responseUsers)
        val users = responseUsers.bodyList()
        allUsers += users
        if (index == repos.lastIndex) {      // #2
            updateResults(allUsers.aggregate())
        }
    }
}

#1の行で、reposの一覧をインデックス付きで繰り返す。その後、それぞれのコールバックから最後の繰り返しかどうかをチェックする。(#2)そしてその場合に結果を更新する。
しかしこのコードは正しくない。なぜか?問題の原因は何か?この質問に対しての答えを探す試みにいくらかの時間を費やし、そのあとで続きを読むこと。

解法(2回目の試み)

ロード要求は並列に開始するため、最終の結果が最後にやってくる保証がない。結果はどのような順序でもやってくる可能性がある。そのため、完了の条件として、現在のインデックスと最後のインデックスの比較を使う場合、いくつかのreposの結果を失う危険がある。最後のreposの処理要求が、いくつか前の要求よりも早く返却される場合(それは起こりえるが)、処理により多くの時間がかかる要求のすべての結果は、失われるだろう。
この問題を修正する一つの方法は、インデックスを導入し、すべてのリポジトリをすでに処理したかどうかをチェックすることである。

val allUsers = Collections.synchronizedList(mutableListOf<User>())
val numberOfProcessed = AtomicInteger()
for (repo in repos) {
    service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
        logUsers(repo, responseUsers)
        val users = responseUsers.bodyList()
        allUsers += users
        if (numberOfProcessed.incrementAndGet() == repos.size) {
            updateResults(allUsers.aggregate())
        }
    }
}

留意すべきは、リストの同期バージョンとAtomicIntegerを使っている。なぜなら、一般的にgetRepoContributors要求を処理する異なるコールバックが、常に同じスレッドから呼び出される保証がないためである。

解法(3回目の試み)

さらに良い解法はCountDownLatchクラスを使うことである。CountDownLatchは、リポジトリの数で初期化したカウンターを保存する。それぞれのリポジトリを処理する後で、このカウンターを減算し、その後、結果を更新する前に、ラッチがゼロにカウントダウンされるまで、待機する。

val countDownLatch = CountDownLatch(repos.size)
for (repo in repos) {
    service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
        // リポジトリの処理
        countDownLatch.countDown()
    }
}
countDownLatch.await()
updateResults(allUsers.aggregate())

メインスレッドから結果が更新され、それはこのロジックを子スレッドに移譲するよりもより直接的である。
コールバックで正しいコードを書くことは、特にいくつかの配下のスレッドと同期が発生するとき、簡単ではなく、間違いを起こしやすいことがわかる。次に、同じロジックをsuspend関数を使って実装する方法を検討しよう。

RxJavaの使用についての注意

このチュートリアルのすべての読者が、RxJavaライブラリに精通していることを期待していないため、RxJavaを使用するバージョンの詳細の説明はしない。しかし興味があったら、追加の練習としてそれを使い、同じロジックをリアクティブな手法を使って実装することができる。すべての必要な依存性とRxJavaを使う解法が、このプロジェクトの別の「rx」ブランチにある。正しく比較するため、このチュートリアルを最後まで読み、その後で提示されたRxバージョンを実装するか確認することも可能である。

Register as a new user and use Qiita more conveniently

  1. You can follow users and tags
  2. you can stock useful information
  3. You can make editorial suggestions for articles
What you can do with signing up
1
Help us understand the problem. What are the problem?